背景
最近需要开发用于聊天通讯的SDK
, 服务端提供的是grpc
用于android
和ios
的通讯, 在web
上, grpc-web
实质上是基于中间代理服务器通过http
请求转发的, 而且也由于grpc-web
将协议数据解析逻辑生成了一份js
文件, 随着接口代码的逐渐增多, 用于解码数据的js
文件大小也会越来越大, 不太符合现在的场景
调研
在web
层, 常用于rpc
调用的方案是websocket
, 考虑了一下可行性, 客户端和服务端保持websocket
长链接, 通过sendMessage
来实现rpc
调用和返回, 服务端再将接受到的websocket
数据转换为grpc
调用方法, 调用后返回结果通过websocket
返回给客户端, 所以最终确定使用websocket
来实现grpc
的调用
开发
开发一个稳定可用的websocket-client
, 需要封装好几个关键的接口: 初始化/自动重连/错误处理/超时处理/发送数据/接收通知/数据解析
初始化
直接使用constructor
实现, 将用于websocket
的参数传入, 同时传入一些自定义参数, 如重连配置等, 同时将websocket
的事件处理函数注册好, 这里注意, 如果接收到的message
并不是客户端发起的, 意味着是服务端主动调用, 需要通过EventEmitter
通知客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
| export type WebsocketClientOptions = { url: string; rpcTimeout?: number; reconnectTimes?: number; reconnectInterval?: number; reconnectIntervalIncrease?: number; keepAliveInterval?: number; }; export type WebsocketClientAllOptions = { url: string; rpcTimeout: number; reconnectTimes: number; reconnectInterval: number; reconnectIntervalIncrease: number; keepAliveInterval: number; };
const defaultOptions: WebsocketClientAllOptions = { url: '', rpcTimeout: 10 * 1000, reconnectTimes: 10, reconnectInterval: 1 * 1000, reconnectIntervalIncrease: 1 * 1000, keepAliveInterval: 30 * 1000, }; export class WebsocketClient implements IwebsocketClient { private options: WebsocketClientAllOptions; private _socketClient: WebSocket | null; private eventBus: EventEmitter<Event>; private _isConnected: boolean; private _keepAliveTimer!: number; private _rpcId: number; private _rpcHandler: { [key: string]: { resolve: (value: unknown) => void; reject: (error: ServerError) => void }; }; private _lockReconnectPromise: Promise<WebSocket | null> | null; constructor(options: WebsocketClientOptions) { this.options = { ...defaultOptions, ...options }; if (!this.options.url) { throw new Error(`WebsocketClient need args url`); } this._socketClient = null; this.eventBus = new EventEmitter(); this._isConnected = false; this._rpcId = 1; this._rpcHandler = {}; this._lockReconnectPromise = null; this.initMessageHandler(); } private initMessageHandler(): void { this.eventBus.on(ClientEvent.MESSAGE, (data: Data) => { getLogger().info(`Get message, rpcId: ${data.rpcid}`); if (data.rpcid) { if (this._rpcHandler[data.rpcid]) { const { resolve, reject } = this._rpcHandler[data.rpcid]; Reflect.deleteProperty(this._rpcHandler, data.rpcid); if (data.errorcode) { return reject(new ServerError(data.errormsg, data.errorcode)); } const result = data.payload ? JSON.parse(b64DecodeUnicode(data.payload)) : null; return resolve(result ? result : null); } else { getLogger().error(`Not found rpcHandler: ${data.rpcid}`); } } else { this.eventBus.emit(ClientEvent.SERVER_CALL, data); } }); } }
|
连接/重连/心跳
由于js
是单线程的, 可以很轻松的实现唯一重连锁, 将重连用promise
实现, 并将此promise
记录下来就行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
| export class WebsocketClient implements IwebsocketClient { isConnected(): boolean { return this._isConnected; } private setSocketClient(socketClient: WebSocket) { this._socketClient = socketClient; this.addSocketEvent(this._socketClient); } async connect(): Promise<void> { if (this._isConnected) { throw new Error('Client is connected'); } if (!this.options.url) { throw new Error('Not found url'); } const socketClient = await this._connect(); if (!socketClient) { throw new Error('Connect websocket error'); } this._isConnected = true; this.setSocketClient(socketClient); this.addKeepAlive(); } private async _connect(): Promise<WebSocket | null> { const socket = await new Promise<WebSocket | null>((resolve) => { const socket = new WebSocket(this.options.url); socket.addEventListener('open', () => { resolve(socket); }); socket.addEventListener('error', (event) => { getLogger().warn(`connect websocket error`, event); resolve(null); }); }); if (socket) { socket.addEventListener('close', () => { getLogger().warn(`socket is closed`); }); } return socket; } private async sleep(milliSecond: number): Promise<void> { return new Promise((r) => setTimeout(r, milliSecond)); } private async _reconnect(): Promise<WebSocket | null> { let socketClient: WebSocket | null = null; for (let i = 0; i < this.options.reconnectTimes && this._isConnected; i++) { socketClient = await this._connect(); if (!socketClient) { await this.sleep(this.options.reconnectInterval + this.options.reconnectIntervalIncrease * i); } else { break; } } return socketClient; } private async reconnect(): Promise<void> { getLogger().warn(`target websocket reconnet`); if (this._isConnected && !this._lockReconnectPromise) { this.eventBus.emit(ClientEvent.RECONNECT, ClientReconnectStatus.START); this._lockReconnectPromise = this._reconnect(); const socketClient = await this._lockReconnectPromise; if (!socketClient) { this.eventBus.emit(ClientEvent.RECONNECT, ClientReconnectStatus.FAIL); } else { this.eventBus.emit(ClientEvent.RECONNECT, ClientReconnectStatus.SUCCESS); this.setSocketClient(socketClient); } this._lockReconnectPromise = null; } else if (this._lockReconnectPromise) { await this._lockReconnectPromise; return; } } private addSocketEvent(socketClient: WebSocket): void { socketClient.addEventListener('message', async (event: MessageEvent) => { const resultUInt8Array: Uint8Array = event.data instanceof Blob ? new Uint8Array(await event.data.arrayBuffer()) : event.data; const resultText = new TextDecoder('utf-8').decode(resultUInt8Array);
this.eventBus.emit(Event.MESSAGE, JSON.parse(resultText) as Data); }); } private addKeepAlive() { this._keepAliveTimer = window.setInterval(async () => { try { await this.send<request, response>({ type: 'ping' }); } catch (e) { getLogger().warn(`Keepalive error`); } }, this.options.keepAliveInterval); } }
|
发送消息
发送前注册本地的消息回调处理函数, 并且根据超时时间设置回调, 如果没有按时返回则直接按超时进行处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| export class WebsocketClient implements IwebsocketClient { private sendRequest(request: Request, mainCmd: number, subCmd: number): void { if (!this._isConnected) { throw new Error(`Websocket not connected`); } if (this._socketClient?.readyState !== WebSocket.OPEN) { this.reconnect(); throw new ClientError(`websocket state error`, ErrorCode.REQUEST_TIMEOUT); } if (this._isConnected && this._socketClient) { getLogger().info(`Request, rpcId: ${request.data.rpcid}`);
this._socketClient.send(TextEncoder.encode(JSON.stringify(request))); } } private registerRpcIdHandler(rpcId: string, resolve: () => void, reject: () => void) { this._rpcHandler[rpcId] = { resolve, reject, }; } async send<Q extends {}, T>(cmd: number, payload: Q = {} as Q): Promise<T> { if (!this._isConnected) { throw new Error(`Websocket not connected`); } return await new Promise<T>((resolve, reject) => { const rpcId = this.getRpcId(); let isTimeout = false; const timer = setTimeout(() => { isTimeout = true; this.reconnect(); reject(new ClientError(`Request Timeout`, ErrorCode.REQUEST_TIMEOUT)); }, this.options.rpcTimeout); this.registerRpcIdHandler( rpcId, (value?: T) => { if (!isTimeout) { clearTimeout(timer); resolve(value); } }, (error?: ClientError) => { if (!isTimeout) { clearTimeout(timer); reject(error); } }, ); try { this.sendRequest({ header: { rpcid: rpcId, }, body: { cmd: cmd, payload: b64EncodeUnicode(JSON.stringify(payload)), }, }); } catch (e) { clearTimeout(timer); throw e; } }); } }
|
事件处理
这里使用ts
的函数重载实现的on
方法, 可以在客户端使用指定事件的时候自动加上类型判断和检查
1 2 3 4 5 6 7 8 9
| export class WebsocketClient implements IwebsocketClient { on(event: ClientEvent.SERVER_CALL, callback: ({ data }: { data: Data }) => void): () => void; on(event: ClientEvent.SERVER_ERROR, callback: (error: ClientError) => void): () => void; on(event: ClientEvent.RECONNECT, callback: (status: ClientReconnectStatus) => void): () => void; on(event: ClientEvent, callback: (data: any, mainCmd?: number) => void): () => void { this.eventBus.on(event, callback); return () => this.eventBus.off(event, callback); } }
|
使用
开发完成这个通用的websocketclient
后, 使用起来就非常方便了, 而且是ts
类型全兼容的
1 2
| const result = await this.client.send<RequestData, ResponseData>(data); return result.data;
|
其他
二进制传输与文本传输
最开始是直接使用的纯json
文本传输, 但是后端为了安全性改为了base64
后的二进制传输, 但是后来发现TextEncoder
在某些android
的webview
内的兼容性不好, 最终还是改为base64
的文本传输
proto
生成类型定义.ts
服务端提供的proto
能生成客户端代码, 也能生成ts
解析类型, 所以在想能不能直接生成我们需要的ts
的类型定义, 不需要类型解析
研究了一番之后发现是可以直接用的, 只是有点hack
. 先将proto
转化为js
代码
1
| npx -p protobufjs -c "pbjs ./**/*.proto -p ./ -t static -w commonjs --keep-case --force-number -o proto_out.js"
|
上面这句命令能将proto
文件生成一份js
解析数据格式代码, 然后再使用官方提供的命令, 将proto
转化为ts
文件
1
| npx -p protobufjs -c "pbts proto_out.js proto_out.ts"
|
这两步完成后, 就可以得到一份完整的proto
的ts
类型定义, 但是里面会有很多class
和格式解析的代码, 这些都是我们不需要的, 需要在生成的时候去掉
研究了源码之后, 发现在protobufjs
源码里去掉一部分就可以了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
|
function handleClass(element, parent) { var is_interface = isInterface(element); begin(element, is_interface); if (is_interface) write("interface "); else { - if (element.virtual) - write("abstract "); - write("class "); + return }
|
将以上过程实现一个自动化脚本, 在脚本里将生成的proto.ts
里加上格式化和eslint
的部分注释, 就可以全自动根据proto
生成ts
类型了. 服务端再更新proto
我们只需要重新跑一遍脚本, 就能获得最新的接口格式和数据定义, 也能看到具体diff