有那么一阵子,我在调试一套测试,生产上跑得溜溜的,测试环境下却炸成烟花。工作流没问题,活动都完成了,信号也收到了——可每个测试都挂了个 TimeoutError,工作流还没跟测试代码“打个招呼”就一命呜呼。
背后凶手是一只我从来没注意过的“钟”。
今天这篇文章,就来聊聊那个终于让我理解 Temporal 时间快进测试服务器的心智模型。如果你也曾被 WorkflowEnvironment.start_time_skipping() 搞晕,或者遇到过生产没问题,测试却老是超时的离谱现象,欢迎入座!
什么是 Temporal?(30秒极简介绍)
Temporal 是一个工作流编排引擎。你把业务流程写成一个“工作流”(比如一连串步骤),Temporal 保证帮你靠谱地跑完。哪怕有一步挂了,它也会自动重试,服务器崩了还能恢复进度。所有让人抓狂的东西——重试、超时、状态保存、分布式协调——它全包了。
这篇文章只需要知道一件事:你的工作流不是在普通的 Python 环境里跑的,而是“住”在 Temporal 的专属运行时里。 Temporal 负责掌控什么时候发生什么,这一点非常重要。后面一切的谜题都从这里展开。
Temporal 是时间管理员
改变我认知的关键点:Temporal 不会跳过你的代码,它只控制自己的“钟”。
我以前总觉得 Temporal 是个加速器,会让我的代码嗖嗖地快进。其实不是。Temporal 就像内核(kernel)一样,是个大管家,维护着一张“定时事件表”。
比如你的工作流里有:
await workflow.sleep(3600) # 睡1小时
Temporal 并不会让 Python 代码飞快执行,而是悄悄在自己的时间表上记一笔:
“在当前时间 + 3600秒时叫醒工作流ABC”
又比如你启动一个30秒超时的活动:
“如果活动XYZ在当前时间 + 30秒还没回,算失败”
整体超时10分钟:
“如果工作流ABC在当前时间 + 600秒还没完,直接掐灭”
无论是 sleep、超时、活动期限还是心跳检查,对 Temporal 来说本质都是“定时事件”:到点了,干某事。区别只是干什么——恢复、失败、杀掉——机制完全一样,就是“时间戳+动作”。
这,就是理解时间快进的基础。
时间快进:无聊时就快进
Temporal 的 Python SDK 有两种测试环境:
WorkflowEnvironment.start_time_skipping()—— 下载一个超轻量的 Rust 测试服务器(二进制直接跑,无需 Docker)。这个服务器有一块“虚拟钟表”,能随时快进。WorkflowEnvironment.start_local()—— 跑一个完整的 Temporal 服务,和生产一模一样,时钟也是实打实的。
时间快进服务器的聪明之处在于:只要它发现啥也没在发生,它就会把自己的时钟跳到下一个事件。
比如你的流程是:
await workflow.sleep(3600) # 睡1小时
await do_some_activity()
await workflow.sleep(7200) # 睡2小时
生产上,光这俩 sleep 就得熬3小时。时间快进模式下是这样:
- 调用
sleep(3600),时间表登记:“3600秒后恢复”。 - 发现没别的事干,咻——时钟直接跳到3600秒后。
- 定时器响了,继续执行,开始活动。
- 活动在跑——这时服务器不能快进,得等真实的结果。
- 活动完了,再来个
sleep(7200),时间表又记一笔。 - 没事干?再咻——时钟跳7200秒。
- 定时器响,流程结束。
整个流程花的真实时间,只取决于活动(比如 mock 活动可能只耗几毫秒)。那3小时的 sleep?一秒没浪费,直接“快进到片尾”。
这有个非常关键的结论:
如果你的代码里从不 sleep、不等、不设超时,时间快进模式根本不会提速。
想想看:时间快进只会跳过“无聊时间”——即 Temporal 的时钟处于空闲时。如果整个流程全是计算和逻辑,没有任何等待,时间快进就一点用没有。
当然,现实里的工作流总有各种等待、重试、sleep。关键在于:时间快进不是让代码更快,而是消灭事件之间的“等待时间”。
Handle:你的遥控器
每当你启动一个工作流,Temporal 会给你返回个“handle”。你可以把它想象成餐厅的取餐小票——下单后这张票就是你和厨房沟通的唯一渠道。
handle = await client.start_workflow(
MyWorkflow.run,
inputs,
id="order-123",
task_queue="kitchen",
)
这会让工作流在 Temporal 里独立运行。handle 就是你的遥控器,有几个常用“按钮”:
handle.result()—— “等我饭好通知我。” 你一直等着,直到结果出来。handle.query()—— “我的饭现在咋样啦?” 问一句,立马得到当前状态。handle.signal()—— “再加点芝士!” 给正在跑的工作流发个消息,立刻送达。handle.cancel()—— “不吃了,取消订单。”
handle 其实就是 temporalio.client.WorkflowHandle,没啥神秘的。关键是你点哪个“按钮”,Temporal 的时间快进服务器会有不同的反应。
result() vs query() — 你掉的那个坑
这里是我理解彻底反转的地方——也是让我卡壳最久的关键。
handle.result() 就像打电话:“我等着,饭好了一定通知我!” 你在那儿死等。更关键的是,你等的方式会让时间快进服务器理解为:“用户在等结果,我得帮忙快进到流程结束。”
handle.query() 则是打个电话:“现在饭好了吗?没好啊?那挂了哈!” 一次性问答。服务器没理由快进——你没让它等啥。
这里的坑在于:result() 是事件驱动(等“完成”事件),query() 是轮询(问一下返回当前状态)。 按常理,事件驱动总是“更好”,但在时间快进模式下,轮询反而更安全,因为它不会授权 Temporal 快进时钟。
| 操作 | Temporal 理解 | 是否触发快进 |
|---|---|---|
handle.result() | “等我做好叫你” | 是(服务会快进到结果) |
handle.query() | “现在状态咋样?” | 否(只回答,不快进) |
handle.signal() | “传个消息” | 否(立刻送达) |
中间那一列就是全部真相。result() 给了服务器快进的许可,query() 没给。
竞态条件:死于时间旅行
拼图已经齐了,下面说说这些原理怎么凑一起,制造了一个让人抓狂的 bug。
场景设定
我们的工作流有个环节需要用户反馈。每跑完一步(比如分析文档),工作流会暂停,等用户点“继续”。生产环境下,这可能要等几分钟、几小时,用户思考、编辑、喝咖啡。
代码里等待是这样的:
# backend/src/genai/temporal/workflow_executor.py, line 149
await workflow.wait_condition(lambda: waiter.signal_received)
意思就是:“等到 signal_received 变 True 再继续。” 没有超时,爱等多久等多久。生产上OK,用户点了继续,信号发到,流程继续跑。
考试监考的比喻
想象你是监考老师,考试2小时,墙上有钟。
生产环境(真钟): 学生举手:“老师,我要去拿计算器。” 你等着,计算器送到,学生继续考试,钟上显示45分钟过去,一切正常。
时间快进(魔法钟): 学生举手:“我要拿计算器。” 你环顾教室,没人写字,没动静,于是咻地一下把钟拨到1小时、1.5小时、2小时——“时间到,交卷!” 学生还没回来,考试挂了。
那边计算器刚进门,钟已经跳到2小时,晚了。
真正发生了什么
来看测试里的实际时间线:
真实时间 0.00s —— 测试启动工作流。Mock 活动瞬间完成(没真I/O)。
真实时间 ~0.02s —— 所有活动结束,工作流进入 wait_condition(),等反馈信号。这时 Temporal 的“时间表”里啥也没在等:没活动、没定时器,只有个等信号的工作流。
真实时间 ~0.02s —— 测试调用了 handle.result():“等流程结束告诉我。”
时间快进服务器一听:“用户等结果?查查时间表……啥都没挂着,只剩下600秒后的整体超时。那我直接快进600秒吧!”
服务器时钟跳跃: 0s → 600s。
工作流超时,挂了,报 TimeoutError。
真实时间 ~0.03s —— handle.result() 报错返回。
这时,测试里本来有个轮询循环,计划 asyncio.sleep(0.1) 后去 poll 状态、发信号。但这会儿,工作流尸体早凉了,信号只能送给“亡灵”。
整个过程真实时间不到30毫秒,服务器一键“快进到大结局”。
解决方法
明白原理后,解决办法反而很朴素。别在等信号时调用 handle.result()。改用 handle.query() 轮询进度,根据需要发信号,只有确认流程完了再调 result():
# backend/tests/integration/genai/test_temporal_workflow.py, lines 49-77
async def _run_workflow_with_feedback(handle):
while True:
await asyncio.sleep(0.1) # 测试进程的真实 sleep
# query() 不触发快进,时钟保持原地
progress = await handle.query(JurorAnalysisWorkflow.get_progress)
# 给所有等待反馈的步骤发信号
for step_id in progress.pending_feedback_steps:
await handle.signal(
JurorAnalysisWorkflow.submit_step_feedback,
StepFeedbackSignal(release_step_id=step_id),
)
# 确认流程真正结束后再调 result
if progress.status == WorkflowStatus.COMPLETED:
return await handle.result() # 这时已经完事,result 立刻返回
原理很简单:
query()不动时钟,工作流静止在wait_condition时刻,服务器不快进。- 轮询循环用真实时间跑,
asyncio.sleep(0.1)是测试进程自己的等待,不受 Temporal 支配。 - 信号无视时钟,立刻送达。
- 只有确定流程完了才调
result(),这时无需再快进,result 也会瞬间返回。
只要你不给服务器快进的机会,工作流的时钟就老老实实等着。
总结:几条硬核法则
心智模型:
- Temporal 是时间管理员,不是加速器。它维护一张“定时事件表”。
- 时间快进只“加速”空闲时,无事可跳就不跳。
- 没有定时器、超时、sleep,时间快进也无用武之地。
实操守则:
- 千万别在等信号时
await handle.result(),否则 Temporal 会直接快进到流程超时,信号都来不及送到。 - 用
handle.query()轮询工作流状态,查询是被动的,不会影响时钟。 - 信号任何时候都能送达,不受时钟影响。
- 只有确认流程结束,再调
handle.result()。
选哪个测试环境:
start_time_skipping()—— 适合自闭型工作流(只有定时器、活动,没有外部交互)。或者依赖信号的流程,但用轮询+信号模式也OK。start_local()—— 需要流程中途信号/查询的,建议用这个,所有等待都是真实时间,但会慢一些。
我们最终还是选了 start_time_skipping(),因为轮询+信号模式能完美兼容,还能免费享受所有 “sleep/重试/抖动” 的快进。
声明:本文由人类创作,部分内容参考 AI 建议,最终拍板归作者。
