Nodejs fs (三)之 可写流 createWriteStream

前言

可读流中有 on(‘data’) on(‘end’)
可写流有 write end

fs.createWriteStream

概念

这里先创建一个文件 notd.md

1
你好
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 可读流
const fs = require('fs');
const path = require('path');
let rs = fs.createReadStream(path.resolve(__dirname,'note.md'),{
highWaterMark: 1 // 代表每次读1个
});

// 可写流
let ws = fs.createWriteStream(path.resolve(__dirname,'copy.md'));
// 默认是暂停模式
let arr = [];
rs.on('data', function(data) {
arr.push(data)
// 去做写的操作
ws.write(data);
})
rs.on('end', function() {
console.log(Buffer.concat(arr).toString()); //你好
ws.end();
})
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
const fs = require('fs');
const path = require('path');

let rs = fs.createReadStream(path.resolve(__dirname,'note1.md'),{
highWaterMark: 1 // 默认64 * 1024 每次读一个 读完一个往里写
});

// 可写流
let ws = fs.createWriteStream(path.resolve(__dirname,'copy1.md'),{
highWaterMark: 3, // 默认 16 * 1024 预计占用的内存空间是多少

});
// 默认是暂停模式
let arr = [];

rs.on('data', function(data) {
arr.push(data)
// 去做写的操作
let flag = ws.write(data); // 做到收支平衡,读一点写一点
// flag代表的是当前预计的内存大小是否被撑满
console.log(flag);
// write调的就是fs.write是异步的,相当于多个异步请求操作同一个文件,不能多个异步同时操作一个文件,所以会放到缓存区中
})
rs.on('end', function() {
console.log(Buffer.concat(arr).toString());
ws.end();
})

具体要看flag可以看下面的 只看可写流

示例

1. 预计写10个数 同步的写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
let fs = require('fs');
let path = require('path');
let ws = fs.createWriteStream(path.resolve(__dirname, 'note3.md'), {
highWaterMark: 3,
});
let index = -1;
function write(){
while(++index<10){
let flag = ws.write(index+''); // 可写流写入的数据只能是字符串或者Buffer 所以这里+''
console.log(flag);
}
}
write();
/*执行结果
true
true
false
false
false
false
false
false
false
false
*/
// 文件 note3.md 0123456789

可写流写入的数据只能是字符串或者Buffer 所以这里+’’

原理如图:

第一次0往里写还能写下,第二次1往里写还能写下,第三次2往里写,2能写下,但是之后的不能写下了,所以如图flag为false,即flag表示是否还能继续往里写,值为false时表示预计的缓存区放不下, 但是还是会放进去,会撑开,这样就浪费内存了,希望的是写不下了,就等待,等写完了再放进去。

2. 加flag的判断

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
let fs = require('fs');
let path = require('path');
let ws = fs.createWriteStream(path.resolve(__dirname, 'note3.md'), {
highWaterMark: 3,
});
let index = -1;
function write(){
let flag = true;
while(++index<10 && flag){
flag = ws.write(index+'');
console.log(flag);
}
}
write();
// 文件 note3.md 012

3. 当内存中的值都写入文件之后继续写入

on(‘drain’)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// 当内存中的值都写入文件之后继续写入
let fs = require('fs');
let path = require('path');
let ws = fs.createWriteStream(path.resolve(__dirname, 'note3.md'), {
highWaterMark: 3,
});
let index = 0;
function write(){
let flag = true;
while(index<10 && flag){
flag = ws.write(index+'');
index ++;
console.log(flag);
}
}
write();
// 只有当写入的个数达到了预计的大小 并且被写入到文件后清空了 才会触发drain,所以这里最后只有一个9传进去之后没有再触发(即执行结果里面最后一个true之后没有再触发‘干了’)
ws.on('drain', function() {
console.log('干了');
write();
})
/*执行结果:
true
true
false
干了
true
true
false
干了
true
true
false
干了
true
*/

4. 只有手动调end 才会触发关闭文件的事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
let fs = require('fs');
let path = require('path');
let ws = fs.createWriteStream(path.resolve(__dirname, 'note3.md'), {
highWaterMark: 3,
});
let index = 0;
function write(){
let flag = true;
while(index<10 && flag){
flag = ws.write(index+'');
index ++;
console.log(flag);
}
if(index === 10){
ws.end(); // 关闭文件 关闭可写流 // 0123456789
//ws.end('hello'); // 如果end里加了参数,最后会写入文件中 // 0123456789hello
}
}
write();
ws.on('drain', function() {
console.log('干了');
write();
})
ws.on('close',function(){
console.log('close');
})

