Nodejs fs (二)之 可读流 createReadStream

前言

文件流    文件的读取和操作
readFile、writeFile 的缺陷:会一次性把整个文件读进内存,文件一大就会撑爆内存。所以可以改成用 open、read、write、close 读一点写一点,但是这样写起来又太复杂,于是 fs 自己封装了 fs.createReadStream

fs.createReadStream

概念

这里先创建一个文件1.txt

1
12345678909
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
const fs = require('fs');
const path = require('path');

// 这里的参数在vscode编辑器中可以将鼠标放到createReadStream就会显示出来了
const readOptions = {
  flags: 'r', // how the file is opened: 'r' = read
  encoding: null, // null (the default) delivers raw Buffers
  highWaterMark: 64 * 1024, // size of the internal buffer: at most 64 KiB per chunk
  // highWaterMark: 2, // alternative: two bytes per chunk (its output is shown further down)
  // Permission bits: read = 4, write = 2, execute = 1 (`ls -l` shows them, `chmod` changes them).
  mode: 0o666, // octal 0o666 === decimal 438
  autoClose: true, // close the file once reading has finished
  start: 0, // byte offset to start reading at
  end: 10, // byte offset to stop at; both ends are inclusive, so 0..10 is 11 bytes
};
const rs = fs.createReadStream(path.resolve(__dirname, '1.txt'), readOptions);

// A readable stream starts in paused mode; attaching a 'data' listener makes it deliver chunks.
const printChunk = (data) => {
  // Called once per chunk that was read.
  console.log(data); // <Buffer 31 32 33 34 35 36 37 38 39 30 39>
  /*
   With `highWaterMark: 2` enabled instead, the 11 bytes arrive in six chunks (11 / 2, rounded up):
   <Buffer 31 32>
   <Buffer 33 34>
   <Buffer 35 36>
   <Buffer 37 38>
   <Buffer 39 30>
   <Buffer 39>
  */
};
rs.on('data', printChunk);
  • 可读流需要掌握的方法 ‘data’ ‘end’ Buffer.concat()

1.txt

1
你好
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
const fs = require('fs');
const path = require('path');
const rs = fs.createReadStream(path.resolve(__dirname, '1.txt'), {
  flags: 'r',
  encoding: null,
  highWaterMark: 2,
  mode: 0o666,
  autoClose: true,
  start: 0,
  end: 10,
});

// Every chunk is kept as a Buffer and decoded only after concatenation, so a
// multi-byte character that was split across two chunks still decodes correctly.
const chunks = [];

rs.on('open', () => {
  console.log('文件打开触发open事件');
})
  .on('data', (data) => {
    console.log(data);
    chunks.push(data);
  })
  .on('end', () => {
    console.log('文件读取完毕');
    const text = Buffer.concat(chunks).toString();
    console.log(text); // 你好
  })
  .on('close', () => {
    console.log('文件关闭');
  })
  .on('error', () => {
    console.log('出错了');
  });
/*执行结果:
文件打开触发open事件
<Buffer e4 bd>
<Buffer a0 e5>
<Buffer a5 bd>
文件读取完毕
你好
文件关闭
*/

手写可读流原理

1】

新建:调用文件 stream.js     原理 ReadStream.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// stream.js
const fs = require('fs');
const path = require('path');
let ReadStream = require('./ReadStream');
const rs = new ReadStream(path.resolve(__dirname, '1.txt'), { // hand-written stream, same option keys as fs.createReadStream
  flags: 'r',
  encoding: null,
  highWaterMark: 2,
  mode: 0o666,
  autoClose: true,
  start: 0,
  end: 10,
});
rs.on('open', () => {
  console.log('文件打开触发open事件');
});
rs.on('data', (data) => { // subscribing to 'data' is the signal for the stream to start reading
  console.log(data);
});

