This post looks at an AI chat platform that offers ChatGPT-like conversation. Packet capture showed that its API communicates over the gRPC-Web protocol, so I implemented a complete client in Python and exposed it through an OpenAI-compatible HTTP service.
## gRPC-Web Overview

gRPC-Web is the browser-side implementation of gRPC. It lets web applications call gRPC services directly, without an HTTP/JSON translation layer in between.
### Why gRPC-Web

Compared with a traditional REST API, gRPC-Web has the following advantages:
| Feature | REST API | gRPC-Web |
| --- | --- | --- |
| Serialization | JSON (text) | Protobuf (binary) |
| Payload size | Larger | Smaller (saves 30-50%) |
| Parse speed | Slow | Fast |
| Type safety | Weak | Strong (Protobuf schema) |
| Streaming | SSE/WebSocket | Native support |
### gRPC-Web Wire Format

A gRPC-Web request body uses a simple binary framing:
```text
+----------------------------+---------------------------+------------------------+
| Compression flag (1 byte)  | Message length (4 bytes)  | Message body (N bytes) |
| 0x00 = uncompressed        | big-endian integer        | JSON or Protobuf       |
| 0x01 = compressed          |                           |                        |
+----------------------------+---------------------------+------------------------+
```
Example: suppose the message body is `{"hello": "world"}` (18 bytes):
```text
00 00 00 00 12 7B 22 68 65 6C 6C 6F 22 3A 20 22 77 6F 72 6C 64 22 7D
^^ ^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
uncompressed   length = 18    {"hello": "world"}
```
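The framing above is easy to round-trip in plain Python. This is a minimal sketch (function names are mine, not from the site's client):

```python
import json
import struct

def encode_frame(payload: dict) -> bytes:
    """Prepend the 1-byte compression flag and 4-byte big-endian length."""
    body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
    return struct.pack(">BI", 0, len(body)) + body

def decode_frame(frame: bytes) -> dict:
    """Parse the 5-byte prefix and return the decoded JSON message body."""
    flag, length = struct.unpack(">BI", frame[:5])
    if flag != 0:
        raise ValueError("compressed frames not handled in this sketch")
    return json.loads(frame[5:5 + length].decode("utf-8"))

frame = encode_frame({"hello": "world"})
print(frame.hex())  # 00000000127b2268656c6c6f223a2022776f726c64227d
print(decode_frame(frame))
```

Note that `struct.pack(">BI", ...)` emits exactly the flag byte followed by the big-endian 32-bit length, matching the diagram.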
### gRPC-Web vs. Native gRPC
| Aspect | Native gRPC | gRPC-Web |
| --- | --- | --- |
| Transport | HTTP/2 | HTTP/1.1 or HTTP/2 |
| Browser support | Requires a proxy | Native |
| Bidirectional streaming | Supported | Server-side streaming only |
| Compression | Built-in | Manual |
## Packet Capture Walkthrough

### Step 1: Observe the Network Requests

Open the browser, visit https://www.xxx.com, log in, and open DevTools (F12):
1. Switch to the Network tab
2. Send a message in the input box
3. Watch the requests that appear
A few key requests stand out:
```text
POST /api/agent/capy.agent.v1.AgentService/CreateChatSession
POST /api/agent/capy.agent.v1.AgentService/ChatStream
```
The URL paths tell us two things:

- a gRPC service is in use (`capy.agent.v1.AgentService`)
- method names are PascalCase (`CreateChatSession`, `ChatStream`)
### Step 2: Inspect the Request Headers

Look at the Request Headers of the ChatStream request:
```text
POST /api/agent/capy.agent.v1.AgentService/ChatStream HTTP/1.1
Host: www.target.com
Content-Type: application/connect+json
connect-protocol-version: 1
oasis-appid: 10200
oasis-language: zh
oasis-platform: web
x-waf-client-type: fetch_sdk
Cookie: Oasis-Webid=xxx; Oasis-Token=xxx
```
Key findings:

- `Content-Type: application/connect+json`: the gRPC-Web (Connect) content type
- `connect-protocol-version: 1`: the Connect protocol version marker
- `oasis-appid` / `oasis-language` / `oasis-platform`: custom business headers
### Step 3: Parse the Request Body

In Chrome DevTools the request body shows up as binary:
```text
00000000: 00 00 00 00 5f 7b 22 6d 65 73 73 61 67 65 22 3a  ...._{"message":
00000010: 7b 22 63 68 61 74 53 65 73 73 69 6f 6e 49 64 22  {"chatSessionId"
00000020: 3a 22 78 78 78 22 2c 22 63 6f 6e 74 65 6e 74 22  :"xxx","content"
00000030: 3a 7b 22 75 73 65 72 4d 65 73 73 61 67 65 22 3a  :{"userMessage":
00000040: 7b 22 71 61 22 3a 7b 22 63 6f 6e 74 65 6e 74 22  {"qa":{"content"
00000050: 3a 22 e4 bd a0 e5 a5 bd 5c 6e 22 7d 7d 7d 2c 22  :"你好\n"}}},"config"
00000060: 3a 7b 22 6d 6f 64 65 6c 22 3a 22 73 74 65 70 2d  :{"model":"step-
00000070: 61 75 74 6f 22 2c 22 65 6e 61 62 6c 65 52 65 61  auto","enableRea
00000080: 73 6f 6e 69 6e 67 22 3a 74 72 75 65 7d 7d        soning":true}}
```
Parsed structure:

- `00`: compression flag (uncompressed)
- `00 00 00 5f`: message length (big-endian, = 95 bytes)
- the remainder is the JSON payload
Extracting the JSON by hand (skip the first 5 bytes):
```json
{
  "message": {
    "chatSessionId": "xxx",
    "content": {
      "userMessage": {
        "qa": { "content": "你好\n" }
      }
    }
  },
  "config": {
    "model": "step-auto",
    "enableReasoning": true
  }
}
```
### Step 4: Analyze the Response Body

The response content type is `text/event-stream`, i.e. SSE (Server-Sent Events):
```text
data: {"data":{"event":{"startEvent":{"messageId":"msg_xxx"}}}}

data: {"data":{"event":{"reasoningEvent":{"text":"用户问好..."}}}}

data: {"data":{"event":{"textEvent":{"text":"你好!我是..."}}}}

data: {"data":{"event":{"finishEvent":{}}}}
```
What the Network panel actually shows, though, is raw bytes:
```text
00000000: 00 00 00 00 5f 7b 22 64 61 74 61 22 3a 7b 22 65  ...._{"data":{"e
00000010: 76 65 6e 74 22 3a 7b 22 73 74 61 72 74 45 76 65  vent":{"startEve
00000020: 6e 74 22 3a 7b 22 6d 65 73 73 61 67 65 49 64 22  nt":{"messageId"
00000030: 3a 22 6d 73 67 5f 78 78 78 22 7d 7d 7d 7d        :"msg_xxx"}}}}
```
Each SSE event carries the same 5-byte prefix as the request.
### Step 5: Extract the Credentials

Under Application > Cookies, the key cookies are:
| Cookie | Description |
| --- | --- |
| Oasis-Token | Access token, format: `accessToken...refreshToken` |
| Oasis-Webid | Session identifier, UUID format |
Anatomy of Oasis-Token:
```text
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...<refreshToken>
^                                   ^
accessToken (JWT)                   "..." separator
```
The decoded JWT payload:
```json
{
  "sub": "user_123",
  "exp": 1740000000,
  "iat": 1739996400
}
```
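Splitting the token on the `...` separator and decoding the JWT payload can be sketched as below. The token value here is an illustrative unsigned JWT I construct on the spot, not a real capture:

```python
import base64
import json

def decode_jwt_payload(jwt: str) -> dict:
    """Decode the (unverified) payload segment of a JWT."""
    payload_b64 = jwt.split(".")[1]
    payload_b64 += "=" * (-len(payload_b64) % 4)  # restore base64 padding
    return json.loads(base64.urlsafe_b64decode(payload_b64))

def b64url(data: bytes) -> str:
    """base64url-encode without padding, as JWT segments are encoded."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()

# Build an illustrative token matching the payload shown above.
header = b64url(b'{"alg":"HS256","typ":"JWT"}')
payload = b64url(json.dumps(
    {"sub": "user_123", "exp": 1740000000, "iat": 1739996400}).encode())
oasis_token = f"{header}.{payload}.sig...refresh_token_part"

access_token = oasis_token.split("...")[0]  # part before the "..." separator
print(decode_jwt_payload(access_token)["exp"])  # 1740000000
```

Note the payload is read without verifying the signature, which is all the client needs for an expiry check.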
## API Endpoint Summary

The main API endpoints:
```text
POST https://www.target.com/api/agent/capy.agent.v1.AgentService/CreateChatSession
POST https://www.target.com/api/agent/capy.agent.v1.AgentService/ChatStream
POST https://www.target.com/api/agent/capy.agent.v1.AgentService/ListChatSessions
```
### Request Format

The request body for sending a chat message:
```json
{
  "message": {
    "chatSessionId": "xxx",
    "content": {
      "userMessage": {
        "qa": { "content": "你好\n" }
      }
    }
  },
  "config": {
    "model": "step-auto",
    "enableReasoning": true
  }
}
```
### Response Format

A streaming SSE response, each event carrying the 5-byte length prefix:
```text
[5-byte prefix]{"data":{"event":{"startEvent":{...}}}}
[5-byte prefix]{"data":{"event":{"reasoningEvent":{"text":"..."}}}}
[5-byte prefix]{"data":{"event":{"textEvent":{"text":"..."}}}}
```
## Python Client Implementation

### Building the Binary Request Body

This is the heart of the implementation: assembling the request in gRPC-Web framing.
```python
import json

def _build_grpc_web_body(payload: dict) -> bytes:
    """
    Build a gRPC-Web request body.

    Format: [compression flag (1 byte)][message length (4 bytes, big-endian)][body]
    """
    json_str = json.dumps(payload, ensure_ascii=False)
    json_bytes = json_str.encode("utf-8")
    message_length = len(json_bytes)
    return bytes([0]) + message_length.to_bytes(4, "big") + json_bytes
```
A quick check:
```python
payload = {"hello": "world"}
result = _build_grpc_web_body(payload)
print(result.hex())  # 00000000127b2268656c6c6f223a2022776f726c64227d
```
### Parsing the SSE Response Stream

Parsing the response stream is more involved, since several event types must be handled:
```python
import json

def parse_sse_line(line: bytes) -> dict:
    """
    Parse one SSE line, handling the 5-byte length prefix.

    Example input: b'\x00\x00\x00\x00\x1a{"data":{"event":{...}}}'
    """
    offset = 0
    if offset + 5 > len(line):
        raise ValueError("Invalid line format")

    # 1-byte compression flag
    if line[offset] != 0:
        raise ValueError(f"Unsupported compression flag: {line[offset]}")
    offset += 1

    # 4-byte big-endian message length
    msg_len = int.from_bytes(line[offset:offset + 4], "big")
    offset += 4
    if offset + msg_len > len(line):
        raise ValueError("Message length mismatch")

    json_bytes = line[offset:offset + msg_len]
    return json.loads(json_bytes.decode("utf-8"))
```
Event type mapping:
| Event type | Meaning | Example data |
| --- | --- | --- |
| `startEvent` | conversation started | `{"messageId": "msg_xxx"}` |
| `reasoningEvent` | AI reasoning trace | `{"text": "用户问好..."}` |
| `textEvent` | the actual answer | `{"text": "你好!我是..."}` |
| `finishEvent` | conversation finished | `{}` |
| `errorEvent` | error information | `{"code": 500, "message": "..."}` |
### Core API Class

```python
import requests
from dataclasses import dataclass
from typing import Iterator, Optional, Dict, Union, Literal

@dataclass
class StreamChunk:
    """One chunk of the streaming response."""
    type: Literal["start", "reasoning", "content", "end", "error"]
    data: Union[str, Dict]
    message_id: Optional[str] = None

class TargetAPI:
    """API client."""

    BASE_URL = "https://www.target.com/api/agent"
    SERVICE = "capy.agent.v1.AgentService"

    def __init__(self, oasis_token: str, oasis_webid: str, oasis_appid: str = "10200"):
        self.oasis_token = oasis_token
        self.oasis_webid = oasis_webid
        self.oasis_appid = oasis_appid
        self.session = requests.Session()
        self._setup_headers()

    def _setup_headers(self):
        """Populate the default request headers."""
        self.session.headers.update({
            "User-Agent": "Mozilla/5.0 ...",
            "connect-protocol-version": "1",
            "oasis-appid": self.oasis_appid,
            "oasis-language": "zh",
            "oasis-platform": "web",
            "x-waf-client-type": "fetch_sdk",
        })
        cookie = f"Oasis-Webid={self.oasis_webid}; Oasis-Token={self.oasis_token}"
        self.session.headers["Cookie"] = cookie
```
### Streaming Chat

The complete streaming chat flow:
```python
def chat_stream(
    self,
    chat_session_id: str,
    content: str,
    model: str = "step-auto",
    enable_reasoning: bool = True,
) -> Iterator[StreamChunk]:
    """
    Streaming chat.

    Flow:
    1. Build the gRPC-Web request body
    2. Send the POST request
    3. Parse the SSE response line by line
    4. Yield a StreamChunk per event type
    """
    url = f"{self.BASE_URL}/{self.SERVICE}/ChatStream"
    headers = {**self.session.headers, "content-type": "application/connect+json"}

    payload = {
        "message": {
            "chatSessionId": chat_session_id,
            "content": {
                "userMessage": {
                    "qa": {"content": content + "\n"}
                }
            }
        },
        "config": {
            "model": model,
            "enableReasoning": enable_reasoning
        }
    }

    json_bytes = json.dumps(payload, ensure_ascii=False).encode("utf-8")
    data_bytes = bytes([0]) + len(json_bytes).to_bytes(4, "big") + json_bytes

    response = self.session.post(url, headers=headers, data=data_bytes, stream=True)
    response.raise_for_status()

    for line in response.iter_lines():
        if not line:
            continue
        try:
            if len(line) > 5:
                # strip the 5-byte prefix and parse the JSON event
                data = json.loads(line[5:].decode("utf-8"))
                yield from self._process_event(data)
        except (json.JSONDecodeError, UnicodeDecodeError):
            continue
```
### Event Handler

```python
def _process_event(self, data: Dict) -> Iterator[StreamChunk]:
    """Handle one SSE event and yield the matching StreamChunk."""
    event_data = data.get("data", {})
    event = event_data.get("event", {})

    if "startEvent" in event:
        start = event["startEvent"]
        yield StreamChunk(
            type="start",
            data=start,
            message_id=start.get("messageId"),
        )

    elif "reasoningEvent" in event:
        text = event["reasoningEvent"].get("text", "")
        if text:
            yield StreamChunk(type="reasoning", data={"text": text})

    elif "message" in event:
        msg = event["message"]
        text = (
            msg.get("content", {})
            .get("assistantMessage", {})
            .get("qa", {})
            .get("content", "")
        )
        if text:
            yield StreamChunk(
                type="content",
                data={"text": text},
                message_id=msg.get("messageId"),
            )

    elif "textEvent" in event:
        text = event["textEvent"].get("text", "")
        if text:
            yield StreamChunk(type="content", data={"text": text})

    elif "doneEvent" in event or "messageDoneEvent" in event or "finishEvent" in event:
        yield StreamChunk(type="end", data={})
```
## Token Refresh

Oasis-Token expires and has to be refreshed automatically. The token format is `accessToken...refreshToken`.
### JWT Parsing

```python
import base64
import json
import logging
import time

logger = logging.getLogger(__name__)

def _check_token_expired(self) -> bool:
    """
    Check whether the token is (about to be) expired.

    JWT format: header.payload.signature; we only need the payload.
    """
    try:
        parts = self.oasis_token.split("...")
        if len(parts) < 2:
            return False

        access_token = parts[0]
        payload_b64 = access_token.split(".")[1]
        payload_b64 += "=" * (-len(payload_b64) % 4)  # restore base64 padding

        payload_bytes = base64.urlsafe_b64decode(payload_b64)
        payload = json.loads(payload_bytes.decode("utf-8"))

        exp = payload.get("exp", 0)
        current_time = int(time.time())
        # treat the token as expired 60 seconds ahead of time
        is_expired = exp < current_time + 60
        if is_expired:
            logger.warning(f"Token about to expire: exp={exp}, current={current_time}")
        return is_expired
    except Exception as e:
        logger.error(f"Token expiry check failed: {e}")
        return False
```
Example JWT payload:
```json
{
  "sub": "user_123456",
  "exp": 1740585600,
  "iat": 1740582000,
  "iss": "target.com"
}
```
### Token Refresh Request

```python
def _refresh_token(self) -> None:
    """
    Refresh the token.

    Calls the refresh endpoint to obtain a new accessToken and refreshToken.
    """
    try:
        logger.info("Refreshing token...")
        url = "https://www.target.com/passport/proto.api.passport.v1.PassportService/RefreshToken"
        headers = {
            **self.session.headers,
            "content-type": "application/json",
        }

        response = requests.post(url, headers=headers, json={})
        response.raise_for_status()
        data = response.json()

        access_token = data.get("accessToken", {}).get("raw", "")
        refresh_token = data.get("refreshToken", {}).get("raw", "")
        if not access_token or not refresh_token:
            raise ValueError("Unexpected refresh-token response format")

        new_token = f"{access_token}...{refresh_token}"
        logger.info("Token refreshed successfully")

        self.oasis_token = new_token
        self._setup_headers()
        if self.on_token_refresh:
            self.on_token_refresh(new_token)
    except Exception as e:
        logger.error(f"Token refresh failed: {e}")
        raise
```
### Automatic Refresh

Check token validity before every API call:
```python
def _ensure_token_valid(self) -> None:
    """Ensure the token is valid; refresh it if it is about to expire."""
    if self._check_token_expired():
        self._refresh_token()

def chat_stream(self, chat_session_id: str, content: str, model: str = "step-auto"):
    """Streaming chat: validate the token before the call."""
    self._ensure_token_valid()
    url = f"{self.BASE_URL}/{self.SERVICE}/ChatStream"
```
## OpenAI-Compatible Interface

For easy integration, the client also exposes an OpenAI-compatible interface:
```python
from dataclasses import dataclass
from typing import List

@dataclass
class OpenAIResponse:
    id: str
    object: str
    created: int
    model: str
    choices: List

class OpenAICompatibleAPI:
    def create_chat_completion(self, model, messages, stream=False):
        """OpenAI-compatible chat completion."""
        chat_session_id = self._get_or_create_session()
        content = messages[-1]["content"]

        if stream:
            return self._chat_completion_stream(chat_session_id, content, model)
        else:
            return self._chat_completion_sync(chat_session_id, content, model)
```
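The streaming side of the adapter has to emit chunks in the OpenAI `chat.completion.chunk` shape. A minimal sketch of that mapping (the function name and the `finish` flag are mine; the field names follow OpenAI's documented streaming format):

```python
import json
import time
import uuid

def to_openai_stream_chunk(text: str, model: str, finish: bool = False) -> dict:
    """Map one content delta onto the OpenAI chat.completions streaming shape."""
    return {
        "id": f"chatcmpl-{uuid.uuid4().hex[:12]}",
        "object": "chat.completion.chunk",
        "created": int(time.time()),
        "model": model,
        "choices": [{
            "index": 0,
            # the final chunk carries an empty delta and finish_reason="stop"
            "delta": {} if finish else {"content": text},
            "finish_reason": "stop" if finish else None,
        }],
    }

chunk = to_openai_stream_chunk("你好!", "step-auto")
print(f"data: {json.dumps(chunk, ensure_ascii=False)}\n")
```

Each such dict is then serialized as one `data: ...` SSE line by the server below.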
## HTTP API Server

An HTTP service built on FastAPI:
```python
import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI(title="Target API Server")

@app.post("/v1/chat/completions")
async def create_chat_completion(request: ChatCompletionRequest):
    """OpenAI-compatible chat completion endpoint."""
    if request.stream:
        async def stream_generator():
            for chunk in api_client.chat_stream(...):
                yield f"data: {json.dumps(chunk)}\n\n"
            yield "data: [DONE]\n\n"

        return StreamingResponse(stream_generator(), media_type="text/event-stream")

    response = openai_api.create_chat_completion(...)
    return openai_api.to_openai_dict(response)
```
Start the server:
```bash
python target_api_server.py
# or
uvicorn target_api_server:app --host 0.0.0.0 --port 8000
```
## Usage Examples

### Using the Client Directly

```python
from target_api import TargetAPI

api = TargetAPI(
    oasis_token="your_token",
    oasis_webid="your_webid",
)

session = api.create_chat_session()

for chunk in api.chat_stream(
    chat_session_id=session.chat_session_id,
    content="你好,请介绍一下自己",
    model="step-auto",
    enable_reasoning=True,
):
    if chunk.type == "reasoning":
        print(f"[reasoning] {chunk.data['text']}", end="", flush=True)
    elif chunk.type == "content":
        print(chunk.data["text"], end="", flush=True)
    elif chunk.type == "end":
        print()
```
### Using the OpenAI SDK

```python
from openai import OpenAI

client = OpenAI(
    base_url="http://localhost:8000/v1",
    api_key="unused",
)

response = client.chat.completions.create(
    model="step-auto",
    messages=[{"role": "user", "content": "你好"}],
    stream=True,
)

for chunk in response:
    content = chunk.choices[0].delta.content
    if content:
        print(content, end="", flush=True)
```
### Using curl

```bash
# non-streaming request
curl http://localhost:8000/v1/chat/completions \
  -H "Content-Type: application/json" \
  -d '{
    "model": "step-auto",
    "messages": [{"role": "user", "content": "你好"}]
  }'

# streaming request
curl http://localhost:8000/v1/chat/completions \
  -H "Content-Type: application/json" \
  -d '{
    "model": "step-auto",
    "messages": [{"role": "user", "content": "你好"}],
    "stream": true
  }'
```