手写可写流

新建:调用文件 write.js      原理 ReadStream.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// write.js
let fs = require('fs');
let path = require('path');
const WriteStream = require('./WriteStream');
let ws = new WriteStream(path.resolve(__dirname, 'note3.md'), {
highWaterMark: 3, // 表示预期占用几个内存
encoding: 'utf8', // 写入的编码
start: 0, // 从文件的第0个位置写入
mode: 438,
flags: 'w', // 默认操作是可写
});
let index = 0;
function write(){
let flag = true;
while(index<10 && flag){
flag = ws.write(index+'');
index ++;
console.log(flag);
}
// if(index === 10){
// ws.end('hello');
// }
}
write();
ws.on('drain', function() {
console.log('干了');
write();
})
ws.on('close',function(){
console.log('close');
})
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
// WriteStream.js
const EventEmitter = require('events');
const fs = require('fs');
const path = require('path');
class WriteStream extends EventEmitter {
constructor(path, options = {}){
super();
this.path = path;
this.highWaterMark = options.highWaterMark || 16 * 1024;
this.encoding = options.encoding || 'utf8';
this.start = options.start || 0;
this.mode = options.mode || 0o666;
this.flags = options.flags || 'w';

// 先打开文件
this.open();
// 缓存区
this.cache = []; // 写的时候只有第一次往文件里写,其他的都放缓存区
this.writing = false; // 判断是否正在被写入
this.len = 0; // 缓存区的大小
this.needDrain = false; // 是否触发drain事件
this.offset = this.start; // 表示每次写入文件的偏移量
}
open(){ // 先打开文件
fs.open(this.path, this.flags, (err, fd)=>{
this.fd = fd; // 文件标识符
this.emit('open', this.write);
})
}
write(chunk, encoding=this.encoding,callback){ // 用户会同步的调用write方法
console.log('write~~',this.fd); // 打印这里可以看到 write比open先调用,write这里是同步操作,open是异步 line27当open打开文件之后再触发write
// 此时这里面用户调用write方法时 需要判断当前是否正在写入,如果正在写入放到缓存中,如果不是正在写入需要把它真正的向文件写入

// 因为len取的长度是字节长度,所以这里面数据类型要统一,判断是否为Buffer
// 1) 判断这个chunk 是不是Buffer 如果不是Buffer转化成Buffer
chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);
this.len += chunk.length; // 统计写入数据的个数
let flag = this.len < this.highWaterMark;
this.needDrain = !flag; //当前写入的内容>=highWaterMark才会触发
if(this.writing){ // 当前是否正在写入 是 往内存中放
this.cache.push({ // 将除了第一次真实的向内存中写入的 其他都放入内存中
chunk,
encoding,
callback
})
}else{// 真实向文件中写入
this.writing = true; // 标识正在写入
this._write(chunk,encoding,()=>{
callback && callback(); // 先执行自己的成功操作
this.clearBuffer(); // 再清空队列中的第一个
});
}

return flag;
}
clearBuffer(){ //
// console.log('清空缓存区队列中的数据');
let obj = this.cache.shift();// 取第一个值
if(obj){// 说明需要写入
this._write(obj.chunk,obj.encoding,()=>{
obj.callback && obj.callback();
this.clearBuffer();
})
}else{ // 说明缓存区中没有 需要写入
if(this.needDrain){
this.needDrain = false; // 表示下一次需要重新判断是否需要触发drain事件
this.writing = false; // 表示下一次调用write要向文件中写入(因为缓存区已经obj没有数据了)))
this.emit('drain');
}
}
}
// 核心的写入方法
_write(chunk,encoding,clearBuffer){
if(typeof this.fd !== 'number'){ //write()相对于open是先调用的
return this.once('open',()=>{this._write(chunk,encoding,clearBuffer)});
}
// todo 写入操作
// flags 是读取还是写入
// chunk 写入的数据
// 0表示把数据的第0个位置开始写入
// 读取buffer多少个字节
// 每次写入文件的偏移量
fs.write(this.fd,chunk,0,chunk.length,this.offset, (err, written)=>{ // written表示真实写入的格式
this.offset += written; // 增加偏移量
this.len -= written; // 减少缓存区的数据
clearBuffer(); // 清空缓存
})
}
close(){
fs.close(this.fd, ()=>{
this.emit('close');
})
}
end(chunk,encoding){ // 如果传递参数 就需要写入
if(chunk){ // 写入
return this._write(chunk,encoding,()=>{
this.close();
})
}
return this.close();
}
}
module.exports = WriteStream;