ReadStream.js 分析:rs 这个实例需要有 on 方法,并且上面 stream.js line17 的 rs.on('data') 一绑定就要开始读取文件,所以下面 ReadStream.js 的 line21 必须能够监听到“用户绑定了 'data' 事件”这件事。events 模块的 'newListener' 事件正好可以做到:每当实例上绑定新的监听器时它都会触发,并把事件名(这里是 'data')传给回调。
因此这里让 ReadStream extends Events,这样实例上既有 on 方法,也可以 on('newListener')(如 line21)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// ReadStream.js
const Events = require('events');
const fs = require('fs');
class ReadStream extends Events { // Step 1 of the hand-written stream: opens the file and reacts to 'data' listeners; nothing is read yet.
  constructor(path, options = {}) { // path: file to open; options: same keys as fs.createReadStream
    super();
    // Convention: keep every setting on `this` so that methods added later can reach it.
    // Apply defaults for the options that were not supplied.
    this.path = path;
    this.flags = options.flags || 'r';
    this.encoding = options.encoding || null;
    this.highWaterMark = options.highWaterMark || 64 * 1024;
    this.mode = options.mode || 438;
    this.autoClose = options.autoClose === undefined ? true : options.autoClose; // `|| true` could never yield false
    this.start = options.start === undefined ? 0 : options.start; // `| 0` would truncate large offsets to 32 bits
    this.end = options.end;

    this.flowing = null; // null until a 'data' listener is attached (paused by default)
    this.open(); // start opening the file as soon as the stream is created (asynchronous)
    // EventEmitter emits 'newListener' every time a listener is registered with on().
    this.on('newListener', function (type) {
      if (type === 'data') { // the user subscribed to 'data': start reading
        this.flowing = true;
        console.log('开始读取');
        this.read();
      }
    });
  }
  read() { // Placeholder: only shows that read() runs before the asynchronous open() has set this.fd.
    console.log('read~');
    console.log(this.fd);
    // A real implementation calls fs.read(fd, ...) here, so it needs the fd produced by open().
  }
  open() { // Opens this.path with this.flags and stores the resulting descriptor on this.fd.
    fs.open(this.path, this.flags, (err, fd) => {
      this.fd = fd;
      console.log('open~');
    });
  }
}
module.exports = ReadStream;
/*执行结果:
开始读取
read~
undefined
open~
*/

结果分析:从打印结果可以看出,虽然 line19 的 this.open() 先执行,但是 line35 的 fs.open 是异步的,它的回调要稍后才会执行;而 stream.js line17 绑定 'data' 时,会同步触发 line21 的 'newListener'(只要 stream.js 里用 on 绑定了事件,此刻是 'data',就会立刻触发 'newListener'),于是 line29 的 read() 在文件打开之前就执行了,此时 this.fd 还是 undefined

2】

现在希望先open执行完再执行read

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// stream.js
const fs = require('fs');
const path = require('path');
let ReadStream = require('./ReadStream');
const rs = new ReadStream(path.resolve(__dirname, '1.txt'), { // constructing the stream immediately starts opening the file
  flags: 'r',
  encoding: null,
  highWaterMark: 2,
  mode: 0o666,
  autoClose: true,
  start: 0,
  end: 10,
});
rs.on('open', (fd) => { // receives the file descriptor passed along with the 'open' event
  console.log('文件打开触发open事件', fd);
});
rs.on('data', (data) => {
  console.log('data:', data);
});
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// ReadStream.js
const Events = require('events');
const fs = require('fs');
class ReadStream extends Events { // Step 2 of the hand-written stream: read() now waits until open() has produced a file descriptor.
  constructor(path, options = {}) { // path: file to open; options: same keys as fs.createReadStream
    super();
    // Apply defaults for the options that were not supplied.
    this.path = path;
    this.flags = options.flags || 'r';
    this.encoding = options.encoding || null;
    this.highWaterMark = options.highWaterMark || 64 * 1024;
    this.mode = options.mode || 438;
    this.autoClose = options.autoClose === undefined ? true : options.autoClose; // `|| true` could never yield false
    this.start = options.start === undefined ? 0 : options.start; // `| 0` would truncate large offsets to 32 bits
    this.end = options.end;

    this.flowing = null; // null until a 'data' listener is attached (paused by default)
    this.open(); // start opening the file (asynchronous)

    this.on('newListener', function (type) {
      if (type === 'data') { // the user subscribed to 'data': start reading
        this.flowing = true;
        console.log('开始读取');
        this.read();
      }
    });
  }
  read() {
    // read() may be called before the asynchronous open() has set this.fd.
    if (typeof this.fd !== 'number') {
      return this.once('open', this.read); // park read() and run it again once 'open' fires
      // EventEmitter invokes listeners with `this` bound to the emitter, so passing
      // this.read without bind() is safe here.
    }
    console.log('read~~', this.fd); // by now the descriptor is available
  }
  open() { // Opens this.path, stores the descriptor on this.fd and announces it through the 'open' event.
    fs.open(this.path, this.flags, (err, fd) => {
      this.fd = fd;
      console.log('open~');
      this.emit('open', this.fd);
    });
  }
}
/*执行结果:
开始读取
open~
文件打开触发open事件 3
read~~ 3
*/
module.exports = ReadStream;

