WebSocket grpc调用封装

背景

最近需要开发用于聊天通讯的SDK, 服务端提供的是grpc用于androidios的通讯, 在web上, grpc-web实质上是基于中间代理服务器通过http请求转发的, 而且也由于grpc-web将协议数据解析逻辑生成了一份js文件, 随着接口代码的逐渐增多, 用于解码数据的js文件大小也会越来越大, 不太符合现在的场景

调研

web层, 常用于rpc调用的方案是websocket, 考虑了一下可行性, 客户端和服务端保持websocket长链接, 通过sendMessage来实现rpc调用和返回, 服务端再将接受到的websocket数据转换为grpc调用方法, 调用后返回结果通过websocket返回给客户端, 所以最终确定使用websocket来实现grpc的调用

开发

开发一个稳定可用的websocket-client, 需要封装好几个关键的接口: 初始化/自动重连/错误处理/超时处理/发送数据/接收通知/数据解析

初始化

直接使用constructor实现, 将用于websocket的参数传入, 同时传入一些自定义参数, 如重连配置等, 同时将websocket的事件处理函数注册好, 这里注意, 如果接收到的message并不是客户端发起的, 意味着是服务端主动调用, 需要通过EventEmitter通知客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
export type WebsocketClientOptions = {
url: string;
rpcTimeout?: number;
reconnectTimes?: number;
reconnectInterval?: number;
reconnectIntervalIncrease?: number;
keepAliveInterval?: number;
};
export type WebsocketClientAllOptions = {
url: string;
rpcTimeout: number;
reconnectTimes: number;
reconnectInterval: number;
reconnectIntervalIncrease: number;
keepAliveInterval: number;
};

const defaultOptions: WebsocketClientAllOptions = {
url: '',
rpcTimeout: 10 * 1000,
reconnectTimes: 10,
reconnectInterval: 1 * 1000,
reconnectIntervalIncrease: 1 * 1000,
keepAliveInterval: 30 * 1000,
};
export class WebsocketClient implements IwebsocketClient {
private options: WebsocketClientAllOptions;
private _socketClient: WebSocket | null;
private eventBus: EventEmitter<Event>;
private _isConnected: boolean;
private _keepAliveTimer!: number;
private _rpcId: number;
private _rpcHandler: {
[key: string]: { resolve: (value: unknown) => void; reject: (error: ServerError) => void };
};
private _lockReconnectPromise: Promise<WebSocket | null> | null;
constructor(options: WebsocketClientOptions) {
this.options = { ...defaultOptions, ...options };
if (!this.options.url) {
throw new Error(`WebsocketClient need args url`);
}
this._socketClient = null;
this.eventBus = new EventEmitter();
this._isConnected = false;
this._rpcId = 1;
this._rpcHandler = {};
this._lockReconnectPromise = null;
this.initMessageHandler();
}
private initMessageHandler(): void {
this.eventBus.on(ClientEvent.MESSAGE, (data: Data) => {
getLogger().info(`Get message, rpcId: ${data.rpcid}`);
if (data.rpcid) {
if (this._rpcHandler[data.rpcid]) {
const { resolve, reject } = this._rpcHandler[data.rpcid];
Reflect.deleteProperty(this._rpcHandler, data.rpcid);
if (data.errorcode) {
return reject(new ServerError(data.errormsg, data.errorcode));
}
const result = data.payload ? JSON.parse(b64DecodeUnicode(data.payload)) : null;
return resolve(result ? result : null);
} else {
getLogger().error(`Not found rpcHandler: ${data.rpcid}`);
}
} else {
// no rpcid means server call
this.eventBus.emit(ClientEvent.SERVER_CALL, data);
}
});
}
}

连接/重连/心跳

