LOGO OA教程 ERP教程 模切知识交流 PMS教程 CRM教程 开发文档 其他文档  
 
网站管理员

详解 WebSocket 实现

freeflydom
2023年5月30日 10:7 本文热度 508

前言

为什么写这篇文章?

对于应用协议的了解,相信大部分👨‍🎓始终停留在使用上,可能读了如 《图解HTTP》、《计算机网络》 一类的书籍,了解了更深层次的理论,但理论始终是理论,我们很难有机会能在工作场景里面去触碰到协议的实现上,对应用层协议的理解是片面的。

WebSocket 的实现非常适合前端的同学学习,通过了解 WebSocket Node版实现,站在更高的维度上去看待这些应用层协议,无论是WebRTC,还是HTTP都好,有了对一种协议实现的整体思路,就有了面对各种协议的技术自信。

WebSocket简介

WebSocket 是 HTML5 开始提供的一种在单个 TCP 连接上进行全双工(full-duplex)通讯的协议。没有了 Request 和 Response 的概念,两者地位完全平等,连接一旦建立,就建立了真•持久性连接,双方可以随时向对方发送数据。

注:全双工(Full Duplex)是一种通信方式,指通信的双方可以同时发送和接收数据,而且在同一时刻,发送和 接收是独立进行的。

概念老生常谈了,不妨思考两个问题:

WebSocket 为什么能进行全双工通信?HTTP 却不行?

HTTP是非持久化连接,每次客户端向服务器发送请求时,都需要建立一个新的 TCP 连接。这个连接在响应结束后就会被关闭,不保留在系统中,而 websocket 会保留,所以可以继续保持通信。

HTTP 为什么是非持久化连接而 WebSocket 是持久化连接?

HTTP 协议的设计初衷是为了传输静态文本信息,如 HTML、CSS、JS 等等,传输静态文本大部分时间连接并不会被频繁地打开和关闭,所以使用持久化连接带来的复杂度可能会超过它的收益。而不像 websocket 更多是为了实时通信的场景,所以采用持久化连接。

WebSocket握手过程

我们经常听到一种说法,WebSocket 基于 HTTP,实际上只有在建立握手时,数据是通过 HTTP 传输的。但是建立之后,在真正传输时候是不需要 HTTP协议。握手具体过程如图所示:

注:websocket 会保留 HTTP 握手后的 socket 连接,后续即可利用这个 socket 进行通讯,无需再关注 HTTP。

websocket 为什么采用 HTTP 握手?

WebSocket 是相对较新的协议,可能并不是所有的网络设备和服务器都支持。因此,在浏览器请求服务器进行 WebSocket 握手时,使用基于 HTTP 协议的握手方式可以避免协议兼容性问题。

WebSocket实现原理

WebSocket数据帧说明

WebSocket 以帧的形式进行数据传输,帧组成包括以下几个部分:

  1. 头部(Header):包括了一些控制信息,如 FIN、RSV1、RSV2、RSV3、Opcode、MASK、Payload Length 以及 Masking Key 等字段,用于描述该帧的类型、长度以及是否经过掩码操作等信息。它有 2~14 个字节不等,其中前 2 个字节是必需的。

  2. 掩码(Masking Key):用于对载荷(Payload)进行加密解密操作,它的长度固定为 4 个字节,如果 MASK 标志被设置为 1,则该字段必须存在。

  3. 载荷(Payload):包含了应用层发送的数据,具体内容由 Opcode 字段指定的数据类型决定。如果有 MASK 标志,则需要对其进行解码。

各字段含义如表格所示:

字段含义长度
FIN是否为最后一帧1 bit
RSV预留位,方便后续拓展协议3 bit
opcode解释 payload data 的用途4 bit
MASK定义“payload data”是否被添加掩码1 bit
payload data length数据长度7 bit 7+16 bit 7+64 bit
Serial Number序列号16bit
Masking-key掩码32bit
payload data传输数据payload data length

把数据帧组成搞清楚,可以说 websocket 你就了解了一大半,协议实现里大部分操作都是对于数据帧的处理。

构造帧

websocket 的数据是以帧的形式传输,那么我们就需要了解如何构造帧。构造帧只是听起来很复杂,构造一个数据帧我们只需要遵守协议规则填写即可,按照WebSocket的协议标准,构造一个最短数据帧我们只需要三个字节就能完成,构成如表格所示下:

