自定义驱动扩展

Viswoole 采用驱动架构设计,缓存、日志、数据库等核心模块均支持自定义驱动扩展。开发者可根据业务需求实现专用驱动,无缝接入框架。

架构概览

text
┌─────────────────────────────────────────────────┐
│                  Facade(门面)                    │
├─────────────────────────────────────────────────┤
│              Manager(管理器)                     │
├──────────┬──────────┬──────────┬────────────────┤
│  缓存驱动  │  日志驱动  │  数据库驱动 │   自定义驱动    │
│          │          │          │               │
│ File     │ File     │ MySQL    │  Memcached    │
│ Redis    │ Elastic  │ PgSQL    │  MongoDB      │
│ Custom   │ Custom   │ Custom   │  Custom       │
└──────────┴──────────┴──────────┴────────────────┘

一、缓存驱动扩展

实现接口

缓存驱动需实现 CacheDriverInterface 接口:

php
namespace Viswoole\Cache\Contract;

/**
 * Contract every cache driver must fulfil.
 *
 * Built-in classes are referenced by their fully-qualified names (\DateTime,
 * \Closure) so the signatures resolve to the global classes regardless of
 * which `use` statements the declaring file has.
 */
interface CacheDriverInterface
{
    /** Read a cache entry; $default is the fallback value. */
    public function get(string $key, mixed $default = null): mixed;

    /** Write a cache entry. */
    public function set(string $key, mixed $value, \DateTime|int|null $expire = null, bool $NX = false): bool;

    /** Delete one or more cache entries. */
    public function delete(array|string $keys): false|int;

    /** Check whether a cache entry exists. */
    public function has(string $key): bool;

    /** Clear all cache entries. */
    public function clear(): bool;

    /** Increment a value by $step. */
    public function inc(string $key, int $step = 1): false|int;

    /** Decrement a value by $step. */
    public function dec(string $key, int $step = 1): false|int;

    /** Read a cache entry and delete it. */
    public function pull(string $key): mixed;

    /** Get the remaining lifetime of a cache entry. */
    public function ttl(string $key): false|int;

    /** Get the underlying connection handle. */
    public function connect(): mixed;

    /** Close the connection. */
    public function close(): void;

    /** Acquire a contention lock. */
    public function lock(string $scene, int $expire = 10, bool $autoUnlock = false, int $retry = 5, int|float $sleep = 0.2): string;

    /** Release a lock. */
    public function unlock(string $id): bool;

    /** Append values to an array set. */
    public function sAddArray(string $key, array|string $values): false|int;

    /** Get an array set. */
    public function getArray(string $key): array|false;

    /** Remove values from an array set. */
    public function sRemoveArray(string $key, array|string $values): false|int;

    /** Get the key used for a tag. */
    public function getTagKey(string $tag): string;

    /** Get the tag store name. */
    public function getTagStoreName(): string;

    /** Get the actual cache key for $key. */
    public function getCacheKey(string $key): string;

    /** Set the serialization and deserialization methods. */
    public function setSerialize(\Closure|string $set = 'serialize', \Closure|string $get = 'unserialize'): static;
}

示例:Memcached 驱动

php
namespace App\Cache;

use DateTime;
use Memcached;
use Viswoole\Cache\Driver;
use Viswoole\Cache\Contract\CacheDriverInterface;

/**
 * Example cache driver backed by the Memcached extension.
 *
 * Key prefixing, (un)serialization and expire-time normalisation are
 * delegated to helpers called on $this (getCacheKey, serialize, unserialize,
 * formatExpireTime); they are not defined in this class.
 */
class MemcachedDriver extends Driver implements CacheDriverInterface
{
    private Memcached $memcached;

    /**
     * @param string $host   Memcached server host.
     * @param int    $port   Memcached server port.
     * @param string $prefix Passed to the parent constructor as its first argument.
     * @param int    $expire Passed to the parent constructor as its fourth argument.
     */
    public function __construct(
        string $host = '127.0.0.1',
        int $port = 11211,
        string $prefix = '',
        int $expire = 3600
    ) {
        parent::__construct($prefix, 'tag:', 'TAG_STORE', $expire);

        $this->memcached = new Memcached();
        $this->memcached->addServer($host, $port);
    }

    /** Return the underlying Memcached client. */
    public function connect(): Memcached
    {
        return $this->memcached;
    }

