Skip to content

DataPool

DataPool 是 Claw-R1 的数据管理核心 — 一个 Ray Actor,承担着 Agent 交互数据的存储、索引、质量追踪、分区管理和按需供给。它不仅是 Agent 侧与 Training 侧之间的缓冲区,更是整个数据基础设施的中枢。

在架构中的角色

Gateway ──► DataPool.submit_steps()     (数据采集:异步写入)
Trainer ◄── DataPool.fetch_batch()      (数据供给:阻塞拉取就绪组)
            DataPool.get_statistics()   (数据监控:实时统计)

DataPool 完全解耦了数据采集速度(由 Agent 请求频率驱动)和数据消费速度(由训练吞吐量驱动)。双方互不等待。

Channel 系统(数据分区)

DataPool 通过 channel 对数据进行分区管理。默认 channel 为 "train",验证流程使用 "val" channel 以隔离数据。

# 训练数据
data_pool.submit_step(step, channel="train")

# 验证数据
data_pool.submit_step(step, channel="val")

每个 channel 拥有独立的存储、索引和 FIFO 队列。

数据模型

DataPool 以 step 粒度 存储 trajectory。每个 step 是一个 (s, a, r) 元组:

@dataclass
class Step:
    prompt_ids:     list[int]   # state: 完整上下文 token IDs
    response_ids:   list[int]   # action: LLM 生成的 token IDs
    reward:         float       # 该 step 的即时 reward
    trajectory_uid: str         # 同一对话中的 step 共享此 ID
    prompt_uid:     str         # 同一 prompt 的 rollout 共享此 ID(用于 GRPO)
    step_index:     int         # trajectory 内的位置(0-indexed)
    policy_version: int         # 生成该 step 时的策略版本
    is_last:        bool        # 是否为 trajectory 的最后一个 step
    metadata:       dict        # 辅助数据(数据集字段、来源信息等)

内部索引

索引 类型 用途
trajectory_index dict[str, list[int]] trajectory_uid → step 索引列表
trajectory_complete dict[str, bool] 追踪 trajectory 是否已收到 is_last step
prompt_groups dict[str, PromptGroup] prompt_uid → trajectory 列表和完成状态

Producer API

submit_step(step: Step, channel="train")

添加单个 step 到指定 channel。由 Gateway 通过 Ray RPC 调用。

submit_steps(steps: list[Step], channel="train")

批量提交多个 step。比循环调用 submit_step 更高效。

complete_trajectory(trajectory_uid, reward=None, channel="train")

标记一条 trajectory 完成。用于黑盒模式,Agent 通过 Gateway 的 v1/complete_trajectory 端点触发。

Consumer API

fetch_batch(n_rollouts, channel="train") → list[Step] | None

FIFO 拉取下一个就绪的 prompt_uid 组。一个组在所有 trajectory 都收到 is_last step 后变为"就绪"。

当没有完整组可用时返回 None

# Trainer 侧
while True:
    batch = await data_pool.fetch_batch.remote(n_rollouts=5)
    if batch is not None:
        train_on_batch(batch)

容量管理与背压控制

当设置 max_queue_size 时,DataPool 在队列满时自动丢弃最旧的就绪组,防止数据堆积导致内存无限增长。这种背压机制也确保了训练侧消费的数据尽可能新鲜:

async_training:
  max_queue_size: null   # null = 无限

Training Backend(数据供给适配)

DataPool 通过可插拔的 TrainingBackendlist[Step] 转换为任意训练引擎的原生格式,实现数据管理与训练框架的解耦:

class VerlBackend(TrainingBackend):
    """将 Step 列表转换为 verl DataProto。"""

    def convert(self, steps: list[Step]) -> DataProto:
        # prompt_ids: 左填充到 prompt_length
        # response_ids: 右填充到 response_length
        # input_ids: [prompt_ids | response_ids]
        # attention_mask, position_ids, response_mask 等
        ...

Off-policy 支持(数据新鲜度管控)

每个 Step 都记录了生成时的 policy_version,DataPool 和 Trainer 可以据此判断数据的新鲜度。Trainer 通过 staleness threshold 配置来处理历史(off-policy)数据:

async_training:
  staleness_threshold: 0.1   # policy_version 滞后 > threshold 的 step 为 off-policy

Off-policy step 仍包含在 batch 中,但在 loss 计算时通过 importance sampling 进行降权。