自定义驱动扩展
Viswoole 采用驱动架构设计,缓存、日志、数据库等核心模块均支持自定义驱动扩展。开发者可根据业务需求实现专用驱动,无缝接入框架。
架构概览
text
┌─────────────────────────────────────────────────┐
│ Facade(门面) │
├─────────────────────────────────────────────────┤
│ Manager(管理器) │
├──────────┬──────────┬──────────┬────────────────┤
│ 缓存驱动 │ 日志驱动 │ 数据库驱动 │ 自定义驱动 │
│ │ │ │ │
│ File │ File │ MySQL │ Memcached │
│ Redis │ Elastic │ PgSQL │ MongoDB │
│ Custom │ Custom │ Custom │ Custom │
└──────────┴──────────┴──────────┴────────────────┘

一、缓存驱动扩展
实现接口
缓存驱动需实现 CacheDriverInterface 接口:
php
namespace Viswoole\Cache\Contract;

// Without these imports the unqualified names below would resolve to
// Viswoole\Cache\Contract\Closure and Viswoole\Cache\Contract\DateTime.
use Closure;
use DateTime;

/**
 * Contract that every cache driver has to implement.
 */
interface CacheDriverInterface
{
    /** Fetch a cached value. */
    public function get(string $key, mixed $default = null): mixed;

    /** Store a cached value. */
    public function set(string $key, mixed $value, DateTime|int|null $expire = null, bool $NX = false): bool;

    /** Delete one or more cache entries. */
    public function delete(array|string $keys): false|int;

    /** Tell whether a key exists. */
    public function has(string $key): bool;

    /** Remove every cache entry. */
    public function clear(): bool;

    /** Increment a value by $step. */
    public function inc(string $key, int $step = 1): false|int;

    /** Decrement a value by $step. */
    public function dec(string $key, int $step = 1): false|int;

    /** Fetch a value and delete it. */
    public function pull(string $key): mixed;

    /** Get the remaining lifetime of a key. */
    public function ttl(string $key): false|int;

    /** Get the connection handle. */
    public function connect(): mixed;

    /** Close the connection. */
    public function close(): void;

    /** Acquire a contended lock. */
    public function lock(string $scene, int $expire = 10, bool $autoUnlock = false, int $retry = 5, int|float $sleep = 0.2): string;

    /** Release a lock. */
    public function unlock(string $id): bool;

    /** Append to an array set. */
    public function sAddArray(string $key, array|string $values): false|int;

    /** Read an array set. */
    public function getArray(string $key): array|false;

    /** Remove elements from an array set. */
    public function sRemoveArray(string $key, array|string $values): false|int;

    /** Get the key used for a tag. */
    public function getTagKey(string $tag): string;

    /** Get the name of the tag store. */
    public function getTagStoreName(): string;

    /** Get the actual cache identifier for a key. */
    public function getCacheKey(string $key): string;

    /** Set the serialization callbacks. */
    public function setSerialize(Closure|string $set = 'serialize', Closure|string $get = 'unserialize'): static;
}

示例:Memcached 驱动
php
namespace App\Cache;
use DateTime;
use Memcached;
use Viswoole\Cache\Driver;
use Viswoole\Cache\Contract\CacheDriverInterface;
/**
 * Example cache driver backed by the Memcached extension.
 *
 * Only the methods shown here are spelled out; the remaining
 * CacheDriverInterface methods are omitted from the sample.
 */
class MemcachedDriver extends Driver implements CacheDriverInterface
{
    private Memcached $memcached;

    /**
     * @param string $host   Memcached server host.
     * @param int    $port   Memcached server port.
     * @param string $prefix Passed to the base driver as its first argument.
     * @param int    $expire Passed to the base driver as its fourth argument.
     */
    public function __construct(
        string $host = '127.0.0.1',
        int $port = 11211,
        string $prefix = '',
        int $expire = 3600
    ) {
        parent::__construct($prefix, 'tag:', 'TAG_STORE', $expire);
        $this->memcached = new Memcached();
        $this->memcached->addServer($host, $port);
    }

    /**
     * Expose the underlying Memcached client.
     */
    public function connect(): Memcached
    {
        return $this->memcached;
    }

    /**
     * Fetch a value, returning $default when the read did not succeed.
     */
    public function get(string $key, mixed $default = null): mixed
    {
        $value = $this->memcached->get($this->getCacheKey($key));

        // Any result other than RES_SUCCESS (miss, timeout, unreachable server)
        // leaves `false` in $value, which must not be handed to unserialize().
        if ($this->memcached->getResultCode() !== Memcached::RES_SUCCESS) {
            return $default;
        }

        return $this->unserialize($value);
    }

