Skip to content

轮询策略

异步生图

推荐节奏

  • 第一次轮询:提交后约 50s
  • 单任务后续轮询间隔:默认固定 10s
  • 多任务后续轮询间隔:默认固定 10s,并优先使用 POST /v1/tasks/batch-get
  • 如果服务端返回更大的 Retry-After / next_poll_after_ms,优先遵守更大的值
  • 客户端总超时:建议 1200s

客户端交互建议

  • 建议在结果页增加“手动刷新 / 手动轮询”按钮
  • 自动轮询和手动轮询应共享同一套状态更新逻辑
  • 超时后不要自动无限重试,应提示用户手动继续查询
  • 多个进行中任务应尽量对齐到统一节拍,而不是每个任务各自起定时器
  • 同一轮最多查询 100 个任务;超过 100 时拆成多个 batch,但保持同一轮询周期

状态判断

完成态:

  • succeeded

失败态:

  • failed
  • uncertain

处理中:

  • accepted
  • queued
  • running

视频任务

推荐节奏

  • 前几次可采用 10s -> 25s -> 25s
  • 之后固定 10s

超时控制

  • 普通任务:300s~600s
  • 高耗时任务:600s~1200s

状态判断

兼容判断以下完成态:

  • completed
  • succeeded
  • done

失败态:

  • failed
  • error
  • canceled
  • cancelled

不推荐做法

  • 每秒轮询一次
  • 仅依赖 progress == 100
  • 未拿到任务 ID 就开始轮询
  • 在异步生图里忽略 Retry-After
  • 多任务场景仍然逐个任务高频 GET /v1/tasks/{id}

参考代码

python
import time
import requests

base_url = "https://async.xinbao-ai.com"
first_wait = 50
interval = 10
deadline = time.time() + 1200
pending_ids = [task_id]
time.sleep(first_wait)

while time.time() < deadline and pending_ids:
    if len(pending_ids) == 1:
        response = requests.get(f"{base_url}/v1/tasks/{pending_ids[0]}", headers=headers, timeout=30)
        payload = response.json()
        status = payload.get("status", "")
        if status == "succeeded":
            pending_ids = []
            break
        if status in {"failed", "uncertain"}:
            raise RuntimeError(payload)
        retry_after = response.headers.get("Retry-After")
        server_wait = int(retry_after) if retry_after else 0
        wait_seconds = max(interval, server_wait)
    else:
        response = requests.post(
            f"{base_url}/v1/tasks/batch-get",
            headers={**headers, "Content-Type": "application/json"},
            json={"ids": pending_ids[:100]},
            timeout=30,
        )
        payload = response.json()
        next_poll_ms = payload.get("next_poll_after_ms", interval * 1000)
        next_pending = []
        for item in payload.get("items", []):
            status = item.get("status", "")
            if status in {"accepted", "queued", "running"}:
                next_pending.append(item["id"])
        pending_ids = next_pending
        wait_seconds = max(interval, max(1, int(next_poll_ms / 1000)))

    time.sleep(wait_seconds)

if pending_ids:
    raise TimeoutError("polling timed out after 1200s")

以 VitePress 构建,由 Cloudflare Pages 发布