字节编号填入内容
1FIN、RSV、opcode
2MASK、payload data length
3payload data

代码实现:

注:本质就是做一些字节拼接操作,把对应的标识放到对应的位置即可。

数据传输

了解往构造帧的过程,那 websocket 是如何把帧发送出去的?

WebSocket 在握手过程中会保留 HTTP 握手后的 socket 连接,这在前面有提到,所以我们可以通过这个 socket 连接进行数据的传输。

代码实现如下:

心跳机制

在连接过中,防止连接因长时间无数据传输而被提前关闭,WebSocket 还引入了心跳机制,原理可以概括为:定期发送心跳包,以确认客户端与服务器的连接状态,并避免连接因长时间空闲而被中断。 代码实现如下:

mini-ws

以下为一个 websocket server 的简易实现(代码来源):


测试代码:

var crypto = require("crypto");

var { EventEmitter } = require("events");

var MAX_FRAME_SIZE = 1024; // 最长长度限制

var MAGIC_STRING = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";


/**

 *  数据类型操作码 TEXT 字符串

 *  BINARY 二进制数据 常用来保存照片

 *  PING,PONG 用作心跳检测

 *  CLOSE 关闭连接的数据帧 (有很多关闭连接的代码 1001,1009,1007,1002)

 */

var OPCODES = {

    CONTINUE: 0,

    TEXT: 1,

    BINARY: 2,

    CLOSE: 8,

    PING: 9,

    PONG: 10,

};


var hashWebSocketKey = function (key) {

    var sha1 = crypto.createHash("sha1");

    sha1.update(key + MAGIC_STRING, "ascii");

    return sha1.digest("base64");

};

/**

 * 解掩码

 * @param maskBytes 掩码数据

 * @param data payload

 * @returns {Buffer}

 */

var unmask = function (maskBytes, data) {

    var payload = Buffer.alloc(data.length);

    for (var i = 0; i < data.length; i++) {

        payload[i] = maskBytes[i % 4] ^ data[i];

    }

    return payload;

};


/**

 * 编码数据

 * @param opcode 操作码

 * @param payload   数据

 * @returns {*}

 */

var encodeMessage = function (opcode, payload, isFinal = true) {

    var buf;

    var b1 = (isFinal ? 0x80 : 0x00) | opcode;

    var b2;

    var length = payload.length;

    if (length < 126) {

        buf = Buffer.alloc(payload.length + 2 + 0);

        b2 |= length;

        //buffer ,offset

        buf.writeUInt8(b1, 0); //读前8bit

        buf.writeUInt8(b2, 1); //读8―15bit

        payload.copy(buf, 2); //复制数据,从2(第三)字节开始

    } else if (length < 1 << 16) {

        buf = Buffer.alloc(payload.length + 2 + 2);

        b2 |= 126;

        buf.writeUInt8(b1, 0);

        buf.writeUInt8(b2, 1);

        buf.writeUInt16BE(length, 2);

        payload.copy(buf, 4);

    } else {

        buf = Buffer.alloc(payload.length + 2 + 8);

        b2 |= 127;

        buf.writeUInt8(b1, 0);

        buf.writeUInt8(b2, 1);

        buf.writeUInt32BE(0, 2);

        buf.writeUInt32BE(length, 6);

        payload.copy(buf, 10);

    }


    return buf;

};


class WebSocket extends EventEmitter {

    constructor(req, socket, upgradeHead) {

        super();

        var resKey = hashWebSocketKey(req.headers["sec-websocket-key"]);


        // 构造响应头

        var resHeaders = [

            "HTTP/1.1 101 Switching Protocols",

            "Upgrade: websocket",

            "Connection: Upgrade",

            "Sec-WebSocket-Accept: " + resKey,

        ]

            .concat("", "")

            .join("\r\n");


        socket.on("data", (data) => {

            this.buffer = Buffer.concat([this.buffer, data]);

            while (this._processBuffer()) {}

        });


        socket.on("close", (had_error) => {

            if (!this.closed) {

                this.emit("close", 1006);

                this.closed = true;

            }

        });


        socket.write(resHeaders);


        this.socket = socket;

        this.buffer = Buffer.alloc(0);

        this.closed = false;

        this.frames = Buffer.alloc(0);

        this.frameOpcode = 0;

        this.keepLiveTimer = null;

    }

