Featured image of post Temporal 时间快进:你没见过的那只“钟”
技术 软件工程 后端开发

Temporal 时间快进:你没见过的那只“钟”

Temporal的时间快进测试服务器到底怎么工作?为什么它把我的测试整崩了?还有那个终于让我豁然开朗的心智模型。

有那么一阵子,我在调试一套测试,生产上跑得溜溜的,测试环境下却炸成烟花。工作流没问题,活动都完成了,信号也收到了——可每个测试都挂了个 TimeoutError,工作流还没跟测试代码“打个招呼”就一命呜呼。

背后凶手是一只我从来没注意过的“钟”。

今天这篇文章,就来聊聊那个终于让我理解 Temporal 时间快进测试服务器的心智模型。如果你也曾被 WorkflowEnvironment.start_time_skipping() 搞晕,或者遇到过生产没问题,测试却老是超时的离谱现象,欢迎入座!


什么是 Temporal?(30秒极简介绍)

Temporal 是一个工作流编排引擎。你把业务流程写成一个“工作流”(比如一连串步骤),Temporal 保证帮你靠谱地跑完。哪怕有一步挂了,它也会自动重试,服务器崩了还能恢复进度。所有让人抓狂的东西——重试、超时、状态保存、分布式协调——它全包了。

这篇文章只需要知道一件事:你的工作流不是在普通的 Python 环境里跑的,而是“住”在 Temporal 的专属运行时里。 Temporal 负责掌控什么时候发生什么,这一点非常重要。后面一切的谜题都从这里展开。


Temporal 是时间管理员

改变我认知的关键点:Temporal 不会跳过你的代码,它只控制自己的“钟”。

我以前总觉得 Temporal 是个加速器,会让我的代码嗖嗖地快进。其实不是。Temporal 就像内核(kernel)一样,是个大管家,维护着一张“定时事件表”。

比如你的工作流里有:

await workflow.sleep(3600)  # 睡1小时

Temporal 并不会让 Python 代码飞快执行,而是悄悄在自己的时间表上记一笔:

“在当前时间 + 3600秒时叫醒工作流ABC”

又比如你启动一个30秒超时的活动:

“如果活动XYZ在当前时间 + 30秒还没回,算失败”

整体超时10分钟:

“如果工作流ABC在当前时间 + 600秒还没完,直接掐灭”

无论是 sleep、超时、活动期限还是心跳检查,对 Temporal 来说本质都是“定时事件”:到点了,干某事。区别只是干什么——恢复、失败、杀掉——机制完全一样,就是“时间戳+动作”。

这,就是理解时间快进的基础。


时间快进:无聊时就快进

Temporal 的 Python SDK 有两种测试环境:

  • WorkflowEnvironment.start_time_skipping() —— 下载一个超轻量的 Rust 测试服务器(二进制直接跑,无需 Docker)。这个服务器有一块“虚拟钟表”,能随时快进。
  • WorkflowEnvironment.start_local() —— 跑一个完整的 Temporal 服务,和生产一模一样,时钟也是实打实的。

时间快进服务器的聪明之处在于:只要它发现啥也没在发生,它就会把自己的时钟跳到下一个事件。

比如你的流程是:

await workflow.sleep(3600)   # 睡1小时
await do_some_activity()
await workflow.sleep(7200)   # 睡2小时

生产上,光这俩 sleep 就得熬3小时。时间快进模式下是这样:

  1. 调用 sleep(3600),时间表登记:“3600秒后恢复”。
  2. 发现没别的事干,咻——时钟直接跳到3600秒后。
  3. 定时器响了,继续执行,开始活动。
  4. 活动在跑——这时服务器不能快进,得等真实的结果。
  5. 活动完了,再来个 sleep(7200),时间表又记一笔。
  6. 没事干?再咻——时钟跳7200秒。
  7. 定时器响,流程结束。

整个流程花的真实时间,只取决于活动(比如 mock 活动可能只耗几毫秒)。那3小时的 sleep?一秒没浪费,直接“快进到片尾”。

这有个非常关键的结论:

如果你的代码里从不 sleep、不等、不设超时,时间快进模式根本不会提速。

想想看:时间快进只会跳过“无聊时间”——即 Temporal 的时钟处于空闲时。如果整个流程全是计算和逻辑,没有任何等待,时间快进就一点用没有。

当然,现实里的工作流总有各种等待、重试、sleep。关键在于:时间快进不是让代码更快,而是消灭事件之间的“等待时间”。


Handle:你的遥控器

每当你启动一个工作流,Temporal 会给你返回个“handle”。你可以把它想象成餐厅的取餐小票——下单后这张票就是你和厨房沟通的唯一渠道。

handle = await client.start_workflow(
    MyWorkflow.run,
    inputs,
    id="order-123",
    task_queue="kitchen",
)

这会让工作流在 Temporal 里独立运行。handle 就是你的遥控器,有几个常用“按钮”:

  • handle.result() —— “等我饭好通知我。” 你一直等着,直到结果出来。
  • handle.query() —— “我的饭现在咋样啦?” 问一句,立马得到当前状态。
  • handle.signal() —— “再加点芝士!” 给正在跑的工作流发个消息,立刻送达。
  • handle.cancel() —— “不吃了,取消订单。”

handle 其实就是 temporalio.client.WorkflowHandle,没啥神秘的。关键是你点哪个“按钮”,Temporal 的时间快进服务器会有不同的反应。


result() vs query() — 你掉的那个坑

这里是我理解彻底反转的地方——也是让我卡壳最久的关键。

handle.result() 就像打电话:“我等着,饭好了一定通知我!” 你在那儿死等。更关键的是,你等的方式会让时间快进服务器理解为:“用户在等结果,我得帮忙快进到流程结束。”