分析:

  • 顺序:stream.js line5 一 new 这个类,构造函数里 line18 的 this.open() 就会立即执行;因为 line38 的 fs.open 是异步执行的,所以等到 line41 执行 emit('open') 的时候,stream.js line14 的 'open' 监听已经绑定好了,所以能够被触发,并且能拿到传过去的 fd
  • 这里用发布订阅的思想来解耦
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
     // 之前的写法相当于把两个异步操作嵌套在一起:
    fs.open(path, flags, function(err, fd){ // 打开成功之后再 read
    fs.read(fd, ...);
    })
    // 现在的写法相当于:
    fs.open(path, flags, function(err, fd){
    rs.emit('open', fd);
    })
    rs.once('open', function() {
    rs.read();
    })
    // 通过发布订阅,让两个有先后依赖的异步操作在代码上解耦,但执行顺序上仍然是有关联的

3】

读取到内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// stream.js
const fs = require('fs');
const path = require('path');
let ReadStream = require('./ReadStream');
const rs = new ReadStream(path.resolve(__dirname, '1.txt'), { // hand-written stream under test
  flags: 'r',
  encoding: null,
  highWaterMark: 2,
  mode: 0o666,
  autoClose: true,
  start: 0,
  end: 10,
});
rs.on('open', (fd) => {
  console.log('文件打开触发open事件', fd);
});
rs.on('data', (data) => { // one call per chunk
  console.log('Buffer:', data);
});
rs.on('end', () => { // fires once a read returns no more bytes
  console.log('文件读取完毕');
});
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
// ReadStream.js
const Events = require('events');
const fs = require('fs');
class ReadStream extends Events { // Step 3 of the hand-written stream: actually reads the file and emits 'data' / 'end'.
  constructor(path, options = {}) {
    super();
    // path: file to read; options: same keys as fs.createReadStream (`end` is not honoured yet in this step).
    // Apply defaults for the options that were not supplied.
    this.path = path;
    this.flags = options.flags || 'r';
    this.encoding = options.encoding || null;
    this.highWaterMark = options.highWaterMark || 64 * 1024;
    this.mode = options.mode || 438;
    this.autoClose = options.autoClose === undefined ? true : options.autoClose; // `|| true` could never yield false
    this.start = options.start === undefined ? 0 : options.start; // `| 0` would truncate large offsets to 32 bits
    this.end = options.end;
    // Runtime state.
    this.flowing = null;
    this.offset = this.start; // absolute file position of the next read; begins at `start`, not always 0
    this.open();

    this.on('newListener', function (type) {
      if (type === 'data') { // the user subscribed to 'data': start reading
        this.flowing = true;
        console.log('开始读取');
        this.read();
      }
    });
  }
  read() { // Reads one chunk of up to highWaterMark bytes, emits it, then schedules the next read.
    if (typeof this.fd !== 'number') { // file not open yet: retry once 'open' fires
      return this.once('open', this.read);
    }
    // Fresh buffer per chunk, so listeners may keep a reference to earlier chunks.
    const buffer = Buffer.alloc(this.highWaterMark);
    // Always asks for highWaterMark bytes; the `end` option is deliberately ignored in this step.
    fs.read(this.fd, buffer, 0, this.highWaterMark, this.offset, (err, bytesRead) => {
      this.offset += bytesRead;
      if (bytesRead > 0) { // got data: deliver it and try to read more
        // Emit only the bytes that were actually read; the last chunk may be shorter than the buffer.
        this.emit('data', buffer.subarray(0, bytesRead));
        this.read();
      } else {
        this.emit('end');
      }
    });
  }
  open() { // Opens this.path, stores the descriptor on this.fd and announces it through the 'open' event.
    fs.open(this.path, this.flags, (err, fd) => {
      console.log('open~');
      this.fd = fd;
      this.emit('open', this.fd);
    });
  }
}
module.exports = ReadStream;
/*执行结果:
开始读取
open~
文件打开触发open事件 3
Buffer: <Buffer e4 bd>
Buffer: <Buffer a0 e5>
Buffer: <Buffer a5 bd>
文件读取完毕
*/