    /*

 发送数据函数

 * */

    send(obj) {

        var opcode;

        var payload;

        // 如果是二进制

        if (Buffer.isBuffer(obj)) {

            opcode = OPCODES.BINARY;

            payload = obj;

        } else if (typeof obj) {

            // 承载的文本内容

            opcode = OPCODES.TEXT;

            //创造一个utf8的编码,可以被编码为字符串

            payload = Buffer.from(obj, "utf8");

        } else {

            throw new Error("cannot send object.Must be string of Buffer");

        }


        this._doSend(opcode, payload);

    }


    // 默认 45 秒 保持发送心跳

    keepLive(timeout = 45000) {

        var self = this;

        function keepit() {

            self._doSend(OPCODES.PING, Buffer.from("ping"));

            console.log("server send ping...");

            // 在关闭连接的情况下就不再需要发送 ping 请求了

            if (!self.closed) {

                self.keepLiveTimer = setTimeout(keepit, timeout);

            }

        }

        keepit();

    }


    /*

 关闭连接函数

 * */

    close(code, reason) {

        var opcode = OPCODES.CLOSE;

        var buffer;

        if (code) {

            buffer = Buffer.alloc(Buffer.byteLength(reason) + 2);

            buffer.writeUInt16BE(code, 0);

            buffer.write(reason, 2);

        } else {

            buffer = Buffer.alloc(0);

        }

        this._doSend(opcode, buffer);

        this.closed = true;

    }


    _processBuffer() {

        var buf = this.buffer;

        if (buf.length < 2) {

            return;

        }

        var idx = 2;

        var byte1 = buf.readUInt8(0); // 读取数据帧的前 8 bit

        var FIN = byte1 & 0x80; // 如果为0x80,则标志传输结束,获取高位 bit

        var opcode = byte1 & 0x0f; //截取第一个字节的后 4 位,即 opcode 码


        // 如果是 0 的话,说明是延续帧,需要保存好 opCode

        if (!FIN) {

            this.frameOpcode = opcode || this.frameOpcode; // 确保不为 0;

        }


        var byte2 = buf.readUInt8(1); // 读取数据帧第二个字节

        var MASK = byte2 & 0x80; // 判断是否有掩码,客户端必须要有,获取高位 bit

        var length = byte2 & 0x7f; //获取length属性,也是小于126数据长度的数据真实值

        if (length > 125) {

            if (buf.length < 8) {

                return; // 如果大于125,而字节数小于 8,则显然不合规范要求

            }

        }

        if (length === 126) {

            //获取的值为126 ,表示后两个字节(16位)用于表示数据长度

            length = buf.readUInt16BE(2); // 读取 16bit 的值

            idx += 2; // +2

        } else if (length === 127) {

            //获取的值为 127 ,表示后 8 个字节(64位)用于表示数据长度,其中高 4 字节是 0

            var highBits = buf.readUInt32BE(2); //(1/0)1111111,切记 MSB 最高位是 0

            if (highBits != 0) {

                this.close(1009, ""); //1009 关闭代码,说明数据太大; 协议里是支持 63 位长度,不过这里我们自己实现的话,只支持 32 位长度,防止数据过大;

            }

            length = buf.readUInt32BE(6); // 从第 6 到第 10 个字节(32位)为真实存放的数据长度

            idx += 8;

        }

        if (buf.length < idx + 4 + length) {

            //不够长 4为掩码字节数

            return;

        }

        // 如果有 mask 标志位,默认都是有的

        if (MASK) {

            var maskBytes = buf.slice(idx, idx + 4); //获取掩码数据

            idx += 4; //指针前移到真实数据段

            var payload = buf.slice(idx, idx + length); // 数据长度的单位是字节

            payload = unmask(maskBytes, payload); //解码真实数据

        } else {

            payload = buf.slice(idx, idx + length);

        }


        this.buffer = buf.slice(idx + length); // 缓存 buffer

        // 有可能是分帧,需要拼接数据

        this.frames = Buffer.concat([this.frames, payload]); // 保存到 frames 中


        if (!FIN) {

            console.log(

                "server detect fragment, sizeof payload:",

                Buffer.byteLength(payload)

            );

        }


        if (FIN) {

            payload = this.frames.slice(0); // 获取所有拼接完整的数据

            opcode = opcode || this.frameOpcode; // 如果是 0 ,则保持获取之前保存的 code

            this.frames = Buffer.alloc(0); // 清空 frames

            this.frameOpcode = 0; // 清空 opcode

            this._handleFrame(opcode, payload); // 处理操作码

        }


        return true; // 继续处理

    }