由于js是单线程的, 可以很轻松的实现唯一重连锁, 将重连用promise实现, 并将此promise记录下来就行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
export class WebsocketClient implements IwebsocketClient {
isConnected(): boolean {
return this._isConnected;
}
private setSocketClient(socketClient: WebSocket) {
this._socketClient = socketClient;
this.addSocketEvent(this._socketClient);
}
async connect(): Promise<void> {
if (this._isConnected) {
throw new Error('Client is connected');
}
if (!this.options.url) {
throw new Error('Not found url');
}
const socketClient = await this._connect();
if (!socketClient) {
throw new Error('Connect websocket error');
}
this._isConnected = true;
this.setSocketClient(socketClient);
this.addKeepAlive();
}
private async _connect(): Promise<WebSocket | null> {
const socket = await new Promise<WebSocket | null>((resolve) => {
const socket = new WebSocket(this.options.url);
socket.addEventListener('open', () => {
resolve(socket);
});
socket.addEventListener('error', (event) => {
getLogger().warn(`connect websocket error`, event);
resolve(null);
});
});
if (socket) {
socket.addEventListener('close', () => {
getLogger().warn(`socket is closed`);
});
}
return socket;
}
private async sleep(milliSecond: number): Promise<void> {
return new Promise((r) => setTimeout(r, milliSecond));
}
private async _reconnect(): Promise<WebSocket | null> {
let socketClient: WebSocket | null = null;
for (let i = 0; i < this.options.reconnectTimes && this._isConnected; i++) {
socketClient = await this._connect();
if (!socketClient) {
await this.sleep(this.options.reconnectInterval + this.options.reconnectIntervalIncrease * i);
} else {
break;
}
}
return socketClient;
}
private async reconnect(): Promise<void> {
getLogger().warn(`target websocket reconnet`);
if (this._isConnected && !this._lockReconnectPromise) {
this.eventBus.emit(ClientEvent.RECONNECT, ClientReconnectStatus.START);
this._lockReconnectPromise = this._reconnect();
const socketClient = await this._lockReconnectPromise;
if (!socketClient) {
this.eventBus.emit(ClientEvent.RECONNECT, ClientReconnectStatus.FAIL);
} else {
this.eventBus.emit(ClientEvent.RECONNECT, ClientReconnectStatus.SUCCESS);
this.setSocketClient(socketClient);
}
this._lockReconnectPromise = null;
} else if (this._lockReconnectPromise) {
await this._lockReconnectPromise;
return;
}
}
private addSocketEvent(socketClient: WebSocket): void {
socketClient.addEventListener('message', async (event: MessageEvent) => {
const resultUInt8Array: Uint8Array = event.data instanceof Blob ? new Uint8Array(await event.data.arrayBuffer()) : event.data;
const resultText = new TextDecoder('utf-8').decode(resultUInt8Array);

this.eventBus.emit(Event.MESSAGE, JSON.parse(resultText) as Data);
});
}
private addKeepAlive() {
this._keepAliveTimer = window.setInterval(async () => {
try {
await this.send<request, response>({ type: 'ping' });
} catch (e) {
getLogger().warn(`Keepalive error`);
}
}, this.options.keepAliveInterval);
}
}

发送消息