    /**
     * Read a cache entry.
     *
     * Returns $default unless the lookup succeeded, so a miss and a transport
     * failure (timeout, server down) both fall back to the default instead of
     * feeding the client's `false` result into unserialize().
     */
    public function get(string $key, mixed $default = null): mixed
    {
        $value = $this->memcached->get($this->getCacheKey($key));
        if ($this->memcached->getResultCode() !== Memcached::RES_SUCCESS) {
            return $default;
        }
        return $this->unserialize($value);
    }

    /**
     * Write a cache entry.
     *
     * When $NX is true the entry is written with Memcached::add(), which
     * fails if the key already exists; otherwise Memcached::set() is used.
     */
    public function set(string $key, mixed $value, DateTime|int|null $expire = null, bool $NX = false): bool
    {
        $expire = $this->formatExpireTime($expire ?? $this->expire);
        $key = $this->getCacheKey($key);
        $value = $this->serialize($value);

        if ($NX) {
            return $this->memcached->add($key, $value, $expire);
        }
        return $this->memcached->set($key, $value, $expire);
    }

    /**
     * Delete one or more entries.
     *
     * Memcached::deleteMulti() returns an array keyed by cache key whose
     * elements are `true` on success or a RES_* code on failure, so the
     * result is reduced to the number of keys actually deleted to satisfy
     * the `false|int` contract.
     *
     * @return false|int Number of deleted keys, or false if the client
     *                   did not return a per-key result array.
     */
    public function delete(array|string $keys): false|int
    {
        $cacheKeys = array_map(fn($k) => $this->getCacheKey($k), (array)$keys);
        if ($cacheKeys === []) {
            return 0;
        }

        $results = $this->memcached->deleteMulti($cacheKeys);
        if (!is_array($results)) {
            return false;
        }

        return count(array_filter($results, static fn($result) => $result === true));
    }

    /**
     * Check whether an entry exists.
     *
     * Only a successful lookup counts as "exists"; any other result code
     * (miss or transport error) is reported as absent.
     */
    public function has(string $key): bool
    {
        $this->memcached->get($this->getCacheKey($key));
        return $this->memcached->getResultCode() === Memcached::RES_SUCCESS;
    }

    /** Flush every entry held by the Memcached server(s). */
    public function clear(): bool
    {
        return $this->memcached->flush();
    }

    /** Increment the value by $step; returns the new value, or false on failure. */
    public function inc(string $key, int $step = 1): false|int
    {
        return $this->memcached->increment($this->getCacheKey($key), $step);
    }

    /** Decrement the value by $step; returns the new value, or false on failure. */
    public function dec(string $key, int $step = 1): false|int
    {
        return $this->memcached->decrement($this->getCacheKey($key), $step);
    }

    /** Read an entry and delete it when a non-null value was found. */
    public function pull(string $key): mixed
    {
        $value = $this->get($key);
        if ($value !== null) {
            $this->delete($key);
        }
        return $value;
    }

    /**
     * Remaining lifetime of an entry.
     *
     * Memcached has no native TTL query (it could be emulated with a special
     * marker), so this always returns -1, meaning "cannot be determined".
     */
    public function ttl(string $key): false|int
    {
        return -1;
    }

    /** Close the connection to the Memcached server(s). */
    public function close(): void
    {
        $this->memcached->quit();
    }

    // Implementations of lock, sAddArray, getArray, sRemoveArray, etc. go here...
}

注册自定义缓存驱动

php
// Option 1: register the driver through the configuration file
// config/cache.php
use Viswoole\Cache\Facade\Cache;

return [
    'default' => 'memcached',
    'stores' => [
        'file' => Cache::FILE_DRIVER,
        'memcached' => [
            'driver' => \App\Cache\MemcachedDriver::class,
            // Keys mirror the MemcachedDriver constructor parameter names
            // (presumably passed as constructor arguments — TODO confirm).
            'options' => [
                'host' => env('MEMCACHED_HOST', '127.0.0.1'),
                'port' => env('MEMCACHED_PORT', 11211),
                'prefix' => 'app:',
                'expire' => 3600,
            ]
        ],
    ],
];

// Option 2: register the driver dynamically
Cache::addStore('memcached', new MemcachedDriver('127.0.0.1', 11211));

// Usage
Cache::store('memcached')->set('key', 'value', 3600);

二、日志驱动扩展

实现接口

日志驱动需实现 DriveInterface 与 CollectorInterface 两个接口:

php
namespace Viswoole\Log\Contract;

interface DriveInterface
{
    /** Save a batch of log records. */
    public function save(array $logRecords): void;

    /** Write a log entry directly (bypassing the cache). */
    public function write(string $level, string|\Stringable $message, array $context = []): void;