问题:现在可以看到 1.txt 里面的“你好”占 6 个字节。现在将 stream.js 的 line12 的 end 改成 4,这样 0-4 应该只读取 5 个字节;但运行后可以看到结果还是读取了 6 个,因为 line37 的 fs.read 每次读取的长度固定是 this.highWaterMark,没有考虑 end;如果使用原生的 fs.createReadStream,则只会读取 5 个

4】

howMuchToRead 、close()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// stream.js
const fs = require('fs');
const path = require('path');
let ReadStream = require('./ReadStream');
// Bytes 0..4 inclusive are requested: five bytes, delivered two at a time.
const streamOptions = {
  flags: 'r',
  encoding: null,
  highWaterMark: 2,
  mode: 0o666,
  autoClose: true,
  start: 0,
  end: 4,
};
const rs = new ReadStream(path.resolve(__dirname, '1.txt'), streamOptions);

const onOpen = (fd) => {
  console.log('文件打开触发open事件', fd);
};
const onData = (data) => {
  console.log('buffer:', data);
};
const onEnd = () => {
  console.log('文件读取完毕');
};
const onClose = () => {
  console.log('文件关闭');
};

rs.on('open', onOpen);
rs.on('data', onData);
rs.on('end', onEnd);
rs.on('close', onClose);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
// ReadStream.js
const Events = require('events');
const fs = require('fs');
/**
 * Step 4 of the hand-written readable file stream: honours `start` / `end`
 * and closes the file when reading is done.
 *
 * Events emitted:
 *  - 'open'  (fd)     after the file has been opened
 *  - 'data'  (Buffer) once per chunk, containing exactly the bytes that were read
 *  - 'end'            when the requested range or the file is exhausted
 *  - 'close'          after the descriptor has been closed (only when autoClose is true)
 *  - 'error' (Error)  when opening or reading the file fails
 */
class ReadStream extends Events {
  /**
   * @param {string} path - File to read.
   * @param {object} [options]
   * @param {string} [options.flags='r'] - Flags passed to fs.open.
   * @param {?string} [options.encoding=null] - Stored only; chunks are always emitted as Buffers.
   * @param {number} [options.highWaterMark=65536] - Maximum number of bytes per chunk.
   * @param {number} [options.mode=0o666] - Stored only; not passed to fs.open.
   * @param {boolean} [options.autoClose=true] - Close the descriptor after 'end' or a read error.
   * @param {number} [options.start=0] - Offset of the first byte to read.
   * @param {number} [options.end] - Offset of the last byte to read (inclusive); omit to read to EOF.
   */
  constructor(path, options = {}) {
    super();

    this.path = path;
    this.flags = options.flags || 'r';
    this.encoding = options.encoding || null;
    this.highWaterMark = options.highWaterMark || 64 * 1024;
    this.mode = options.mode || 438;
    // `|| true` could never yield false, and `| 0` would truncate large offsets to 32 bits.
    this.autoClose = options.autoClose === undefined ? true : options.autoClose;
    this.start = options.start === undefined ? 0 : options.start;
    this.end = options.end;

    this.flowing = null;
    this.offset = this.start; // absolute file position of the next read
    this.reading = false; // true while an fs.read call is in flight
    this.ended = false; // true once 'end' was emitted or a read failed
    this.open();

    this.on('newListener', function (type) {
      if (type === 'data') { // the user subscribed to 'data': start reading
        this.flowing = true;
        console.log('开始读取');
        this.read();
      }
    });
  }

  /**
   * Reads the next chunk and emits it as 'data', then keeps reading until a
   * read returns no bytes, at which point 'end' is emitted and the file closed.
   * Does nothing while another read is in flight or after the stream has ended.
   */
  read() {
    if (typeof this.fd !== 'number') {
      // Not open yet: run again once 'open' fires (listeners run with `this` = the emitter).
      return this.once('open', this.read);
    }
    if (this.reading || this.ended) {
      // A second 'data' listener must not start an overlapping read of the same bytes.
      return;
    }

    // Bytes left in the inclusive range [offset, end]; unlimited when no end was given.
    // `end: 0` is a valid end, hence the null check instead of a truthiness check.
    const remaining = this.end == null ? Infinity : this.end - this.offset + 1;
    const howMuchToRead = Math.max(0, Math.min(this.highWaterMark, remaining));
    const buffer = Buffer.alloc(howMuchToRead);

    this.reading = true;
    fs.read(this.fd, buffer, 0, howMuchToRead, this.offset, (err, bytesRead) => {
      this.reading = false;
      if (err) {
        this.ended = true;
        this.close(); // schedule the close first: emitting 'error' throws when nobody listens
        this.emit('error', err);
        return;
      }
      if (bytesRead > 0) {
        this.offset += bytesRead;
        // The file may end before `end`: emit only the bytes that were actually read.
        this.emit('data', buffer.subarray(0, bytesRead));
        this.read();
      } else {
        this.ended = true;
        this.emit('end');
        this.close();
      }
    });
  }

