Egg 使用迭代器实现分段下载

背景

后台管理项目中最常见的需求就是导出数据了, 在C#Java等框架中, 因为数据库请求和文件io基本都是同步操作, 所以可以简单的用while来实现数据的分段导出, 而在nodejs中, 因为异步callbackpromise的写法问题, 都会导致代码非常难写, 想起ES6里有Generator函数, 可以用它来实现下载场景

实现

Generator异步函数

如果我们需要从一个有状态的地方, 或者一个一直写入的流里去获取数据, 一直到获取完成, 那么使用Generator会是比较优雅的方案, 而从无状态的数据源获取数据, 可以很简单的使用循环来实现

模拟有状态获取数据

  • 下面方法执行后, 可以看到curllog是按时间实时输出的, 很清晰的反应了分段下载的过程
  • ctx.runInBackground是官方推荐的在后台执行任务的方法, 会自动记录异常, 这里需要用到它是为了避免阻塞中间件, 可以让日志记录中间件实时记录响应返回的状态, 而不是在整体下载完成之后才记录
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
class ApiService extends Service {
async apiGeneratorExportFile() {
const { ctx } = this;
ctx.logger.info('In apiGeneratorExportFile');
ctx.response.attachment('GeneratorExportFile.txt');
ctx.response.set('Transfer-Encoding', 'chunked');
ctx.response.set('Content-Type', 'text/plain; charset=utf-8');
ctx.response.set('X-Content-Type-Options', 'nosniff');
ctx.status = 200;
ctx.body = new stream.PassThrough();

ctx.runInBackground(async () => {
const bodyStream = ctx.body;
try {
for await (const data of this.getDataGen(bodyStream)) {
if (!bodyStream.destoryed && bodyStream.writable) {
// 加上cork是为了去掉服务端缓冲, 避免客户端打印不及时
bodyStream.cork();
bodyStream.write(data);
bodyStream.uncork();
}
}
} catch (e) {
ctx.logger.error(e);
} finally {
if (!bodyStream.destoryed && bodyStream.writable) {
bodyStream.end('');
}
}
});
}

async *getDataGen(bodyStream) {
try {
let data = 'Start';
const startTime = new Date().getTime();
while (data !== 'End' && !bodyStream.destoryed && bodyStream.writable) {
data = await this.getDataSleep(startTime);
this.ctx.logger.info(`get data ${data}`);
yield data;
}
} catch (e) {
this.ctx.logger.error(`get data fail, ${_.toString(e)}`);
yield `get data fail, ${_.toString(e)}`;
}
}

sleep(time) {
return new Promise((r) => setTimeout(r, time));
}

async getDataSleep(startTime) {
await this.sleep(_.random(3, 5, false) * 1000);
if (Math.random() < 0.05) {
throw new Error(`Random error`);
}
if (Math.random() < 0.1) {
return 'End';
}
return `Get data success at ${new Date().getTime() - startTime}ms\n`;
}
}