发送前注册本地的消息回调处理函数, 并且根据超时时间设置回调, 如果没有按时返回则直接按超时进行处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
export class WebsocketClient implements IwebsocketClient {
private sendRequest(request: Request, mainCmd: number, subCmd: number): void {
if (!this._isConnected) {
throw new Error(`Websocket not connected`);
}
if (this._socketClient?.readyState !== WebSocket.OPEN) {
this.reconnect();
throw new ClientError(`websocket state error`, ErrorCode.REQUEST_TIMEOUT);
}
if (this._isConnected && this._socketClient) {
getLogger().info(`Request, rpcId: ${request.data.rpcid}`);

this._socketClient.send(TextEncoder.encode(JSON.stringify(request)));
}
}
private registerRpcIdHandler(rpcId: string, resolve: () => void, reject: () => void) {
this._rpcHandler[rpcId] = {
resolve,
reject,
};
}
async send<Q extends {}, T>(cmd: number, payload: Q = {} as Q): Promise<T> {
if (!this._isConnected) {
throw new Error(`Websocket not connected`);
}
return await new Promise<T>((resolve, reject) => {
const rpcId = this.getRpcId();
let isTimeout = false;
const timer = setTimeout(() => {
isTimeout = true;
this.reconnect();
reject(new ClientError(`Request Timeout`, ErrorCode.REQUEST_TIMEOUT));
}, this.options.rpcTimeout);
this.registerRpcIdHandler(
rpcId,
(value?: T) => {
if (!isTimeout) {
clearTimeout(timer);
resolve(value);
}
},
(error?: ClientError) => {
if (!isTimeout) {
clearTimeout(timer);
reject(error);
}
},
);
try {
this.sendRequest({
header: {
rpcid: rpcId,
},
body: {
cmd: cmd,
payload: b64EncodeUnicode(JSON.stringify(payload)),
},
});
} catch (e) {
clearTimeout(timer);
throw e;
}
});
}
}

事件处理

这里使用ts的函数重载实现的on方法, 可以在客户端使用指定事件的时候自动加上类型判断和检查

1
2
3
4
5
6
7
8
9
export class WebsocketClient implements IwebsocketClient {
on(event: ClientEvent.SERVER_CALL, callback: ({ data }: { data: Data }) => void): () => void;
on(event: ClientEvent.SERVER_ERROR, callback: (error: ClientError) => void): () => void;
on(event: ClientEvent.RECONNECT, callback: (status: ClientReconnectStatus) => void): () => void;
on(event: ClientEvent, callback: (data: any, mainCmd?: number) => void): () => void {
this.eventBus.on(event, callback);
return () => this.eventBus.off(event, callback);
}
}

使用

开发完成这个通用的websocketclient后, 使用起来就非常方便了, 而且是ts类型全兼容的

1
2
const result = await this.client.send<RequestData, ResponseData>(data);
return result.data;

其他

二进制传输与文本传输

最开始是直接使用的纯json文本传输, 但是后端为了安全性改为了base64后的二进制传输, 但是后来发现TextEncoder在某些androidwebview内的兼容性不好, 最终还是改为base64的文本传输

proto生成类型定义.ts

服务端提供的proto能生成客户端代码, 也能生成ts解析类型, 所以在想能不能直接生成我们需要的ts的类型定义, 不需要类型解析

研究了一番之后发现是可以直接用的, 只是有点hack. 先将proto转化为js代码

1
npx -p protobufjs -c "pbjs ./**/*.proto -p ./ -t static -w commonjs --keep-case --force-number -o proto_out.js"

上面这句命令能将proto文件生成一份js解析数据格式代码, 然后再使用官方提供的命令, 将proto转化为ts文件

1
npx -p protobufjs -c "pbts proto_out.js proto_out.ts"

这两步完成后, 就可以得到一份完整的protots类型定义, 但是里面会有很多class和格式解析的代码, 这些都是我们不需要的, 需要在生成的时候去掉

研究了源码之后, 发现在protobufjs源码里去掉一部分就可以了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// change node_modules/protobufjs/cli/lib/tsd-jsdoc/publish.js
// at line: 509

// handles a class or class-like
function handleClass(element, parent) {
var is_interface = isInterface(element);
begin(element, is_interface);
if (is_interface)
write("interface ");
else {
- if (element.virtual)
- write("abstract ");
- write("class ");
+ return
}

将以上过程实现一个自动化脚本, 在脚本里将生成的proto.ts里加上格式化和eslint的部分注释, 就可以全自动根据proto生成ts类型了. 服务端再更新proto我们只需要重新跑一遍脚本, 就能获得最新的接口格式和数据定义, 也能看到具体diff

作者

Mosby

发布于

2020-07-23

许可协议

评论