  /** Closes the descriptor and emits 'close'; a no-op when autoClose is false. */
  close() {
    if (this.autoClose) {
      fs.close(this.fd, () => {
        this.emit('close');
      });
    }
  }

  /** Opens the file; emits 'open' with the descriptor, or 'error' when opening fails. */
  open() {
    fs.open(this.path, this.flags, (err, fd) => {
      if (err) {
        this.emit('error', err);
        return;
      }
      console.log('open~');
      this.fd = fd;
      this.emit('open', this.fd);
    });
  }
}
module.exports = ReadStream;
/*执行结果:
开始读取
open~
文件打开触发open事件 3
buffer: <Buffer e4 bd>
buffer: <Buffer a0 e5>
buffer: <Buffer a5>
文件读取完毕
文件关闭
*/

5】

pause() 、resume()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 这里引用原生 fs.createReadStream
const fs = require('fs');
const path = require('path');
let ReadStream = require('./ReadStream');
// Native stream: read bytes 0..4 two at a time, pausing after every chunk.
let rs = fs.createReadStream(path.resolve(__dirname, '1.txt'),{
flags:'r',
encoding:null,
highWaterMark: 2,
mode:0o666,
autoClose:true,
start:0,
end:4
})
rs.on('open',function(fd) {
console.log('文件打开触发open事件',fd);
})
rs.on('data', function(data){
console.log('buffer:',data);// first chunk: buffer: <Buffer e4 bd>
rs.pause(); // stop the flow until resume() is called
})
// Resume reading every 2 seconds; each resume yields one more chunk because
// the 'data' handler pauses the stream again.
const resumeTimer = setInterval(() => {
rs.resume();
}, 2000);
rs.on('close', function(){
console.log('文件关闭');
// Stop the timer once the file is closed; an active interval would otherwise
// keep the process alive forever.
clearInterval(resumeTimer);
})
/*执行结果
文件打开触发open事件 3
buffer: <Buffer e4 bd>
buffer: <Buffer a0 e5> (隔2秒打印)
buffer: <Buffer a5> (隔2秒打印)
文件关闭
*/

引用改成自己手写的ReadStream
this.flowing

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// stream.js
const fs = require('fs');
const path = require('path');
let ReadStream = require('./ReadStream');
// Same experiment as with the native stream, now using the hand-written ReadStream.
let rs = new ReadStream(path.resolve(__dirname, '1.txt'),{
flags:'r',
encoding:null,
highWaterMark: 2,
mode:0o666,
autoClose:true,
start:0,
end:4
})
rs.on('open',function(fd) {
console.log('文件打开触发open事件',fd);
})
rs.on('data', function(data){
console.log('buffer:',data);
rs.pause(); // stop the flow until resume() is called
})
// Resume reading every 2 seconds; each resume yields one more chunk because
// the 'data' handler pauses the stream again.
const resumeTimer = setInterval(() => {
rs.resume();
}, 2000);
rs.on('close', function(){
console.log('文件关闭');
// Stop the timer once the file is closed; an active interval would otherwise
// keep the process alive forever.
clearInterval(resumeTimer);
})
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
// ReadStream.js
const Events = require('events');
const fs = require('fs');
/**
 * Final step of the hand-written readable file stream: adds pause() / resume()
 * on top of ranged reading and auto-closing.
 *
 * Events emitted:
 *  - 'open'  (fd)     after the file has been opened
 *  - 'data'  (Buffer) once per chunk, containing exactly the bytes that were read
 *  - 'end'            when the requested range or the file is exhausted
 *  - 'close'          after the descriptor has been closed (only when autoClose is true)
 *  - 'error' (Error)  when opening or reading the file fails
 */
