异步任务管理
Viswoole 的异步任务系统基于 Swoole Task 机制封装,提供轻量级但功能完整的任务管理能力,支持任务注册、投递、队列持久化和结果等待。
核心组件
TaskManager - 任务管理器
负责任务的注册、投递和分发处理。
TaskProxy - 任务代理
封装 Swoole 原生 Task 对象,提供类型安全的任务数据访问和生命周期控制。
Task 门面
提供静态调用入口:
php
use Viswoole\Core\Facade\Task;前置配置
使用任务系统前,必须在 config/server.php 中启用 Task 相关选项:
php
// config/server.php
use Swoole\Constant;
return [
'servers' => [
'http' => [
'options' => [
// 必须开启以下任一选项
Constant::OPTION_TASK_WORKER_NUM => swoole_cpu_num(), // 配置 Task Worker 数量
Constant::OPTION_TASK_ENABLE_COROUTINE => true, // 启用任务协程
]
]
],
];注意:
OPTION_TASK_USE_OBJECT => true或OPTION_TASK_ENABLE_COROUTINE => true必须至少开启一个。
任务注册
注册单个任务(回调方式)
php
use Viswoole\Core\Facade\Task;
use Viswoole\Core\Server\TaskProxy;
// 在服务启动前注册(如 bootstrap 或 Provider 中)
Task::register('sendEmail', function (TaskProxy $task, $server) {
$data = $task->data;
// 执行发送邮件逻辑...
$result = Mailer::send($data['to'], $data['subject'], $data['body']);
// 标记任务完成并返回结果
$task->finish([
'success' => true,
'message_id' => $result->getId(),
]);
});注册类批量任务
传入类名时,框架会自动扫描该类的所有公开方法(魔术方法除外),以 “主题.方法名” 格式批量注册:
php
class SmsService
{
/**
* 发送登录验证码
*/
public static function sendLoginCode(TaskProxy $task): void
{
$phone = $task->data['phone'];
$code = $task->data['code'];
SmsSender::send($phone, "您的验证码是:{$code},5分钟内有效。");
$task->finish(['status' => 'ok']);
}
/**
* 发送注册验证码
*/
public static function sendRegisterCode(TaskProxy $task): void
{
// ...
$task->finish(['status' => 'ok']);
}
/**
* 发送营销短信
*/
public function sendMarketing(TaskProxy $task): void
{
// 实例方法同样支持
$task->finish(['status' => 'ok']);
}
}
// 注册后自动生成以下主题:
// sms.sendLoginCode
// sms.sendRegisterCode
// sms.sendMarketing
Task::register('sms', SmsService::class);注册规则:
- 静态方法:注册为
类名前缀.方法名,回调为ClassName::methodName - 实例方法:注册为
类名前缀.方法名,回调为[ClassName, 'methodName'] - 以
__开头的魔术方法会被过滤
任务投递
emit() - 异步投递(非阻塞)
最常用的任务投递方式,立即返回不等待结果:
php
use Viswoole\Core\Facade\Task;
// 基本投递
$taskId = Task::emit('sendEmail', [
'to' => 'user@example.com',
'subject' => '欢迎注册',
'body' => '感谢您注册我们的服务...',
]);
if ($taskId !== false) {
echo "任务已投递,ID: {$taskId}";
}
// 不持久化到队列(服务重启后丢失)
Task::emit('cleanup', ['type' => 'temp'], queue: false);参数说明:
| 参数 | 类型 | 说明 |
|---|---|---|
$topic | string | 已注册的任务主题名称 |
$data | mixed | 传递给处理器的业务数据 |
$queue | bool | 是否持久化到队列(默认 true) |
返回值:
- 成功: 返回任务 ID (
int) - 失败: 返回
false
emitWait() - 同步等待(阻塞)
投递任务并阻塞等待执行结果:
php
try {
$result = Task::emitWait('generateReport', [
'user_id' => 100,
'type' => 'monthly',
], timeout: 5.0);
if ($result !== false) {
echo "报告生成完成: " . $result;
} else {
echo "任务执行失败或超时";
}
} catch (\InvalidArgumentException $e) {
echo "任务主题不存在: " . $e->getMessage();
}emitsWait() - 并发多任务等待
同时投递多个任务并等待全部完成:
php
// 普通并发模式(taskWaitMulti)
$results = Task::emitsWait([
'getUserInfo' => ['user_id' => 100],
'getOrderList' => ['user_id' => 100],
], timeout: 3.0);
print_r($results);
// ['getUserInfo' => [...], 'getOrderList' => [...]]
// 协程并发模式(taskCo),更高效
$results = Task::emitsWait([
'task1' => [...],
'task2' => [...],
], timeout: 3.0, isCo: true);TaskProxy 使用详解
访问任务数据
php
Task::register('processData', function (TaskProxy $task) {
// 获取业务数据
$data = $task->data; // mixed - 投递时传入的数据
// 获取元信息
$topic = $task->topic; // string - 任务主题
$queueId = $task->queue_id; // string|null - 队列ID(非队列任务为null)
// 通过 __get 代理访问 SwooleTask 属性
$taskId = $task->id; // int - 任务ID
$workerId = $task->worker_id; // int - 所在Worker进程ID
$dispatchTime = $task->dispatch_time; // float - 投递时间戳
$flags = $task->flags; // int - 标志位
});标记任务完成
php
Task::register('heavyComputation', function (TaskProxy $task) {
$input = $task->data['number'];
// 执行耗时计算...
$result = $this->compute($input);
// 调用 finish 向 Worker 进程返回结果
// 同时清理队列缓存(如果是队列任务)
$task->finish([
'input' => $input,
'output' => $result,
'duration' => microtime(true) - $task->dispatch_time,
]);
// 注意:重复调用 finish 返回 false
});数据序列化
php
// 序列化(用于跨进程传输大数据)
$packed = TaskProxy::pack($largeData);
// 反序列化
$data = TaskProxy::unpack($packed);队列持久化
当 emit() 的 $queue 参数为 true(默认值)时,任务会被持久化到缓存队列中:
php
// 投递任务并持久化
$taskId = Task::emit('importantJob', $data);
// 此时任务信息已写入缓存,包含:
// - taskData(原始数据)
// - topic(主题)
// - queueId(唯一标识)
// - dispatchTime(投递时间)服务重启恢复:
框架会在 WorkerStart 事件中自动检测未完成的队列任务并重新投递:
php
// 内部实现(无需手动编写)
ServerEventHook::addEvent('workerStart', function ($server, $workerId) {
if (!$server->taskworker) {
$store = $this->getQueueCacheStore((string)$workerId);
$taskQueue = $store->get();
foreach ($taskQueue as $queueId) {
go(function () use ($queueId, $store) {
$taskData = $this->cache->get($queueId);
if ($taskData) {
Server::getServer()->task($taskData);
}
});
}
}
});完整示例
异步邮件发送服务
php
namespace App\Service;
use Viswoole\Core\Server\TaskProxy;
use Viswoole\Core\Facade\Task;
class EmailTaskService
{
/**
* 注册邮件相关任务
*/
public static function register(): void
{
Task::register('email.send', [self::class, 'handleSend']);
Task::register('email.batch', [self::class, 'handleBatch']);
}
/**
* 发送单封邮件
*/
public static function handleSend(TaskProxy $task): void
{
$to = $task->data['to'];
$subject = $task->data['subject'];
$body = $task->data['body'];
try {
$messageId = app(Mailer::class)->send($to, $subject, $body);
$task->finish([
'success' => true,
'message_id' => $messageId,
]);
} catch (\Throwable $e) {
$task->finish([
'success' => false,
'error' => $e->getMessage(),
]);
}
}
/**
* 批量发送邮件
*/
public static function handleBatch(TaskProxy $task): void
{
$recipients = $task->data['recipients']; // [['to'=>...,'subject'=>...,'body'=>...]]
$template = $task->data['template'];
$results = [];
foreach ($recipients as $recipient) {
try {
$id = app(Mailer::class)->send(
$recipient['to'],
$recipient['subject'] ?? $template['subject'],
$recipient['body'] ?? $template['body']
);
$results[] = ['to' => $recipient['to'], 'success' => true, 'id' => $id];
} catch (\Throwable $e) {
$results[] = ['to' => $recipient['to'], 'success' => false, 'error' => $e->getMessage()];
}
}
$task->finish(['results' => $results, 'total' => count($results)]);
}
}
// 在控制器中使用
class UserController extends Controller
{
public function register(Request $request)
{
// 验证数据...
// 创建用户
$user = User::create($request->validated());
// 异步发送欢迎邮件(不阻塞响应)
Task::emit('email.send', [
'to' => $user->email,
'subject' => '欢迎加入',
'body' => view('emails.welcome', ['user' => $user]),
]);
return json(['code' => 0, 'msg' => '注册成功']);
}
}最佳实践
1. 错误处理
php
Task::register('riskyOperation', function (TaskProxy $task) {
try {
$result = doSomethingRisky($task->data);
$task->finish(['ok' => true, 'data' => $result]);
} catch (\Throwable $e) {
// 确保始终调用 finish,避免 Worker 端永久阻塞
$task->finish([
'ok' => false,
'error' => $e->getMessage(),
'trace' => $e->getTraceAsString(),
]);
}
});2. 超时控制
php
// 设置合理的超时时间
$result = Task::emitWait('slowTask', $data, timeout: 30.0);
// 对于可能长时间运行的任务,建议使用异步模式 + 回调通知
Task::emit('longRunningTask', $data);
// 任务完成后通过其他渠道(如 WebSocket、轮询 API)通知前端3. 数据大小限制
php
// Task 数据不宜过大(建议 < 2MB),大数据请传递标识符
Task::emit('processFile', [
'file_id' => $fileId, // 传ID而非文件内容
'user_id' => $userId,
]);
// 在任务处理器中根据 ID 从数据库/Redis/文件系统获取实际数据