异步任务管理

Viswoole 的异步任务系统基于 Swoole Task 机制封装,提供轻量级但功能完整的任务管理能力,支持任务注册、投递、队列持久化和结果等待。

核心组件

TaskManager - 任务管理器

负责任务的注册、投递和分发处理。

TaskProxy - 任务代理

封装 Swoole 原生 Task 对象,提供类型安全的任务数据访问和生命周期控制。

Task 门面

提供静态调用入口:

php
use Viswoole\Core\Facade\Task;

前置配置

使用任务系统前,必须在 config/server.php 中启用 Task 相关选项:

php
// config/server.php
use Swoole\Constant;

return [
    'servers' => [
        'http' => [
            'options' => [
                // 必须开启以下任一选项
                Constant::OPTION_TASK_WORKER_NUM => swoole_cpu_num(),  // 配置 Task Worker 数量
                Constant::OPTION_TASK_ENABLE_COROUTINE => true,         // 启用任务协程
            ]
        ]
    ],
];

注意: OPTION_TASK_USE_OBJECT => trueOPTION_TASK_ENABLE_COROUTINE => true 必须至少开启一个。

任务注册

注册单个任务(回调方式)

php
use Viswoole\Core\Facade\Task;
use Viswoole\Core\Server\TaskProxy;

// 在服务启动前注册(如 bootstrap 或 Provider 中)
Task::register('sendEmail', function (TaskProxy $task, $server) {
    $data = $task->data;

    // 执行发送邮件逻辑...
    $result = Mailer::send($data['to'], $data['subject'], $data['body']);

    // 标记任务完成并返回结果
    $task->finish([
        'success' => true,
        'message_id' => $result->getId(),
    ]);
});

注册类批量任务

传入类名时,框架会自动扫描该类的所有公开方法(魔术方法除外),以 “主题.方法名” 格式批量注册:

php
class SmsService
{
    /**
     * 发送登录验证码
     */
    public static function sendLoginCode(TaskProxy $task): void
    {
        $phone = $task->data['phone'];
        $code = $task->data['code'];

        SmsSender::send($phone, "您的验证码是:{$code},5分钟内有效。");

        $task->finish(['status' => 'ok']);
    }

    /**
     * 发送注册验证码
     */
    public static function sendRegisterCode(TaskProxy $task): void
    {
        // ...
        $task->finish(['status' => 'ok']);
    }

    /**
     * 发送营销短信
     */
    public function sendMarketing(TaskProxy $task): void
    {
        // 实例方法同样支持
        $task->finish(['status' => 'ok']);
    }
}

// 注册后自动生成以下主题:
// sms.sendLoginCode
// sms.sendRegisterCode
// sms.sendMarketing
Task::register('sms', SmsService::class);

注册规则:

  • 静态方法:注册为 类名前缀.方法名,回调为 ClassName::methodName
  • 实例方法:注册为 类名前缀.方法名,回调为 [ClassName, 'methodName']
  • __ 开头的魔术方法会被过滤

任务投递

emit() - 异步投递(非阻塞)

最常用的任务投递方式,立即返回不等待结果:

php
use Viswoole\Core\Facade\Task;

// 基本投递
$taskId = Task::emit('sendEmail', [
    'to' => 'user@example.com',
    'subject' => '欢迎注册',
    'body' => '感谢您注册我们的服务...',
]);

if ($taskId !== false) {
    echo "任务已投递,ID: {$taskId}";
}

// 不持久化到队列(服务重启后丢失)
Task::emit('cleanup', ['type' => 'temp'], queue: false);

参数说明:

