Nodejs 流(stream) Readable、Writable

可读流 + 可写流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
const fs = require('fs');
const path = require('path');

let rs = fs.createReadStream(path.resolve(__dirname, '1.txt'),{
highWaterMark: 4
})

let ws = fs.createWriteStream(path.resolve(__dirname, '2.txt'), {
highWaterMark: 1
})

rs.on('data', function(chunk){
let flag = ws.write(chunk);
if( !flag){
rs.pause(); // 暂停读流
}
})
ws.on('drain', function(){
console.log('干了');
rs.resume();
})

现在换成自己写的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
const fs = require('fs');
const path = require('path');

const ReadStream = require('./ReadStream');
const WriteStream = require('./WriteStream');

let rs = new ReadStream(path.resolve(__dirname, '1.txt'),{
highWaterMark: 4
})

let ws = new WriteStream(path.resolve(__dirname, '2.txt'), {
highWaterMark: 1
})

rs.on('data', function(chunk){
let flag = ws.write(chunk);
if( !flag){
rs.pause(); // 暂停读流
}
})
ws.on('drain', function(){
console.log('干了');
rs.resume();
})

这样也可以运行成功 上面line15-24还是太麻烦了,原生有封装

pipe

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
const fs = require('fs');
const path = require('path');

const ReadStream = require('./ReadStream');
const WriteStream = require('./WriteStream');

let rs = new ReadStream(path.resolve(__dirname, '1.txt'),{
// highWaterMark: 4 // 这里也可以注释掉
})

let ws = new WriteStream(path.resolve(__dirname, '2.txt'), {
// highWaterMark: 1 // 这里也可以注释掉
})
rs.pipe(ws); // 默认会调用可写流的write方法 最终会调用end方法
// 同理 现在要在ReadStream.js中增加pipe方法
1
2
3
4
5
6
7
8
9
10
11
12
// ReadStream.js
pipe(ws){
this.on('data', function(chunk){
let flag = ws.write(chunk);
if( !flag){
this.pause();
}
})
ws.on('drain',()=>{
this.resume();
})
}

分析

上面都是基于文件的流
自己可以实现自己的流
http socket 压缩 req res 这些都是靠流来实现的,所以流是nodejs的核心机制
我们这里的fs是自己实现了流,下面描述下它的执行过程:
默认可读流会提供一个read方法

1
2
3
4
const {Readable} = require('stream');
class MyRead extends Readable{ // 自己写一个类来继承Readable

}

源码看下 Readable 是什么

1
两个类 第一个类 是 子类:ReadStream(_read),会把读取到的数据 调用push方法传进去   父类:Readable (read)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
const {Readable,Writable} = require('stream');
class MyRead extends Readable{ //自己写的流
_read(){
this.push('1'); // 这个push调的是父类的push
this.push(null); // 放完了 表示暂停 这里会调line13的end方法
}
}
let mr = new MyRead();

mr.on('data',function(chunk){
console.log(chunk)
})
mr.on('end',function(chunk){
console.log('end')
})
// 可以看到这个流跟文件完全没有关系 自己写的

同样 WriteStream 有一个(write方法) 子类有(_write方法),子类会自己去调_write方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
const {Readable,Writable} = require('stream');
class MyWrite extends Writable{
_write(chunk,encoding,clearBuffer){
console.log(chunk,'---');
clearBuffer()
}
}
let myWrite = new MyWrite();
myWrite.write('123');
myWrite.write('123');
myWrite.write('123');
myWrite.write('123');
/*执行结果:
<Buffer 31 32 33> '---'
<Buffer 31 32 33> '---'
<Buffer 31 32 33> '---'
<Buffer 31 32 33> '---'
*/