Eggjs使用agent实现pub/sub配置同步

背景

开发项目过程中一般需要维护一个统一的配置缓存, 以及监听配置缓存变动, 在Java等语言中可以使用全局单例模式实现, 但是在Node中因为每个进程是独立的运行环境, 所以需要有个其他的解决方案, 下面来看看在Egg中如何优雅的实现配置同步

原理

Egg内置实现的进程模型分为三个角色: MasterAgentWorker, 而其中每个Worker自己的运行环境是独立的, 而且是不稳定的, 如果由每个Worker自己订阅配置和变更, 消耗会比较高, 所以Egg官方更推荐的是在Agent里处理

Master VS Agent VS Worker

当一个应用启动时,会同时启动这三类进程。

类型 进程数量 作用 稳定性 是否运行业务代码
Master 1 进程管理,进程间消息转发 非常高
Agent 1 后台运行工作(长连接客户端) 少量
Worker 一般设置为 CPU 核数 执行业务代码 一般

架构设计

sequenceDiagram participant C as ConfigManager participant R as Redis participant A as Agent participant W as Worker alt 首次启动 C -->> R: 0. 写入原始配置 R ->> A: 1. Agent启动, subscribe redis A ->> W: 2. Worker启动, 监听Agent下发事件 R ->> W: 3. Worker拉取redis最新配置 W -->> W: 4. Worker将配置写入本地 end alt 配置更新 C ->> R: 5. config修改, 写入redis C ->> R: 6. redis.publish(), 发送消息 R ->> A: 7. redis通知Agent R -->> C: 8. redis返回接收到的客户端数量 R ->> A: 9. Agent从redis获取最新配置 A ->> W: 10. Agent通知Worker更新配置 W -->> W: 11. Worker加载最新配置 end

开发

由于Egg官网提示, 需要等到Worker启动成功后才能使用进程间通信, 所以在app.js启动时候需要主动直接去Redis里获取一份当前最新配置, 同时订阅Agent下发的事件

如果在worker启动的时候通过messenger通信是能成功的, 但是会出现如下warning:
agent can't call sendTo before server started

app.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
'use strict';

class AppBootHook {
constructor(app) {
this.app = app;
console.log(`app pid: ${process.pid}`);
}
async willReady() {
this.app.messenger.on('push_config', (data) => {
this.logger.info(`Received push_config data: [${JSON.stringify(data)}]`);
this.app.customConfig = data.config;
});
this.app.customConfig = JSON.parse(await this.app.redis.get('CONFIG'));
}
async didReady() {}
async serverDidReady() {}
}

module.exports = AppBootHook;

agent.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
'use strict';

class AgentHook {
constructor(agent) {
this.agent = agent;
console.log(`agent pid: ${process.pid}`);
}
configWillLoad() {}
configDidLoad() {}
async willReady() {
await new Promise((resolve, reject) => {
const client = this.agent.redis.duplicate();
client.on('message', async (channel, message) => {
this.logger.info(`Received subscribe message, channel [${channel}] message [${message}]`);
this.agent.messenger.sendToApp('push_config', { config: JSON.parse(await this.agent.redis.get('CONFIG')) });
});
client.on('ready', () => {
client.subscribe(this.agent.config.redisConfigChannel, (err, count) => {
this.logger.info(`Subscribe channel ${this.agent.config.redisConfigChannel}, error: ${err}, count: ${count}.`);
});
this.logger.info('Redis is connected and ready use.');
resolve();
});
client.on('error', (err) => {
this.logger.error(`Redis is error. Error: ${err}`);
reject(err);
});
});
}
async didReady() {}
async serverDidReady() {}
}

module.exports = AgentHook;

Eggjs使用agent实现pub/sub配置同步

https://mosby-zhou.github.io/2018/12-26-egg-agent-redis-sub/

作者

Mosby

发布于

2018-12-26

许可协议

评论