参数类型说明
$topicstring已注册的任务主题名称
$datamixed传递给处理器的业务数据
$queuebool是否持久化到队列(默认 true

返回值:

  • 成功: 返回任务 ID (int)
  • 失败: 返回 false

emitWait() - 同步等待(阻塞)

投递任务并阻塞等待执行结果:

php
try {
    $result = Task::emitWait('generateReport', [
        'user_id' => 100,
        'type' => 'monthly',
    ], timeout: 5.0);

    if ($result !== false) {
        echo "报告生成完成: " . $result;
    } else {
        echo "任务执行失败或超时";
    }
} catch (\InvalidArgumentException $e) {
    echo "任务主题不存在: " . $e->getMessage();
}

emitsWait() - 并发多任务等待

同时投递多个任务并等待全部完成:

php
// 普通并发模式(taskWaitMulti)
$results = Task::emitsWait([
    'getUserInfo' => ['user_id' => 100],
    'getOrderList' => ['user_id' => 100],
], timeout: 3.0);

print_r($results);
// ['getUserInfo' => [...], 'getOrderList' => [...]]

// 协程并发模式(taskCo),更高效
$results = Task::emitsWait([
    'task1' => [...],
    'task2' => [...],
], timeout: 3.0, isCo: true);

TaskProxy 使用详解

访问任务数据

php
Task::register('processData', function (TaskProxy $task) {
    // 获取业务数据
    $data = $task->data;           // mixed - 投递时传入的数据

    // 获取元信息
    $topic = $task->topic;         // string - 任务主题
    $queueId = $task->queue_id;    // string|null - 队列ID(非队列任务为null)

    // 通过 __get 代理访问 SwooleTask 属性
    $taskId = $task->id;           // int - 任务ID
    $workerId = $task->worker_id;  // int - 所在Worker进程ID
    $dispatchTime = $task->dispatch_time; // float - 投递时间戳
    $flags = $task->flags;         // int - 标志位
});

标记任务完成

php
Task::register('heavyComputation', function (TaskProxy $task) {
    $input = $task->data['number'];

    // 执行耗时计算...
    $result = $this->compute($input);

    // 调用 finish 向 Worker 进程返回结果
    // 同时清理队列缓存(如果是队列任务)
    $task->finish([
        'input' => $input,
        'output' => $result,
        'duration' => microtime(true) - $task->dispatch_time,
    ]);

    // 注意:重复调用 finish 返回 false
});

数据序列化

php
// 序列化(用于跨进程传输大数据)
$packed = TaskProxy::pack($largeData);

// 反序列化
$data = TaskProxy::unpack($packed);

队列持久化

emit()$queue 参数为 true(默认值)时,任务会被持久化到缓存队列中:

php
// 投递任务并持久化
$taskId = Task::emit('importantJob', $data);

// 此时任务信息已写入缓存,包含:
// - taskData(原始数据)
// - topic(主题)
// - queueId(唯一标识)
// - dispatchTime(投递时间)

服务重启恢复:

框架会在 WorkerStart 事件中自动检测未完成的队列任务并重新投递:

php
// 内部实现(无需手动编写)
ServerEventHook::addEvent('workerStart', function ($server, $workerId) {
    if (!$server->taskworker) {
        $store = $this->getQueueCacheStore((string)$workerId);
        $taskQueue = $store->get();

        foreach ($taskQueue as $queueId) {
            go(function () use ($queueId, $store) {
                $taskData = $this->cache->get($queueId);
                if ($taskData) {
                    Server::getServer()->task($taskData);
                }
            });
        }
    }
});

完整示例

异步邮件发送服务

php
namespace App\Service;

use Viswoole\Core\Server\TaskProxy;
use Viswoole\Core\Facade\Task;

class EmailTaskService
{
    /**
     * 注册邮件相关任务
     */
    public static function register(): void
    {
        Task::register('email.send', [self::class, 'handleSend']);
        Task::register('email.batch', [self::class, 'handleBatch']);
    }

    /**
     * 发送单封邮件
     */
    public static function handleSend(TaskProxy $task): void
    {
        $to = $task->data['to'];
        $subject = $task->data['subject'];
        $body = $task->data['body'];

        try {
            $messageId = app(Mailer::class)->send($to, $subject, $body);
            $task->finish([
                'success' => true,
                'message_id' => $messageId,
            ]);
        } catch (\Throwable $e) {
            $task->finish([
                'success' => false,
                'error' => $e->getMessage(),
            ]);
        }
    }

    /**
     * 批量发送邮件
     */
    public static function handleBatch(TaskProxy $task): void
    {
        $recipients = $task->data['recipients']; // [['to'=>...,'subject'=>...,'body'=>...]]
        $template = $task->data['template'];

        $results = [];
        foreach ($recipients as $recipient) {
            try {
                $id = app(Mailer::class)->send(
                    $recipient['to'],
                    $recipient['subject'] ?? $template['subject'],
                    $recipient['body'] ?? $template['body']
                );
                $results[] = ['to' => $recipient['to'], 'success' => true, 'id' => $id];
            } catch (\Throwable $e) {
                $results[] = ['to' => $recipient['to'], 'success' => false, 'error' => $e->getMessage()];
            }
        }

        $task->finish(['results' => $results, 'total' => count($results)]);
    }
}

// 在控制器中使用
class UserController extends Controller
{
    public function register(Request $request)
    {
        // 验证数据...

        // 创建用户
        $user = User::create($request->validated());

        // 异步发送欢迎邮件(不阻塞响应)
        Task::emit('email.send', [
            'to' => $user->email,
            'subject' => '欢迎加入',
            'body' => view('emails.welcome', ['user' => $user]),
        ]);

        return json(['code' => 0, 'msg' => '注册成功']);
    }
}

最佳实践

1. 错误处理

php
Task::register('riskyOperation', function (TaskProxy $task) {
    try {
        $result = doSomethingRisky($task->data);
        $task->finish(['ok' => true, 'data' => $result]);
    } catch (\Throwable $e) {
        // 确保始终调用 finish,避免 Worker 端永久阻塞
        $task->finish([
            'ok' => false,
            'error' => $e->getMessage(),
            'trace' => $e->getTraceAsString(),
        ]);
    }
});

2. 超时控制

php
// 设置合理的超时时间
$result = Task::emitWait('slowTask', $data, timeout: 30.0);

// 对于可能长时间运行的任务,建议使用异步模式 + 回调通知
Task::emit('longRunningTask', $data);
// 任务完成后通过其他渠道(如 WebSocket、轮询 API)通知前端

3. 数据大小限制

php
// Task 数据不宜过大(建议 < 2MB),大数据请传递标识符
Task::emit('processFile', [
    'file_id' => $fileId,      // 传ID而非文件内容
    'user_id' => $userId,
]);

// 在任务处理器中根据 ID 从数据库/Redis/文件系统获取实际数据