服务器传递
在重新连接时同步客户端状态有两种常见方法:
- 服务器发送整个状态
- 客户端跟踪其处理的最后一个事件,服务器发送缺失的部分
这两种方案都是完全有效的,选择哪种取决于你的使用场景。在本教程中,我们将选择后者。
首先,让我们持久化聊天应用程序的消息。如今有很多不错的选择,这里我们使用 SQLite。
提示
如果你不熟悉 SQLite,可以在网上找到很多教程,比如 这个。
安装必要的包:
- NPM
- Yarn
- pnpm
npm install sqlite sqlite3
yarn add sqlite sqlite3
pnpm add sqlite sqlite3
我们将简单地将每条消息存储在一个 SQL 表中:
- CommonJS
- ES modules
index.js
const express = require('express');
const { createServer } = require('node:http');
const { join } = require('node:path');
const { Server } = require('socket.io');
const sqlite3 = require('sqlite3');
const { open } = require('sqlite');
async function main() {
// 打开数据库文件
const db = await open({
filename: 'chat.db',
driver: sqlite3.Database
});
// 创建我们的 'messages' 表(可以暂时忽略 'client_offset' 列)
await db.exec(`
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
client_offset TEXT UNIQUE,
content TEXT
);
`);
const app = express();
const server = createServer(app);
const io = new Server(server, {
connectionStateRecovery: {}
});
app.get('/', (req, res) => {
res.sendFile(join(__dirname, 'index.html'));
});
io.on('connection', (socket) => {
socket.on('chat message', async (msg) => {
let result;
try {
// 将消息存储在数据库中
result = await db.run('INSERT INTO messages (content) VALUES (?)', msg);
} catch (e) {
// TODO 处理失败
return;
}
// 包含偏移量与消息一起发送
io.emit('chat message', msg, result.lastID);
});
});
server.listen(3000, () => {
console.log('server running at http://localhost:3000');
});
}
main();
index.js
import express from 'express';
import { createServer } from 'node:http';
import { fileURLToPath } from 'node:url';
import { dirname, join } from 'node:path';
import { Server } from 'socket.io';
import sqlite3 from 'sqlite3';
import { open } from 'sqlite';
// 打开数据库文件
const db = await open({
filename: 'chat.db',
driver: sqlite3.Database
});
// 创建我们的 'messages' 表(可以暂时忽略 'client_offset' 列)
await db.exec(`
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
client_offset TEXT UNIQUE,
content TEXT
);
`);
const app = express();
const server = createServer(app);
const io = new Server(server, {
connectionStateRecovery: {}
});
const __dirname = dirname(fileURLToPath(import.meta.url));
app.get('/', (req, res) => {
res.sendFile(join(__dirname, 'index.html'));
});
io.on('connection', (socket) => {
socket.on('chat message', async (msg) => {
let result;
try {
// 将消息存储在数据库中
result = await db.run('INSERT INTO messages (content) VALUES (?)', msg);
} catch (e) {
// TODO 处理失败
return;
}
// 包含偏移量与消息一起发送
io.emit('chat message', msg, result.lastID);
});
});
server.listen(3000, () => {
console.log('server running at http://localhost:3000');
});
客户端将跟踪偏移量:
- ES6
- ES5
index.html
<script>
const socket = io({
auth: {
serverOffset: 0
}
});
const form = document.getElementById('form');
const input = document.getElementById('input');
const messages = document.getElementById('messages');
form.addEventListener('submit', (e) => {
e.preventDefault();
if (input.value) {
socket.emit('chat message', input.value);
input.value = '';
}
});
socket.on('chat message', (msg, serverOffset) => {
const item = document.createElement('li');
item.textContent = msg;
messages.appendChild(item);
window.scrollTo(0, document.body.scrollHeight);
socket.auth.serverOffset = serverOffset;
});
</script>
index.html
<script>
var socket = io({
auth: {
serverOffset: 0
}
});
var form = document.getElementById('form');
var input = document.getElementById('input');
var messages = document.getElementById('messages');
form.addEventListener('submit', function(e) {
e.preventDefault();
if (input.value) {
socket.emit('chat message', input.value);
input.value = '';
}
});
socket.on('chat message', function(msg, serverOffset) {
var item = document.createElement('li');
item.textContent = msg;
messages.appendChild(item);
window.scrollTo(0, document.body.scrollHeight);
socket.auth.serverOffset = serverOffset;
});
</script>
最后,服务器将在(重新)连接时发送缺失的消息:
index.js
// [...]
io.on('connection', async (socket) => {
socket.on('chat message', async (msg) => {
let result;
try {
result = await db.run('INSERT INTO messages (content) VALUES (?)', msg);
} catch (e) {
// TODO 处理失败
return;
}
io.emit('chat message', msg, result.lastID);
});
if (!socket.recovered) {
// 如果连接状态恢复不成功
try {
await db.each('SELECT id, content FROM messages WHERE id > ?',
[socket.handshake.auth.serverOffset || 0],
(_err, row) => {
socket.emit('chat message', row.content, row.id);
}
)
} catch (e) {
// 出现错误
}
}
});
// [...]
让我们看看实际效果:
如上面视频所示,它在临时断开连接和完全刷新页面后都能正常工作。
提示
与“连接状态恢复”功能的区别在于,成功的恢复可能不需要访问主数据库(例如,它可能从 Redis 流中获取消息)。
好了,现在让我们讨论客户端传递。