class ReadStream extends Events {
  /**
   * @param {string} path - File to read.
   * @param {object} [options]
   * @param {string} [options.flags='r'] - Flags passed to fs.open.
   * @param {?string} [options.encoding=null] - Stored only; chunks are always emitted as Buffers.
   * @param {number} [options.highWaterMark=65536] - Maximum number of bytes per chunk.
   * @param {number} [options.mode=0o666] - Stored only; not passed to fs.open.
   * @param {boolean} [options.autoClose=true] - Close the descriptor after 'end' or a read error.
   * @param {number} [options.start=0] - Offset of the first byte to read.
   * @param {number} [options.end] - Offset of the last byte to read (inclusive); omit to read to EOF.
   */
  constructor(path, options = {}) {
    super();

    this.path = path;
    this.flags = options.flags || 'r';
    this.encoding = options.encoding || null;
    this.highWaterMark = options.highWaterMark || 64 * 1024;
    this.mode = options.mode || 438;
    // `|| true` could never yield false, and `| 0` would truncate large offsets to 32 bits.
    this.autoClose = options.autoClose === undefined ? true : options.autoClose;
    this.start = options.start === undefined ? 0 : options.start;
    this.end = options.end;

    this.flowing = null; // null: no listener yet, true: flowing, false: paused
    this.offset = this.start; // absolute file position of the next read
    this.temp = true; // true while more data may be read; cleared at 'end' or on a read error
    this.reading = false; // true while an fs.read call is in flight
    this.open();

    this.on('newListener', function (type) {
      if (type === 'data') { // the user subscribed to 'data': start reading
        this.flowing = true;
        console.log('开始读取');
        this.read();
      }
    });
  }

  /**
   * Reads the next chunk and emits it as 'data'; keeps going while the stream
   * is flowing. Emits 'end' and closes the file when a read returns no bytes.
   * Does nothing while another read is in flight or after the stream has ended.
   */
  read() {
    if (typeof this.fd !== 'number') {
      // Not open yet: run again once 'open' fires (listeners run with `this` = the emitter).
      return this.once('open', this.read);
    }
    if (this.reading || !this.temp) {
      // Repeated resume() calls or extra 'data' listeners must not start an
      // overlapping read of the same bytes, nor restart a finished stream.
      return;
    }

    // Bytes left in the inclusive range [offset, end]; unlimited when no end was given.
    // `end: 0` is a valid end, hence the null check instead of a truthiness check.
    const remaining = this.end == null ? Infinity : this.end - this.offset + 1;
    const howMuchToRead = Math.max(0, Math.min(this.highWaterMark, remaining));
    const buffer = Buffer.alloc(howMuchToRead);

    this.reading = true;
    fs.read(this.fd, buffer, 0, howMuchToRead, this.offset, (err, bytesRead) => {
      this.reading = false;
      if (err) {
        this.temp = false;
        this.close(); // schedule the close first: emitting 'error' throws when nobody listens
        this.emit('error', err);
        return;
      }
      if (bytesRead > 0) {
        this.offset += bytesRead;
        // The file may end before `end`: emit only the bytes that were actually read.
        this.emit('data', buffer.subarray(0, bytesRead));
        // A listener may have called pause(); continue only while still flowing.
        if (this.flowing) {
          this.read();
        }
      } else {
        this.temp = false;
        this.emit('end');
        this.close();
      }
    });
  }

  /** Closes the descriptor and emits 'close'; a no-op when autoClose is false. */
  close() {
    if (this.autoClose) {
      fs.close(this.fd, () => {
        this.emit('close');
      });
    }
  }

  /** Opens the file; emits 'open' with the descriptor, or 'error' when opening fails. */
  open() {
    fs.open(this.path, this.flags, (err, fd) => {
      if (err) {
        this.emit('error', err);
        return;
      }
      console.log('open~');
      this.fd = fd;
      this.emit('open', this.fd);
    });
  }

  /** Stops the flow: no further read is scheduled after the chunk currently being delivered. */
  pause() {
    this.flowing = false;
  }

  /** Switches back to flowing mode and continues reading unless the stream has already ended. */
  resume() {
    this.flowing = true;
    if (this.temp) {
      this.read();
    }
  }
}
module.exports = ReadStream;
/*执行结果:
开始读取
open~
文件打开触发open事件 3
buffer: <Buffer e4 bd>
buffer: <Buffer a0 e5> (隔2秒打印)
buffer: <Buffer a5> (隔2秒打印)
文件关闭 (隔2秒打印)
*/

可读流

  • 如何将多个 有关系的异步代码进行拆分 read() open() 发布订阅

可写流

  • 如何处理并发操作文件 多个异步 造队列 => 链表