handle.query() 则是打个电话:“现在饭好了吗?没好啊?那挂了哈!” 一次性问答。服务器没理由快进——你没让它等啥。

这里的坑在于:result() 是事件驱动(等“完成”事件),query() 是轮询(问一下返回当前状态)。 按常理,事件驱动总是“更好”,但在时间快进模式下,轮询反而更安全,因为它不会授权 Temporal 快进时钟。

操作Temporal 理解是否触发快进
handle.result()“等我做好叫你”(服务会快进到结果)
handle.query()“现在状态咋样?”(只回答,不快进)
handle.signal()“传个消息”(立刻送达)

中间那一列就是全部真相。result() 给了服务器快进的许可,query() 没给。


竞态条件:死于时间旅行

拼图已经齐了,下面说说这些原理怎么凑一起,制造了一个让人抓狂的 bug。

场景设定

我们的工作流有个环节需要用户反馈。每跑完一步(比如分析文档),工作流会暂停,等用户点“继续”。生产环境下,这可能要等几分钟、几小时,用户思考、编辑、喝咖啡。

代码里等待是这样的:

# backend/src/genai/temporal/workflow_executor.py, line 149
await workflow.wait_condition(lambda: waiter.signal_received)

意思就是:“等到 signal_received 变 True 再继续。” 没有超时,爱等多久等多久。生产上OK,用户点了继续,信号发到,流程继续跑。

考试监考的比喻

想象你是监考老师,考试2小时,墙上有钟。

生产环境(真钟): 学生举手:“老师,我要去拿计算器。” 你等着,计算器送到,学生继续考试,钟上显示45分钟过去,一切正常。

时间快进(魔法钟): 学生举手:“我要拿计算器。” 你环顾教室,没人写字,没动静,于是咻地一下把钟拨到1小时、1.5小时、2小时——“时间到,交卷!” 学生还没回来,考试挂了。

那边计算器刚进门,钟已经跳到2小时,晚了。

真正发生了什么

来看测试里的实际时间线:

真实时间 0.00s —— 测试启动工作流。Mock 活动瞬间完成(没真I/O)。

真实时间 ~0.02s —— 所有活动结束,工作流进入 wait_condition(),等反馈信号。这时 Temporal 的“时间表”里啥也没在等:没活动、没定时器,只有个等信号的工作流。

真实时间 ~0.02s —— 测试调用了 handle.result():“等流程结束告诉我。”

时间快进服务器一听:“用户等结果?查查时间表……啥都没挂着,只剩下600秒后的整体超时。那我直接快进600秒吧!”

服务器时钟跳跃: 0s → 600s。

工作流超时,挂了,报 TimeoutError

真实时间 ~0.03s —— handle.result() 报错返回。

这时,测试里本来有个轮询循环,计划 asyncio.sleep(0.1) 后去 poll 状态、发信号。但这会儿,工作流尸体早凉了,信号只能送给“亡灵”。

整个过程真实时间不到30毫秒,服务器一键“快进到大结局”。

解决方法

明白原理后,解决办法反而很朴素。别在等信号时调用 handle.result()。改用 handle.query() 轮询进度,根据需要发信号,只有确认流程完了再调 result()

# backend/tests/integration/genai/test_temporal_workflow.py, lines 49-77
async def _run_workflow_with_feedback(handle):
    while True:
        await asyncio.sleep(0.1)    # 测试进程的真实 sleep

        # query() 不触发快进,时钟保持原地
        progress = await handle.query(JurorAnalysisWorkflow.get_progress)

        # 给所有等待反馈的步骤发信号
        for step_id in progress.pending_feedback_steps:
            await handle.signal(
                JurorAnalysisWorkflow.submit_step_feedback,
                StepFeedbackSignal(release_step_id=step_id),
            )

        # 确认流程真正结束后再调 result
        if progress.status == WorkflowStatus.COMPLETED:
            return await handle.result()  # 这时已经完事,result 立刻返回

原理很简单:

  1. query() 不动时钟,工作流静止在 wait_condition 时刻,服务器不快进。
  2. 轮询循环用真实时间跑asyncio.sleep(0.1) 是测试进程自己的等待,不受 Temporal 支配。
  3. 信号无视时钟,立刻送达。
  4. 只有确定流程完了才调 result(),这时无需再快进,result 也会瞬间返回。

只要你不给服务器快进的机会,工作流的时钟就老老实实等着。


总结:几条硬核法则

心智模型:

  • Temporal 是时间管理员,不是加速器。它维护一张“定时事件表”。
  • 时间快进只“加速”空闲时,无事可跳就不跳。
  • 没有定时器、超时、sleep,时间快进也无用武之地。

实操守则:

  • 千万别在等信号时 await handle.result(),否则 Temporal 会直接快进到流程超时,信号都来不及送到。
  • handle.query() 轮询工作流状态,查询是被动的,不会影响时钟。
  • 信号任何时候都能送达,不受时钟影响。
  • 只有确认流程结束,再调 handle.result()

选哪个测试环境:

  • start_time_skipping() —— 适合自闭型工作流(只有定时器、活动,没有外部交互)。或者依赖信号的流程,但用轮询+信号模式也OK。
  • start_local() —— 需要流程中途信号/查询的,建议用这个,所有等待都是真实时间,但会慢一些。

我们最终还是选了 start_time_skipping(),因为轮询+信号模式能完美兼容,还能免费享受所有 “sleep/重试/抖动” 的快进。


声明:本文由人类创作,部分内容参考 AI 建议,最终拍板归作者。

© 2022 - 2026 张欣耕

保留所有权利