    /**
     * Store a value; with $NX the value is only added when the key is absent.
     */
    public function set(string $key, mixed $value, DateTime|int|null $expire = null, bool $NX = false): bool
    {
        $expire = $this->formatExpireTime($expire ?? $this->expire);
        $key = $this->getCacheKey($key);
        $value = $this->serialize($value);

        if ($NX) {
            return $this->memcached->add($key, $value, $expire);
        }

        return $this->memcached->set($key, $value, $expire);
    }

    /**
     * Delete one or more keys.
     *
     * @return false|int Number of keys actually deleted, or false when the
     *                   client reports a failure for the whole call.
     */
    public function delete(array|string $keys): false|int
    {
        $cacheKeys = array_map(fn($k) => $this->getCacheKey($k), (array)$keys);
        if ($cacheKeys === []) {
            return 0;
        }

        // deleteMulti() returns an array keyed by cache key (true on success,
        // a RES_* code on failure); returning it directly would violate the
        // declared false|int return type.
        $results = $this->memcached->deleteMulti($cacheKeys);
        if (!is_array($results)) {
            return $results ? count($cacheKeys) : false;
        }

        return count(array_filter($results, static fn($result) => $result === true));
    }

    /**
     * Tell whether a key can currently be read.
     */
    public function has(string $key): bool
    {
        $this->memcached->get($this->getCacheKey($key));

        // Only a successful read proves the key exists; a failed lookup is not a hit.
        return $this->memcached->getResultCode() === Memcached::RES_SUCCESS;
    }

    /**
     * Flush every entry on the server.
     */
    public function clear(): bool
    {
        return $this->memcached->flush();
    }

    /**
     * Increment a numeric value by $step.
     */
    public function inc(string $key, int $step = 1): false|int
    {
        return $this->memcached->increment($this->getCacheKey($key), $step);
    }

    /**
     * Decrement a numeric value by $step.
     */
    public function dec(string $key, int $step = 1): false|int
    {
        return $this->memcached->decrement($this->getCacheKey($key), $step);
    }

    /**
     * Fetch a value and delete it when something other than null was read.
     */
    public function pull(string $key): mixed
    {
        $value = $this->get($key);
        if ($value !== null) {
            $this->delete($key);
        }

        return $value;
    }

    /**
     * Remaining lifetime of a key; always -1 in this sample.
     */
    public function ttl(string $key): false|int
    {
        // Memcached has no native TTL lookup; it could be emulated with a special marker.
        return -1; // means "cannot be determined"
    }

    /**
     * Close the connection to the server.
     */
    public function close(): void
    {
        $this->memcached->quit();
    }

    // Implementations of lock, sAddArray, getArray, sRemoveArray and the other methods...
}

注册自定义缓存驱动
php
// Option 1: register through the configuration file
// config/cache.php
use Viswoole\Cache\Facade\Cache;

return [
    'default' => 'memcached',
    'stores' => [
        'file' => Cache::FILE_DRIVER,
        'memcached' => [
            'driver' => \App\Cache\MemcachedDriver::class,
            'options' => [
                'host' => env('MEMCACHED_HOST', '127.0.0.1'),
                // env() may hand back a string, while the driver constructor
                // declares `int $port`; cast so the value always matches.
                'port' => (int) env('MEMCACHED_PORT', 11211),
                'prefix' => 'app:',
                'expire' => 3600,
            ]
        ],
    ],
];

// Option 2: register at runtime
Cache::addStore('memcached', new MemcachedDriver('127.0.0.1', 11211));

// Usage
Cache::store('memcached')->set('key', 'value', 3600);

二、日志驱动扩展
实现接口
日志驱动需实现 DriveInterface 和 CollectorInterface:
php
namespace Viswoole\Log\Contract;
/**
 * Storage-side contract of a log driver.
 */
interface DriveInterface
{
    /**
     * Save a batch of log records.
     */
    public function save(array $logRecords): void;

    /**
     * Write an entry directly, bypassing the buffer.
     */
    public function write(string $level, string|\Stringable $message, array $context = []): void;

    /**
     * Buffer an entry; buffered entries are saved in bulk when the coroutine ends.
     */
    public function record(string $level, string|\Stringable $message, array $context = []): void;
}
/**
 * Logging entry points, one method per level plus mixed() for a level given at call time.
 *
 * Stringable is written fully qualified: this snippet sits in the
 * Viswoole\Log\Contract namespace, where the bare name would resolve to
 * Viswoole\Log\Contract\Stringable.
 */
