Agent Flow¶
Agent Flow 是 Claw-R1 中管理 Agent 执行生命周期的框架。它分为两大类:
- 白盒 Agent Flow:Agent 逻辑用 Python 编写,直接通过 Gateway 的
/generate、/submit_steps等端点交互,自行管理 tokenize 和 Step 构建。 - 黑盒 Agent Flow:Agent 使用标准 OpenAI API,通过
base_url透明接入,Gateway 自动处理 tokenize 和 Step 提交。
类层次¶
AgentFlowBase (abstract base)
│
├── SingleStepSingleTurnAgentFlow (白盒:单轮问答)
├── MultiStepAgentFlow (白盒:多轮工具调用)
│
└── BlackBoxAgentFlowBase (黑盒基类)
└── BlackBoxGSM8KAgentFlow (黑盒:GSM8K 数学题)
AgentFlowBase¶
所有 Agent Flow 的抽象基类,提供:
- Gateway URL 管理
- 配置访问(
self.config) - 抽象方法
run(sampling_params, **kwargs) -> int
白盒辅助方法¶
白盒 Agent Flow 可使用以下方法与 Gateway 交互:
gateway_generate(trajectory_uid, prompt_uid, messages, **kwargs)¶
向 Gateway /generate 发送异步 HTTP POST,返回生成文本和 token IDs。
text, response_ids, prompt_ids = await self.gateway_generate(
trajectory_uid="traj-abc",
prompt_uid="prompt-xyz",
messages=[{"role": "user", "content": "Summarize this document."}],
max_tokens=512,
temperature=0.8,
)
gateway_submit_steps(steps, channel="train")¶
向 Gateway /submit_steps 提交 Step 列表。
gateway_compute_reward(trajectory_uid, messages, dataset_fields)¶
向 Gateway /compute_reward 请求 reward 计算。
SingleStepSingleTurnAgentFlow¶
最简单的白盒实现:单个 prompt 产生单个 response。适用于每个样本都是独立问答对的数据集。
class MyAgentFlow(SingleStepSingleTurnAgentFlow):
async def run(self, sampling_params, **kwargs) -> int:
messages = [{"role": "user", "content": kwargs["raw_prompt"]}]
text, response_ids, prompt_ids = await self.gateway_generate(
trajectory_uid=kwargs["trajectory_uid"],
prompt_uid=kwargs["prompt_uid"],
messages=messages,
)
step = Step(
prompt_ids=prompt_ids,
response_ids=response_ids,
reward=0.0,
trajectory_uid=kwargs["trajectory_uid"],
prompt_uid=kwargs["prompt_uid"],
step_index=0,
is_last=True,
)
await self.gateway_submit_steps([step])
return 1
MultiStepAgentFlow¶
多轮 Agent Flow,支持工具调用、规划等场景。每轮产生一个 Step,通过 trajectory_uid 串联。
class ToolAgentFlow(MultiStepAgentFlow):
async def run(self, sampling_params, **kwargs) -> int:
messages = [{"role": "user", "content": kwargs["task"]}]
step_index = 0
while True:
text, response_ids, prompt_ids = await self.gateway_generate(...)
is_last = self.is_terminal(text)
step = Step(
prompt_ids=prompt_ids,
response_ids=response_ids,
step_index=step_index,
is_last=is_last,
...
)
await self.gateway_submit_steps([step])
if is_last:
break
messages.append({"role": "assistant", "content": text})
tool_result = await self.execute_tool(text)
messages.append({"role": "tool", "content": tool_result})
step_index += 1
return step_index + 1
BlackBoxAgentFlowBase¶
黑盒 Agent Flow 的基类。处理与 Gateway 的完整协议(init → register → complete),将 Agent 执行委托给子类的 _run_agent 方法。
详细文档见 Black-box Agent。
注册机制¶
Agent Flow 通过 @register("name") 装饰器注册到全局注册表:
from claw_r1.agent_flow.agent_flow import register
@register("my_agent_flow")
class MyAgentFlow(AgentFlowBase):
...
也可通过 YAML 配置文件注册(用于黑盒 Agent):
# agent_flow_config.yaml
- name: blackbox_gsm8k_agent
_target_: claw_r1.blackbox_agent.gsm8k_agent_flow.BlackBoxGSM8KAgentFlow
AgentFlowManager 和 AgentFlowWorker¶
- AgentFlowManager:管理多个
AgentFlowWorker,将 batch 中的每个样本分发给对应的 Agent Flow 执行。 - AgentFlowWorker:Ray Actor,持有 tokenizer 和配置,执行具体的 Agent Flow。
配置¶
在训练脚本中指定 Agent Flow: