切换主题
轮询策略
异步生图
推荐节奏
- 第一次轮询:提交后约
50s - 单任务后续轮询间隔:默认固定
10s - 多任务后续轮询间隔:默认固定
10s,并优先使用POST /v1/tasks/batch-get - 如果服务端返回更大的
Retry-After / next_poll_after_ms,优先遵守更大的值 - 客户端总超时:建议
1200s
客户端交互建议
- 建议在结果页增加“手动刷新 / 手动轮询”按钮
- 自动轮询和手动轮询应共享同一套状态更新逻辑
- 超时后不要自动无限重试,应提示用户手动继续查询
- 多个进行中任务应尽量对齐到统一节拍,而不是每个任务各自起定时器
- 同一轮最多查询
100个任务;超过100时拆成多个 batch,但保持同一轮询周期
状态判断
完成态:
succeeded
失败态:
faileduncertain
处理中:
acceptedqueuedrunning
视频任务
推荐节奏
- 前几次可采用
10s -> 25s -> 25s - 之后固定
10s
超时控制
- 普通任务:
300s~600s - 高耗时任务:
600s~1200s
状态判断
兼容判断以下完成态:
completedsucceededdone
失败态:
failederrorcanceledcancelled
不推荐做法
- 每秒轮询一次
- 仅依赖
progress == 100 - 未拿到任务 ID 就开始轮询
- 在异步生图里忽略
Retry-After - 多任务场景仍然逐个任务高频
GET /v1/tasks/{id}
参考代码
python
import time
import requests
base_url = "https://async.xinbao-ai.com"
first_wait = 50
interval = 10
deadline = time.time() + 1200
pending_ids = [task_id]
time.sleep(first_wait)
while time.time() < deadline and pending_ids:
if len(pending_ids) == 1:
response = requests.get(f"{base_url}/v1/tasks/{pending_ids[0]}", headers=headers, timeout=30)
payload = response.json()
status = payload.get("status", "")
if status == "succeeded":
pending_ids = []
break
if status in {"failed", "uncertain"}:
raise RuntimeError(payload)
retry_after = response.headers.get("Retry-After")
server_wait = int(retry_after) if retry_after else 0
wait_seconds = max(interval, server_wait)
else:
response = requests.post(
f"{base_url}/v1/tasks/batch-get",
headers={**headers, "Content-Type": "application/json"},
json={"ids": pending_ids[:100]},
timeout=30,
)
payload = response.json()
next_poll_ms = payload.get("next_poll_after_ms", interval * 1000)
next_pending = []
for item in payload.get("items", []):
status = item.get("status", "")
if status in {"accepted", "queued", "running"}:
next_pending.append(item["id"])
pending_ids = next_pending
wait_seconds = max(interval, max(1, int(next_poll_ms / 1000)))
time.sleep(wait_seconds)
if pending_ids:
raise TimeoutError("polling timed out after 1200s")