interface CollectorInterface
{
    public function emergency(\Stringable|string $message, array $context = []): void;

    public function alert(\Stringable|string $message, array $context = []): void;

    public function critical(\Stringable|string $message, array $context = []): void;

    public function error(\Stringable|string $message, array $context = []): void;

    public function warning(\Stringable|string $message, array $context = []): void;

    public function notice(\Stringable|string $message, array $context = []): void;

    public function info(\Stringable|string $message, array $context = []): void;

    public function debug(\Stringable|string $message, array $context = []): void;

    public function sql(\Stringable|string $message, array $context = []): void;

    public function task(\Stringable|string $message, array $context = []): void;

    public function mixed(string $level, \Stringable|string $message, array $context = []): void;
}

示例:Elasticsearch 日志驱动
php
namespace App\Log;
use Viswoole\Log\Collector;
use Viswoole\Log\Contract\DriveInterface;
/**
 * Example log driver that sends records to Elasticsearch through the bulk API.
 */
class ElasticsearchDrive extends Collector implements DriveInterface
{
    private \Elasticsearch\Client $client;

    /** Index that receives the log documents. */
    private string $index;

    /**
     * @param array  $hosts Elasticsearch node URLs.
     * @param string $index Target index; the default keeps the previously
     *                      hard-coded name "app_logs".
     */
    public function __construct(array $hosts = ['http://localhost:9200'], string $index = 'app_logs')
    {
        $this->index = $index;
        $this->client = \Elasticsearch\ClientBuilder::create()
            ->setHosts($hosts)
            ->build();
    }

    /**
     * Index a batch of log records with a single bulk request.
     *
     * NOTE(review): assumes every record exposes timestamp, level, message,
     * context and source keys — confirm against LogManager::createLogData().
     */
    public function save(array $logRecords): void
    {
        if (empty($logRecords)) {
            return;
        }

        $params = ['body' => []];
        foreach ($logRecords as $record) {
            // The bulk body alternates an action line with the document itself.
            $params['body'][] = ['index' => ['_index' => $this->index]];
            $params['body'][] = [
                '@timestamp' => date('c', $record['timestamp']),
                'level' => $record['level'],
                'message' => $record['message'],
                'context' => $record['context'],
                'source' => $record['source'],
            ];
        }

        $this->client->bulk($params);
    }

    /**
     * Build one log record and save it immediately.
     */
    public function write(string $level, string|\Stringable $message, array $context = []): void
    {
        $this->save([\Viswoole\Log\LogManager::createLogData($level, $message, $context)]);
    }

    /**
     * Hand the entry to the inherited mixed() method.
     */
    public function record(string $level, string|\Stringable $message, array $context = []): void
    {
        $this->mixed($level, $message, $context);
    }
}

注册自定义日志驱动
php
// config/log.php
'channels' => [
'elasticsearch' => [
'driver' => \App\Log\ElasticsearchDrive::class,
'options' => [
'hosts' => ['http://es-node1:9200', 'http://es-node2:9200'],
]
],
// Optional: route error-level logs to the elasticsearch channel.
// NOTE(review): the original note said errors go to both ES and file, but the
// sample line below names only elasticsearch — confirm the intended mapping.
// 'type_channel' => ['error' => 'elasticsearch'],
],

三、数据库驱动扩展
连接通道扩展
框架的数据库模块支持通过 Channel 机制扩展不同类型的数据库连接:
php
namespace App\Database\Channel;
use Viswoole\Database\Channel\PDO\PDOConfig;
use Viswoole\Core\Channel\Contract\ConnectionPoolInterface;
/**
 * ClickHouse connection channel example (simplified; demonstrates the core methods only).
 *
 * Note: ConnectionPoolInterface defines 10 methods:
 * get, pop, put, isEmpty, close, isFull, fill, length, stats, getConfig.
 * See the framework's built-in PDOChannel for a complete implementation.
 */
class ClickHouseChannel implements ConnectionPoolInterface
{
    private PDOConfig $config;

    /** Idle connections waiting to be handed out. */
    private array $pool = [];

    public function __construct(PDOConfig $config)
    {
        $this->config = $config;
    }

    /**
     * Alias of pop().
     */
    public function get(float $timeout = -1): mixed
    {
        return $this->pop($timeout);
    }