    /** Cache a log entry (saved in batch when the coroutine ends). */
    public function record(string $level, string|\Stringable $message, array $context = []): void;
}

/**
 * Level-specific logging entry points.
 *
 * \Stringable is fully qualified so it resolves to the global interface from
 * inside the namespace, matching the sibling DriveInterface signatures.
 */
interface CollectorInterface
{
    public function emergency(\Stringable|string $message, array $context = []): void;
    public function alert(\Stringable|string $message, array $context = []): void;
    public function critical(\Stringable|string $message, array $context = []): void;
    public function error(\Stringable|string $message, array $context = []): void;
    public function warning(\Stringable|string $message, array $context = []): void;
    public function notice(\Stringable|string $message, array $context = []): void;
    public function info(\Stringable|string $message, array $context = []): void;
    public function debug(\Stringable|string $message, array $context = []): void;
    public function sql(\Stringable|string $message, array $context = []): void;
    public function task(\Stringable|string $message, array $context = []): void;

    /** Log a message at an arbitrary, caller-supplied level. */
    public function mixed(string $level, \Stringable|string $message, array $context = []): void;
}

示例:Elasticsearch 日志驱动

php
namespace App\Log;

use Viswoole\Log\Collector;
use Viswoole\Log\Contract\DriveInterface;

/**
 * Example log driver that ships log records to Elasticsearch via the bulk API.
 */
class ElasticsearchDrive extends Collector implements DriveInterface
{
    private \Elasticsearch\Client $client;

    /** Name of the index that receives the log documents. */
    private string $index;

    /**
     * @param array  $hosts Elasticsearch node URLs.
     * @param string $index Target index name; defaults to the previously
     *                      hard-coded 'app_logs', so existing callers are unaffected.
     */
    public function __construct(array $hosts = ['http://localhost:9200'], string $index = 'app_logs')
    {
        $this->index = $index;
        $this->client = \Elasticsearch\ClientBuilder::create()
            ->setHosts($hosts)
            ->build();
    }

    /**
     * Send a batch of log records in a single bulk request.
     *
     * Each record is read through its 'timestamp', 'level', 'message',
     * 'context' and 'source' keys. An empty batch is a no-op.
     */
    public function save(array $logRecords): void
    {
        if (empty($logRecords)) {
            return;
        }

        $params = ['body' => []];

        foreach ($logRecords as $record) {
            // The bulk body alternates an action line with its document line.
            $params['body'][] = ['index' => ['_index' => $this->index]];
            $params['body'][] = [
                '@timestamp' => date('c', $record['timestamp']),
                'level' => $record['level'],
                'message' => $record['message'],
                'context' => $record['context'],
                'source' => $record['source'],
            ];
        }

        $this->client->bulk($params);
    }

    /** Write one entry immediately by saving a single-record batch. */
    public function write(string $level, string|\Stringable $message, array $context = []): void
    {
        $this->save([\Viswoole\Log\LogManager::createLogData($level, $message, $context)]);
    }

    /** Hand the entry to mixed(), which is not defined in this class. */
    public function record(string $level, string|\Stringable $message, array $context = []): void
    {
        $this->mixed($level, $message, $context);
    }
}

注册自定义日志驱动

php
// config/log.php
'channels' => [
    'elasticsearch' => [
        'driver' => \App\Log\ElasticsearchDrive::class,
        // 'hosts' matches the ElasticsearchDrive constructor parameter name.
        'options' => [
            'hosts' => ['http://es-node1:9200', 'http://es-node2:9200'],
        ]
    ],

    // Write error logs to both ES and file
    // NOTE(review): the sample mapping below names only 'elasticsearch' — confirm the intended channels.
    // 'type_channel' => ['error' => 'elasticsearch'],
],

三、数据库驱动扩展

连接通道扩展

框架的数据库模块支持通过 Channel 机制扩展不同类型的数据库连接:

php
namespace App\Database\Channel;

use Viswoole\Database\Channel\PDO\PDOConfig;
use Viswoole\Core\Channel\Contract\ConnectionPoolInterface;

/**
 * Example ClickHouse connection channel (simplified; demonstrates the core methods only).
 *
 * Note: ConnectionPoolInterface defines 10 methods:
 *   get, pop, put, isEmpty, close, isFull, fill, length, stats, getConfig
 * See the framework's built-in PDOChannel for a complete implementation.
 */
class ClickHouseChannel implements ConnectionPoolInterface
{
    private PDOConfig $config;

    /** Idle connections currently held by the pool. */
    private array $pool = [];

