TCP 数据粘包解决方案之数据封包与拆包
分类:
node
日期:
2022-6-12标签:
网络通信TCP粘包
TCP 是面向字节流的协议,就是没有界限的一串数据,本没有“包”的概念,“粘包”和“拆包”一说是为了有助于形象地理解这两种现象。
TCP 粘包、拆包图解#
因为 TCP 是面向流,没有边界,而操作系统在发送 TCP 数据时,会通过缓冲区来进行优化,例如缓冲区为 1024 个字节大小。 如果一次请求发送的数据量比较小,没达到缓冲区大小,TCP 则会将多个请求合并为同一个请求进行发送,这就形成了粘包问题。 如果一次请求发送的数据量比较大,超过了缓冲区大小,TCP 就会将其拆分为多次发送,这就是拆包。
下图演示了粘包、拆包的过程,client 分别发送了两个数据包 D1 和 D2 给 server,server 端一次读取到字节数是不确定的,因此可能可能存在以下几种情况:
关于这几种情况说明如下:
- 正常的理想情况,两个包恰好满足 TCP 缓冲区的大小或达到 TCP 等待时长,分别发送两个包;
- 粘包:两个包较小,间隔时间短,发生粘包,合并成一个包发送;
- 拆包:一个包过大,超过缓存区大小,拆分成两个或多个包发送;
- 拆包和粘包:Packet1 过大,进行了拆包处理,而拆出去的一部分又与 Packet2 进行粘包处理。
由于发送方发送的数据,可能会发生粘包、拆包的情况。这样,对于接收端就难于分辨出来了,因此必须提供科学的机制来解决粘包、拆包问题,这就是协议的作用。
常见的解决方案#
对于粘包和拆包问题,常见的解决方案有四种:
- 发送端将每个包都封装成固定的长度,比如 100 字节大小。如果不足 100 字节可通过补 0 或空等进行填充到指定长度;
- 发送端在每个包的末尾使用固定的分隔符,例如\r\n。如果发生拆包需等待多个包发送过来之后再找到其中的\r\n 进行合并;例如,FTP 协议;
- 将消息分为头部和消息体,头部中保存整个消息的长度,只有读取到足够长度的消息之后才算是读到了一个完整的消息;
- 通过自定义协议进行粘包和拆包的处理。
数据封包与拆包的实现及应用举例#
这里主要介绍第三种解决方案的实现思路:
数据包转换模块的封装#
每次要发送的数据封装成一个包含头部和消息体的数据包,头部的总长度是 4 个字节,消息头包含两个部分:序列号(serialNum),标记消息体长度的(serialLen)各占 2 个字节。
class MyTransformCode {constructor() {// 初始化头部长度是4this.packageHeaderLen = 4;// 初始化序列号为 0this.serialNum = 0;// 初始化头部消息体长度标记为是2this.serialLen = 2;}//转码encode(data, serialNum) {// 将消息数据转为二进制消息体const body = Buffer.from(data);// 01 先按照指定的长度来申请一片内存空间做为 header 来使用const headerBuf = Buffer.alloc(this.packageHeaderLen);// 02 将序列号(serialNum)和消息体长度(serialLen),分别写入头部对应位置// writeInt16BE第一个参数表示写入的value,第二个可选参数表示offset(偏移量)headerBuf.writeInt16BE(serialNum || this.serialNum);// 头部前面写入了序列号,偏移2个长度在写入消息体长度headerBuf.writeInt16BE(body.length, this.serialLen);// 判断有没有传形参 serialNum,如果没传,则用内部的序列号,并且每次转码后自增1if (serialNum == undefined) {this.serialNum++;}// 将头和消息体拼接成一个完整的数据包返回return Buffer.concat([headerBuf, body]);}// 解码,形参buffer 是经过encode转码的decode(buffer) {// 切割出数据包头部const headerBuf = buffer.slice(0, this.packageHeaderLen);// 切割出数据包消息体const bodyBuf = buffer.slice(this.packageHeaderLen);return {// 读取头部序列号serialNum: headerBuf.readInt16BE(),// 读取头部标记的消息体长度, readInt16BE的第一个入参代表读取的便宜位bodyLength: headerBuf.readInt16BE(this.serialLen),// 读取消息体数据,转字符串body: bodyBuf.toString(),};}// 获取数据包长度的方法getPackageLen(buffer) {if (buffer.length < this.packageHeaderLen) {// 如果数据包长度小于头部长度,表示其是一个不完整的数据包,返回0return 0;} else {// 否则返回 头部的长度+消息体长度return this.packageHeaderLen + buffer.readInt16BE(this.serialLen);}}}module.exports = MyTransformCode;
服务端应用示例#
在服务端中使用,通过 node 中的 net 模块启动一个服务,调用 MyTransformCode 进行数据的封包与拆包
const net = require('net');const MyTransform = require('./myTransform.js');const server = net.createServer();// 定义一个缓存区,用于存储可能未接收完的数据,这在实际的传输过程中是有可能发生的let overageBuffer = null;let ts = new MyTransform();server.listen('1234', 'localhost');server.on('listening', () => {console.log('服务端运行在 localhost:1234');});server.on('connection', socket => {// 监听数据接收socket.on('data', chunk => {if (overageBuffer) {// 每一次接收,判断缓冲区中是否有上次未读取处理的数据,有就和本次chunk拼接chunk = Buffer.concat([overageBuffer, chunk]);}// 定义一个变量,代表数据包的长度,初始化为0let packageLen = 0;// 获取数据包长度,判断是否包含一个完整数据包while ((packageLen = ts.getPackageLen(chunk))) {// 从chunk切割出一个数据包const packageCon = chunk.slice(0, packageLen);// 切割一个数据包后,剩余的数据chunk = chunk.slice(packageLen);// 对数据包进行解码const ret = ts.decode(packageCon);console.log(ret);// 示例,将每个数据包中解码得到的消息体和数据包的序列号,转码后返回给客户端socket.write(ts.encode(ret.body, ret.serialNum));}// 将剩余的chunk,保存在缓冲区中overageBuffer = chunk;});});
客户端应用示例#
客户端同理,调用 MyTransformCode 进行数据的封包与拆包
const net = require('net');const MyTransform = require('./myTransform.js');let overageBuffer = null;let ts = new MyTransform();const client = net.createConnection({host: 'localhost',port: 1234,});// 下面发送的数据,会存在数据粘包现象,即将所有数据合并再一起发送// client.write('hello tom and jery 1')// client.write('hello tom and jery 2')// client.write('hello tom and jery 3')// client.write('hello tom and jery 4')// client.write('hello tom and jery 5')// 将数据转码后再发送,这样即使合并发送,服务端也可以通过解码得到每次发送的数据client.write(ts.encode('hello tom and jery 1'));client.write(ts.encode('hello tom and jery 2'));client.write(ts.encode('hello tom and jery 3'));client.write(ts.encode('hello tom and jery 4'));client.write(ts.encode('hello tom and jery 5'));client.on('data', chunk => {if (overageBuffer) {chunk = Buffer.concat([overageBuffer, chunk]);}let packageLen = 0;// 接收服务端返回的数据,是否能从中获取一个数据包while ((packageLen = ts.getPackageLen(chunk))) {const packageCon = chunk.slice(0, packageLen);chunk = chunk.slice(packageLen);// 对服务端返回的数据解码const ret = ts.decode(packageCon);console.log(ret);}// 不完整数据进行保存overageBuffer = chunk;});