测试结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# 下载到一半的时候出错的情况
curl -vv http://127.0.0.1:7001/api/v1/generator-export-file
* Trying 127.0.0.1...
* TCP_NODELAY set
* Connected to 127.0.0.1 (127.0.0.1) port 7001 (#0)
> GET /api/v1/generator-export-file HTTP/1.1
> Host: 127.0.0.1:7001
> User-Agent: curl
> Accept: */*
>
< HTTP/1.1 200 OK
< Content-Type: text/plain; charset=utf-8
< Content-Disposition: attachment; filename="GeneratorExportFile.txt"
< Transfer-Encoding: chunked
< x-content-type-options: nosniff
< x-frame-options: SAMEORIGIN
< x-xss-protection: 1; mode=block
< x-download-options: noopen
< x-readtime: 11
< Connection: keep-alive
< Keep-Alive: timeout=5
<
Get data success at 4007ms
Get data success at 9016ms
Get data success at 14020ms
Get data success at 19033ms
Get data success at 24042ms
Get data success at 28045ms
Get data success at 33047ms
Get data success at 38054ms
Get data success at 41066ms
Get data success at 46077ms
get data fail, Error: Random error* Connection #0 to host 127.0.0.1 left intact

# 下载成功完成
curl -vv http://127.0.0.1:7001/api/v1/generator-export-file
* Trying 127.0.0.1...
* TCP_NODELAY set
* Connected to 127.0.0.1 (127.0.0.1) port 7001 (#0)
> GET /api/v1/generator-export-file HTTP/1.1
> Host: 127.0.0.1:7001
> User-Agent: curl
> Accept: */*
>
< HTTP/1.1 200 OK
< Content-Type: text/plain; charset=utf-8
< Content-Disposition: attachment; filename="GeneratorExportFile.txt"
< Transfer-Encoding: chunked
< x-content-type-options: nosniff
< x-frame-options: SAMEORIGIN
< x-xss-protection: 1; mode=block
< x-download-options: noopen
< x-readtime: 3
< Connection: keep-alive
< Keep-Alive: timeout=5
<
Get data success at 5003ms
Get data success at 8020ms
Get data success at 11033ms
Get data success at 16037ms
Get data success at 20044ms
Get data success at 23051ms
Get data success at 27060ms
Get data success at 30072ms
Get data success at 34087ms
Get data success at 39103ms
Get data success at 43105ms
Get data success at 48122ms
Get data success at 52129ms
Get data success at 57132ms
Get data success at 60146ms
Get data success at 63150ms
Get data success at 68165ms
Get data success at 71167ms
Get data success at 76178ms
End* Connection #0 to host 127.0.0.1 left intact

模拟无状态获取数据

  • 基本流程和有状态一样, 只是将Generator函数换成了循环调用执行
  • ctx.runInBackground同样是为了避免阻塞中间件的返回
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class ApiService extends Service {
async apiLoopExportFile() {
const { ctx } = this;
ctx.logger.info('In apiLoopExportFile');
ctx.response.attachment('LoopExportFile.txt');
ctx.response.set('Transfer-Encoding', 'chunked');
ctx.response.set('Content-Type', 'text/plain; charset=utf-8');
ctx.response.set('X-Content-Type-Options', 'nosniff');
ctx.status = 200;
ctx.body = new stream.PassThrough();

ctx.runInBackground(async () => {
const bodyStream = ctx.body;
try {
let data = 'Start';
const startTime = new Date().getTime();
bodyStream.cork();
bodyStream.write(data);
bodyStream.uncork();
do {
data = await this.getDataSleep(startTime);
if (!bodyStream.destoryed && bodyStream.writable) {
bodyStream.cork();
bodyStream.write(data);
bodyStream.uncork();
}
} while (data !== 'End');
} catch (e) {
ctx.logger.error(e);
} finally {
if (!bodyStream.destoryed && bodyStream.writable) {
bodyStream.end('');
}
}
});
}
}

测试结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# 下载到一半的时候出错的情况
curl -vv http://127.0.0.1:7001/api/v1/loop-export-file
* Trying 127.0.0.1...
* TCP_NODELAY set
* Connected to 127.0.0.1 (127.0.0.1) port 7001 (#0)
> GET /api/v1/loop-export-file HTTP/1.1
> Host: 127.0.0.1:7001
> User-Agent: curl
> Accept: */*
>
< HTTP/1.1 200 OK
< Content-Type: text/plain; charset=utf-8
< Content-Disposition: attachment; filename="LoopExportFile.txt"
< Transfer-Encoding: chunked
< x-content-type-options: nosniff
< x-frame-options: SAMEORIGIN
< x-xss-protection: 1; mode=block
< x-download-options: noopen
< x-readtime: 10
< Connection: keep-alive
< Keep-Alive: timeout=5
<
StartGet data success at 3006ms
Get data success at 7008ms
Get data success at 12021ms
Get data success at 15036ms
Get data success at 18049ms
get data fail, Error: Random error* Connection #0 to host 127.0.0.1 left intact

# 下载成功完成
curl -vv http://127.0.0.1:7001/api/v1/loop-export-file
* Trying 127.0.0.1...
* TCP_NODELAY set
* Connected to 127.0.0.1 (127.0.0.1) port 7001 (#0)
> GET /api/v1/loop-export-file HTTP/1.1
> Host: 127.0.0.1:7001
> User-Agent: curl
> Accept: */*
>
< HTTP/1.1 200 OK
< Content-Type: text/plain; charset=utf-8
< Content-Disposition: attachment; filename="LoopExportFile.txt"
< Transfer-Encoding: chunked
< x-content-type-options: nosniff
< x-frame-options: SAMEORIGIN
< x-xss-protection: 1; mode=block
< x-download-options: noopen
< x-readtime: 11
< Connection: keep-alive
< Keep-Alive: timeout=5
<
StartGet data success at 5007ms
Get data success at 8009ms
Get data success at 13018ms
Get data success at 17020ms
Get data success at 20034ms
Get data success at 24042ms
End* Connection #0 to host 127.0.0.1 left intact
作者

Mosby

发布于

2018-09-17

许可协议

评论