    /**

     * 针对不同操作码进行不同处理

     * @param 操作码

     * @param 数据

     */

    _handleFrame(opcode, buffer) {

        var payload;

        switch (opcode) {

            case OPCODES.TEXT:

                payload = buffer.toString("utf8"); //如果是文本需要转化为utf8的编码

                this.emit("data", opcode, payload); //Buffer.toString()默认utf8 这里是故意指示的

                break;

            case OPCODES.BINARY: //二进制文件直接交付

                payload = buffer;

                this.emit("data", opcode, payload);

                break;

            case OPCODES.PING: // 发送 pong 做响应

                this._doSend(OPCODES.PONG, buffer);

                break;

            case OPCODES.PONG: //不做处理

                console.log("server receive pong");

                break;

            case OPCODES.CLOSE: // close有很多关闭码

                let code, reason; // 用于获取关闭码和关闭原因

                if (buffer.length >= 2) {

                    code = buffer.readUInt16BE(0);

                    reason = buffer.toString("utf8", 2);

                }

                this.close(code, reason);

                this.emit("close", code, reason);

                break;

            default:

                this.close(1002, "unhandle opcode:" + opcode);

        }

    }

    // 这里可以针对 payload 的长度做分片

    _doSend(opcode, payload) {

        var len = Buffer.byteLength(payload);


        // 分片的距离逻辑

        var count = 0;

        while (len > MAX_FRAME_SIZE) {

            var framePayload = payload.slice(0, MAX_FRAME_SIZE);

            payload = payload.slice(MAX_FRAME_SIZE);

            this.socket.write(

                encodeMessage(

                    count > 0 ? OPCODES.CONTINUE : opcode,

                    framePayload,

                    false

                )

            ); //编码后直接通过socket发送

            count++;

            len = Buffer.byteLength(payload);

        }


        this.socket.write(

            encodeMessage(count > 0 ? OPCODES.CONTINUE : opcode, payload)

        ); //编码后直接通过socket发送

    }

}


module.exports = WebSocket;


运行测试代码,打开浏览器访问:http://localhost:3000/,在控制台输入:

var http = require('http');

var WebSocket = require('./websocket');

// HTTP服务器部分

var server = http.createServer(function(req, res) {

  res.end('websocket test\r\n');

});


console.log('starting...');


// Upgrade请求处理

server.on('upgrade', callback);


function callback(req, socket, upgradeHead) {

  var ws = new WebSocket(req, socket, upgradeHead);

  // ws.keepLive(); // 保持心跳连接,否则一般经过一定的时间没有数据交互,浏览器端会主动关闭 ws 链接

  ws.on('data', function(opcode, payload) {

    console.log('receive data:', opcode, payload.length);

    ws.send('good job');

  });



  ws.on('close', function(code, reason) {

    console.log('close:', code, reason);

  });


}


server.listen(3000);

建立连接后发送消息:

ws.send('hello world');

成功发送并收到回复!

也可以通过报文查看:

参考文档




————————————————

https://juejin.cn/post/7236954203555151933


该文章在 2023/5/30 10:09:07 编辑过
关键字查询
相关文章
正在查询...
点晴ERP是一款针对中小制造业的专业生产管理软件系统,系统成熟度和易用性得到了国内大量中小企业的青睐。
点晴PMS码头管理系统主要针对港口码头集装箱与散货日常运作、调度、堆场、车队、财务费用、相关报表等业务管理,结合码头的业务特点,围绕调度、堆场作业而开发的。集技术的先进性、管理的有效性于一体,是物流码头及其他港口类企业的高效ERP管理信息系统。
点晴WMS仓储管理系统提供了货物产品管理,销售管理,采购管理,仓储管理,仓库管理,保质期管理,货位管理,库位管理,生产管理,WMS管理系统,标签打印,条形码,二维码管理,批号管理软件。
点晴免费OA是一款软件和通用服务都免费,不限功能、不限时间、不限用户的免费OA协同办公管理系统。
Copyright 2010-2025 ClickSun All Rights Reserved