    public function __construct(PDOConfig $config)
    {
        $this->config = $config;
    }

    /** Alias of pop(). */
    public function get(float $timeout = -1): mixed
    {
        return $this->pop($timeout);
    }

    /**
     * Take a connection from the pool.
     *
     * A `mixed` return type requires an explicit return, so this placeholder
     * hands back the most recently returned idle connection, or null when the
     * pool is empty. A real implementation would create a ClickHouse
     * connection (or wait up to $timeout) instead of returning null.
     */
    public function pop(float $timeout = -1): mixed
    {
        return array_pop($this->pool);
    }

    /** Return a connection to the pool. */
    public function put(mixed $connection): void
    {
        $this->pool[] = $connection;
    }

    /** Whether the pool currently holds no idle connection. */
    public function isEmpty(): bool
    {
        return $this->pool === [];
    }

    /**
     * Close all connections.
     *
     * This simplified version only drops the pool's references; a real
     * implementation would also disconnect each connection.
     */
    public function close(): bool
    {
        $this->pool = [];
        return true;
    }

    /** Always false: this simplified pool has no capacity limit. */
    public function isFull(): bool
    {
        return false;
    }

    /**
     * Fill the pool up to the given number of connections.
     *
     * Left as a no-op in this simplified example. The parameter is declared
     * explicitly nullable because implicit nullability via a null default is
     * deprecated as of PHP 8.4.
     */
    public function fill(?int $size = null): void
    {
        // Create connections until the pool holds $size of them.
    }

    /** Number of idle connections in the pool. */
    public function length(): int
    {
        return count($this->pool);
    }

    /** Pool statistics; consumer/producer counts are fixed at 0 in this example. */
    public function stats(): array
    {
        return [
            'consumer_num' => 0,
            'producer_num' => 0,
            'queue_num'    => $this->length(),
        ];
    }

    /** Return the configuration object given to the constructor. */
    public function getConfig(): mixed
    {
        return $this->config;
    }
}

四、通用最佳实践

继承基类 vs 实现接口

| 方式 | 适用场景 | 优点 | 缺点 |
| --- | --- | --- | --- |
| 继承基类 | 功能与内置驱动相近 | 复用公共逻辑,代码量少 | 受基类约束 |
| 实现接口 | 完全自定义行为 | 灵活度高 | 需实现全部方法 |

错误处理规范

php
/**
 * Read a value, falling back to $default when the lookup yields null or throws.
 *
 * Failures are logged as warnings rather than rethrown, so callers always
 * receive either the stored value or $default.
 */
public function get(string $key, mixed $default = null): mixed
{
    try {
        return $this->doGet($key) ?? $default;
    } catch (\Throwable $exception) {
        // Log the failure without propagating it, keeping the method's contract uniform.
        $details = [
            'key' => $key,
            'error' => $exception->getMessage(),
        ];
        Log::warning('缓存读取失败', $details);
    }

    return $default;
}

协程安全性

php
public function connect(): mixed
{
    // ✅ Correct: connections are keyed by coroutine id, so each coroutine gets its own.
    $cid = Coroutine::getCid();

    // Create the connection on first use within this coroutine, then reuse it.
    return $this->connections[$cid] ??= $this->createConnection();
}

// ❌ Wrong: a shared connection causes coroutine-safety problems
public function connect(): mixed
{
    // Created once and then returned to every caller — all coroutines share it!
    return $this->connection ??= $this->createConnection();
}

配置规范化

php
// 建议在 config 目录下创建专门的驱动配置文件
// config/drivers.php

return [
    'cache' => [
        'memcached' => [
            'driver' => \App\Cache\MemcachedDriver::class,
            // Memcached server list.
            'hosts' => [
                ['host' => '127.0.0.1', 'port' => 11211],
            ],
            // Memcached client options, keyed by \Memcached::OPT_* constants.
            'options' => [
                \Memcached::OPT_COMPRESSION => true,
                \Memcached::OPT_BINARY_PROTOCOL => true,
            ],
        ],
    ],
    'log' => [
        'elasticsearch' => [
            'driver' => \App\Log\ElasticsearchDrive::class,
            // ElasticsearchDrive expects an array of node URLs, so ES_HOSTS is
            // read as a comma-separated list, e.g. "http://es1:9200,http://es2:9200".
            'hosts' => explode(',', (string) env('ES_HOSTS', 'http://localhost:9200')),
            'index_prefix' => env('APP_NAME', 'app') . '_logs',
        ],
    ],
];