    /**
     * Take an idle connection from the pool, or create one when the pool is empty.
     *
     * $timeout is accepted for interface compatibility and is not used by
     * this simplified array-backed pool.
     */
    public function pop(float $timeout = -1): mixed
    {
        // A `mixed` return type still demands an explicit return on every path;
        // falling off the end of the method raises a TypeError.
        $connection = array_pop($this->pool);
        if ($connection === null) {
            $connection = $this->createConnection();
        }

        return $connection;
    }

    /**
     * Return a connection to the pool.
     */
    public function put(mixed $connection): void
    {
        $this->pool[] = $connection;
    }

    public function isEmpty(): bool
    {
        return empty($this->pool);
    }

    /**
     * Drop every pooled connection.
     */
    public function close(): bool
    {
        // Releasing the references is all this sketch does; a real channel
        // would also close each connection explicitly.
        $this->pool = [];

        return true;
    }

    public function isFull(): bool
    {
        return false;
    }

    /**
     * Fill the pool up to the given size (left unimplemented in this example).
     */
    public function fill(?int $size = null): void
    {
        // Explicit ?int: relying on `= null` to make the type nullable is deprecated.
    }

    public function length(): int
    {
        return count($this->pool);
    }

    public function stats(): array
    {
        return [
            'consumer_num' => 0,
            'producer_num' => 0,
            'queue_num' => $this->length(),
        ];
    }

    public function getConfig(): mixed
    {
        return $this->config;
    }

    /**
     * Create a new ClickHouse connection from $this->config.
     */
    private function createConnection(): mixed
    {
        // Placeholder: plug in the ClickHouse client of your choice here.
        throw new \LogicException('ClickHouseChannel::createConnection() is not implemented in this example.');
    }
}

四、通用最佳实践
继承基类 vs 实现接口
| 方式 | 适用场景 | 优点 | 缺点 |
|---|---|---|---|
| 继承基类 | 功能与内置驱动相近 | 复用公共逻辑,代码量少 | 受基类约束 |
| 实现接口 | 完全自定义行为 | 灵活度高 | 需实现全部方法 |
错误处理规范
php
/**
 * Read a value, falling back to $default when the lookup yields null or throws.
 */
public function get(string $key, mixed $default = null): mixed
{
    try {
        return $this->doGet($key) ?? $default;
    } catch (\Throwable $failure) {
        // Log the failure instead of rethrowing so the method keeps its usual contract.
        $details = [
            'key' => $key,
            'error' => $failure->getMessage(),
        ];
        Log::warning('缓存读取失败', $details);
    }

    return $default;
}

协程安全性
php
/**
 * Return the connection owned by the current coroutine, creating it on first use.
 *
 * NOTE(review): assumes Coroutine is Swoole\Coroutine — confirm the import.
 */
public function connect(): mixed
{
    // ✅ Correct: every coroutine holds its own connection
    $cid = Coroutine::getCid();

    if (!isset($this->connections[$cid])) {
        $this->connections[$cid] = $this->createConnection();

        // Remove the entry when the coroutine exits; otherwise the map keeps one
        // connection per coroutine ever started and grows for the life of the worker.
        // defer() may only be called inside a coroutine, hence the $cid guard.
        if ($cid > 0) {
            Coroutine::defer(function () use ($cid): void {
                unset($this->connections[$cid]);
            });
        }
    }

    return $this->connections[$cid];
}
// ❌ Wrong: a shared connection causes coroutine-safety problems
// (kept deliberately as the counter-example; do not copy this version)
public function connect(): mixed
{
if (!isset($this->connection)) {
$this->connection = $this->createConnection(); // shared by every coroutine!
}
return $this->connection;
}

配置规范化
php
// Recommended: keep driver settings in a dedicated file under the config directory
// config/drivers.php
return [
    'cache' => [
        'memcached' => [
            'driver' => \App\Cache\MemcachedDriver::class,
            // NOTE(review): a driver consuming this layout must accept a host list
            // and client options; a constructor taking only host/port will not.
            'hosts' => [
                ['host' => '127.0.0.1', 'port' => 11211],
            ],
            'options' => [
                \Memcached::OPT_COMPRESSION => true,
                \Memcached::OPT_BINARY_PROTOCOL => true,
            ],
        ],
    ],
    'log' => [
        'elasticsearch' => [
            'driver' => \App\Log\ElasticsearchDrive::class,
            // ES_HOSTS is a comma-separated string; split it so the value is a list,
            // which is what a constructor declaring `array $hosts` requires.
            'hosts' => array_map('trim', explode(',', (string) env('ES_HOSTS', 'http://localhost:9200'))),
            'index_prefix' => env('APP_NAME', 'app') . '_logs',
        ],
    ],
];