背景:最近的项目中,有一个端到端控制的场景(即A发送命令,通过RTC服务,使B接收到消息并执行命令),其中为了开发方便使用了RTC服务作为两端通信的“桥梁“。每一组端,都连接到同一个RTC Room,这样一来,多组端都会互不影响。
上述业务可以简单抽象成下图,手柄 发送消息 “前进5米” 到RTC服务器,然后 遥控车 接受到该rtc消息后,执行 命令(前进5米)。抽象模型和执行流程如下:


All right!这一切看起来都是那么美好~~~
但是!But!
这个RTC服务是某不知名厂商提供的极其不稳定的服务,具备以下特点:
- 会掉线。
- 时序不能保证。
- 可能会丢失消息(根据网络质量,进行多倍发包)
可谓是条条致命啊!在刚开始的业务中,由于使用并不频繁,所以rtc服务也就相对较”稳定”,上述问题并没有浮现出来。但是在业务扩张后,真的是招招致命啊!
既然使用了rtc,那么就要对她负责!(F**K (╯‵□′)╯︵┻━┻),So, 我们来针对这几个问题分析一下,是否可解?(当然是可解,不然就不会有下文了)
首先,为什么会出现上述问题呢?
通过查阅资料,了解到:WebRTC使用了流控传输协议(SCTP),这个协议啊,其实是很棒的一个协议,同时兼备了TCP和UDP的功能,当然也比较复杂。具有多路复用等优点。通信是先建立四次握手建立SCTP连接,通过channel以流的形式发送消息(如下图),该通道有多路,数据互不干涉,但是当一条通道堵塞后,会导致该连接整体中断。
但是其服务提供者大多场景是在音视频通话,所以并没有启用可靠传输。没错,正是因此,导致时序错误、丢失消息。上面说到的通道阻塞又会导致SCTP连接中断则是掉线的罪魁祸首。

一、掉线问题
首先解决掉线问题,掉线是个相对不可预测的事件。所以我们可以考虑通过心跳探活和重连机制来处理。
举个例子:小明和小O在通过某信打语音电话,但是小明的网络并不好,所以小明会每隔两分钟向小O问:能听到我讲话嘛?小O回复:可以的。来保证网络正常,通话仍在继续。

以下是实现心跳的简单实现代码,其他细节则可忽略(例如小车移动)。
首先,我们模拟出RTC的实现,主要有RtcEvent接口用来实现事件监听、RtcMessage类统一消息体、RtcClient类则是Rtc的主要操作。
package rtc;
// 实现消息监听
public interface RtcEvent {
void handleMessage(RtcMessage msg);
}
package rtc;
// 统一化消息体
public class RtcMessage {
public String key; // 消息的Key
public int value; // 消息的内容
public long time; // 消息创建的时间戳
public RtcMessage() {
this.time = System.currentTimeMillis();
}
}
RtcClient也是模拟出来的,这部分主要模拟接收消息、掉线的情况。
package rtc;
// 模拟Rtc功能SDK的实现
public class RtcClient {
private boolean shutdown = true;
public RtcClient(String uuid) {
shutdown = false;
}
// 监听发送来的心跳和消息,用线程来模拟
public void onListening(RtcEvent e) {
// 模拟定时接收心跳的线程
Thread t = new Thread(() -> {
for (int i = 0; !shutdown ; i++) {
RtcMessage heartbeat = new RtcMessage();
heartbeat.key = null;
e.handleMessage(heartbeat);
System.out.println("心跳:"+ i);
// 1、模拟中间接收到消息
if (i == 10) {
// 执行命令
RtcMessage cmdMsg = new RtcMessage();
cmdMsg.key = "MOVE";
cmdMsg.value = 1000;
e.handleMessage(cmdMsg);
}
// 2、模拟掉线
if (i == 15){
try {
Thread.sleep(2000);
break;
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
});
t.start();
}
// 模拟发送出去心跳
public void send(RtcMessage msg) {
System.out.println("send heartbeat: " + msg.time);
// e.handleMessage();
}
// 停止
public void close() {
// 让线程停止
this.shutdown = true;
}
}
然后我们可以去开发小车端的功能,利用心跳机制,避免断连。
import rtc.RtcClient;
import rtc.RtcMessage;
public class Car {
// SDK提供的 rtc 客户端
private static RtcClient rtcClient;
// 判断小车是否已经开启
public static volatile boolean isOn;
// 心跳线程
private static Thread heartbeatThread;
// 连接 的 rtc room 的 id
private String uuid;
// 上一次心跳时间
private long lastHeartbeatTime = -1;
// 死亡沟壑时间,如果超过2000毫秒未接受到心跳,则说明挂了,需要重启client
public static final int DEAD_GAP_TIME = 2000;
public Car(String uuid){
this.uuid = uuid;
rtcClient = new RtcClient(uuid);
// 定时发送心跳的线程
heartbeatThread = new Thread(() -> {
for(;;){
// 探活,是否可以按时接收到心跳,如果不可以,则说明client可能断开连接了,需要重新连接
checkAndReset();
System.out.println("last heartbeat: " + this.lastHeartbeatTime);
rtcClient.send(new RtcMessage());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
// 开启心跳探活线程
heartbeatThread.start();
}
/**
* 重置 rtc 连接
*/
private synchronized void checkAndReset() {
if (lastHeartbeatTime == -1){
// 刚初始化,跳过
return;
}
// 如果超过2000毫秒未接受到心跳,则说明挂了,需要重启client
if (System.currentTimeMillis() - lastHeartbeatTime > DEAD_GAP_TIME){
if(rtcClient != null){
rtcClient.close();
}
// 重新建立连接
rtcClient = new RtcClient(this.uuid);
startUp();// 重新监听
System.out.println("reset: "+ rtcClient);
}
}
/**
* 开启小车
*/
public synchronized void startUp(){
// 开始监听 uuid room 的 rtc的消息
rtcClient.onListening(receiveMsg -> {
// 判断如果是心跳,则更新上一次探活时间
if (receiveMsg.key == null){
lastHeartbeatTime = receiveMsg.time;
}else {
// 执行rtc消息传来的命令,让小车进行相应的动作
executeCommand(receiveMsg.key, receiveMsg.value);
System.out.println("[ Cmd: " + receiveMsg.key + ", val: " + receiveMsg.value + " ]");
}
});
}
/**
* 执行命令
* @param cmd 对应的指令
* @param v 值
*/
private void executeCommand(String cmd, int v){
switch (cmd) {
case "MOVE":
move(v);
break;
case "SPIN":
spin(v);
break;
}
}
/**
* 移动距离
* @param distance 距离
*/
private void move(int distance){
System.out.println("move " + distance);
}
/**
* 旋转角度
* @param angle 角度
*/
private void spin(int angle){
System.out.println("spin " + angle);
}
}
通过上面的代码,我们可以发现,对于连接断开的问题,我们通过心跳机制顺利解决~即在心跳间隔大于一定时间时候则判定断开连接,这个时候就要重新连接。至此,我们已经解决了第一个问题。
我们通过一个Demo程序,看一下心跳效果。
public class DemoMain {
public static void main(String[] args) throws InterruptedException {
Car c = new Car("10086");
c.startUp();
Thread.sleep(10000);
}
}

二、消息时序问题
我们上面的代码是在Client中开启线程,使用for循环,来模拟心跳(heartbeat)以及命令(Command)。在实际生产环境中,心跳可以使用for循环来实现;但是命令是随时可能发送的,然而我们使用的SCTP协议的时候没有保证消息接受的顺序,所以会产生什么问题呢?看下面的gif,蓝色表示木板,黄色箭头表示小车和车头方向。
正常情况下,控制器发送三条命令,依次是 移动1、旋转90°、移动1。小车按顺序接收命令,则可以顺利到达指定位置(图2-1)。

但是,如果消息并不是按顺序到达,而是乱序,就会出现下面的情况(图2-2),先接收到 SPIN,然后接收到 MOVE,就会导致小车从木板上掉落下去。

显然,我们应该避免(图2-2)这样的情况,所以如何解决呢?我们可以借鉴TCP的机制。
既然大家看到了这篇文章,想必大家都或多或少了解TCP协议吧,在TCP协议中,通过SEQ和ACK的机制来确保报文顺序。具体是指发送方将要发送的数据分割成合适大小的报文段,然后在每个报文段标上序号,这个序号就是SEQ,接收端在接收到报文后,发送ACK来反馈自己已经接受到的数据报文,以及通知发送方下次发送的报文序列号。这个期间,接收方可能以乱序的形式接受到这些报文,那么在接收后,对报文进行排序,即可达到数据顺序正确的目的。

分析到这里,我们可以试一试走Seq这条路,给每个消息打上Seq,然后通过Seq来保证时序,这里先以简单的实现方式来出发,不考虑消息丢失,只考虑消息乱序。
首先,改造RtcMessage,使其支持消息序号
package rtc;
public class RtcMessage implements Comparable<RtcMessage> {
public String key;
public int value;
public long time;
public long seq;
public RtcMessage() {
this.time = System.currentTimeMillis();
}
@Override // 实现比较,为了让消息自排序,后面会讲为什么要实现这个方法
public int compareTo(RtcMessage o) {
if (this.seq > o.seq) {
return -1;
}
return 1;
}
}
然后改造RtcClient,模拟发送多条指令,以多线程的方式发送,并携带序号。
package rtc;
import java.util.ArrayList;
import java.util.List;
public class RtcClient {
// 使用标志位来控制线程的停止
private boolean shutdown = true;
public RtcClient(String uuid) {
shutdown = false;
}
// 监听发送来的心跳和消息,用线程来模拟
public void onListening(RtcEvent e) {
// 定时发送心跳的线程
Thread t = new Thread(() -> {
for (int i = 0; !shutdown ; i++) {
RtcMessage heartbeat = new RtcMessage();
heartbeat.key = null;
e.handleMessage(heartbeat);
System.out.println("心跳:"+ i);
// 模拟中间接收到消息
if (i == 10) {
// 正确顺序,0-MOVE、1-SPIN、2-MOVE、3-SPIN、4-MOVE
// 先生成顺序消息
List<RtcMessage> msgList = new ArrayList<>(); // 使用list装
for(int j = 0; j < 5; j++){
RtcMessage cmdMsg = new RtcMessage();
if ((j & 1) == 0){
// 执行命令
cmdMsg.key = "MOVE";
cmdMsg.value = 1000;
}else {
// 执行命令
cmdMsg.key = "SPIN";
cmdMsg.value = 90;
}
cmdMsg.seq = j;
msgList.add(cmdMsg);
}
// 然后开启多线程发送指令
msgList.forEach(msg -> {
new Thread(() -> {
e.handleMessage(msg);
}).start();
});
}
// 模拟掉线
if (i == 15){
try {
Thread.sleep(2000);
break;
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
});
t.start();
}
// 模拟发送出去心跳
public void send(RtcMessage msg) {
System.out.println("send heartbeat: " + msg.time);
// e.handleMessage();
}
// 停止
public void close() {
// 让线程停止
this.shutdown = true;
}
}
然后,我们在Car中打印出执行的指令和序列号,出现了下面的情况,小车接收到的指令顺序并不正确,但是如果我们直接按照这个顺序执行(转180°,再走3000米),就会出大问题;而我们期望的是(走1000米,转90°,走1000米,转90°,走1000米)。

我们上文中引入了Seq的概念,所以接下来,我们在小车接收端(Car)也使用以下吧~
import rtc.RtcClient;
import rtc.RtcMessage;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
public class Car {
// SDK提供的 rtc 客户端
private static RtcClient rtcClient;
// 判断小车是否已经开启
public static volatile boolean isOn;
// 心跳线程
private static Thread heartbeatThread;
// 连接 的 rtc room 的 id
private String uuid;
// 上一次心跳时间
private long lastHeartbeatTime = -1;
// 死亡沟壑时间,如果超过2000毫秒未接受到心跳,则说明挂了,需要重启client
private static final int DEAD_GAP_TIME = 2000;
// 记录上一次消费的命令Seq
private static final AtomicLong lastSeq = new AtomicLong();
// 顺序队列
private static final PriorityBlockingQueue<RtcMessage> waitingQueue = new PriorityBlockingQueue<>();
// 锁
private static final ReentrantLock lock = new ReentrantLock();
public Car(String uuid) {
this.uuid = uuid;
rtcClient = new RtcClient(uuid);
// 定时发送心跳的线程
heartbeatThread = new Thread(() -> {
for (; ; ) {
// 探活,是否可以按时接收到心跳,如果不可以,则说明client可能断开连接了,需要重新连接
checkAndReset();
System.out.println("last heartbeat: " + this.lastHeartbeatTime);
rtcClient.send(new RtcMessage());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
// 开启心跳探活线程
heartbeatThread.start();
initMsgWaitingQueue();
}
/**
* 初始化命令消费等待队列,开启线程,轮询等待队列,做到有顺序消费(执行命令)
*/
private void initMsgWaitingQueue() {
// 初始化首次消费命令顺序为0,以后每消费一次,序列号都+1,这个在tcp中可以作为ACK
new Thread(() -> {
for (; ; ) {
lock.lock();
System.out.println("wait for command...");
if (waitingQueue.size() == 0) {
// 还没有消息
// System.out.println("null message.");
} else {
RtcMessage receiveMsg = waitingQueue.peek();
// 如果本次有消息
// 如果这次的消息序列号不等于上次+1,即 seq != lastSeq + 1,则还需要再把它入队列,再等等
if (receiveMsg.seq != lastSeq.get()) {
// 既然不相等,那就需要再减回去
System.out.printf("receiveMsg seq: %d, last seq: %d\n", receiveMsg.seq, lastSeq.get());
System.out.println("not sequence message, continue wait...");
} else {
// 若 seq = lastSeq + 1,则直接执行命令,进行消费
executeCommand(receiveMsg.key, receiveMsg.value);
lastSeq.addAndGet(1); // 标记上次消费的位置
System.out.println("[ Seq: " + receiveMsg.seq + ", Cmd: " + receiveMsg.key + ", val: " + receiveMsg.value + " ]");
waitingQueue.poll();
continue;
}
}
lock.unlock();
// 等一下消息吧
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
/**
* 重置 rtc 连接
*/
private synchronized void checkAndReset() {
if (lastHeartbeatTime == -1){
// 刚初始化,跳过
return;
}
// 如果超过2000毫秒未接受到心跳,则说明挂了,需要重启client
if (System.currentTimeMillis() - lastHeartbeatTime > DEAD_GAP_TIME) {
if (rtcClient != null) {
rtcClient.close();
}
// 重新建立连接
rtcClient = new RtcClient(this.uuid);
startUp();// 重新监听
System.out.println("reset: " + rtcClient);
}
}
/**
* 开启小车
*/
public synchronized void startUp() {
// 开始监听 uuid room 的 rtc的消息
rtcClient.onListening(receiveMsg -> {
// 判断如果是心跳,则更新上一次探活时间
if (receiveMsg.key == null) {
lastHeartbeatTime = receiveMsg.time;
} else {
// 执行rtc消息传来的命令,让小车进行相应的动作
// executeCommand(receiveMsg.key, receiveMsg.value);
// System.out.println("[ Seq: " + receiveMsg.seq + ", Cmd: " + receiveMsg.key + ", val: " + receiveMsg.value + " ]");
addWaitingQueue(receiveMsg); // 添加等待队列,来代替直接消费,这里使用优先队列保证顺序
}
});
}
/**
* @param receiveMsg 接收到的消息
*/
private void addWaitingQueue(RtcMessage receiveMsg) {
// 加个锁吧~
lock.lock();
waitingQueue.offer(receiveMsg);
lock.unlock();
}
/**
* 执行命令
*
* @param cmd 对应的指令
* @param v 值
*/
private void executeCommand(String cmd, int v) {
switch (cmd) {
case "MOVE":
move(v);
break;
case "SPIN":
spin(v);
break;
}
}
/**
* 移动距离
*
* @param distance 距离
*/
private void move(int distance) {
System.out.println("move " + distance);
}
/**
* 旋转角度
*
* @param angle 角度
*/
private void spin(int angle) {
System.out.println("spin " + angle);
}
public void close() {
synchronized (waitingQueue) {
waitingQueue.forEach(c -> System.out.print("*****************" + c.seq + ", "));
System.out.println();
}
}
}

上面通过对 小车(Car) 的改造,将【收到命令立即执行命令】改为【收到命令后先放入优先队列】,然后【对命令进行有序执行】,顺利的解决了命令乱序的问题,真的是 Awesome ~
三、消息丢失问题
按顺序看下来的小伙伴们可以从上面了解到,在处理命令乱序问题时候,我们默认消息是不丢失的;但是呢,这个消息可并不是那么乐观,可能存在丢失的情况,这可就不好了,既然消息丢失,那我们的命令很可能无法接收到,也会出现很严重的后果!
当然,这个我们也是有解滴~有小伙伴可能会讲了:这不就用上面说到的TCP的可靠传输的方案来解决不就可以了嘛?没错,可以!但是笔者在这里小小偷了个懒,使用了一种不是很好但能快速解决问题的方案:多倍发包。至于TCP的可靠传输,可以下来自己尝试实现一下~
我们看一下,什么是多倍发包。
举个栗子:快要过年了,小明想要抢票回家,但是呢,一票难求,大家都懂。所以小明在放票前,打开了10个相同的买票窗口,在到点时,快速的把每个窗口都点了一遍买票。由于小明的“多倍发包”,成功抢到了回家的票~
对于这个消息丢失,我们可以通过每次执行命令,发多个相同的指令,在小车接收端,对于相同序号的指令只执行一遍,这样就可以在即使部分指令丢了的情况下,也最大程度保证消息的完整。下面我们来实现一下吧~
首先,模拟消息丢失,在 Car 中的 startUp() 开始监听消息的方法中,加上随机丢失的代码,如下
// Car.class
/**
* 开启小车
*/
public synchronized void startUp() {
// 开始监听 uuid room 的 rtc的消息
rtcClient.onListening(receiveMsg -> {
// 判断如果是心跳,则更新上一次探活时间
if (receiveMsg.key == null) {
lastHeartbeatTime = receiveMsg.time;
} else {
// 执行rtc消息传来的命令,让小车进行相应的动作
if(new Random().nextBoolean()){
// 模拟随机丢包
System.out.println("消息丢失啦【" + receiveMsg.seq + ", "+receiveMsg.key + ": " + receiveMsg.value + "】");
return;
}
// executeCommand(receiveMsg.key, receiveMsg.value);
// System.out.println("[ Seq: " + receiveMsg.seq + ", Cmd: " + receiveMsg.key + ", val: " + receiveMsg.value + " ]");
addWaitingQueue(receiveMsg); // 添加等待队列,来代替直接消费,这里使用优先队列保证顺序
}
});
}
我们执行一下,下图就是模拟在接收数据时,1、4两个命令都丢失了,导致程序无法正常执行命令。

接下来,添加多倍发包,在RtcClient中模拟,在OnListening()方法中,同样的指令发送多次,如下:
// RtcClient.class
// 这里可以设置多倍发包的倍数
private static final int MULTIPLE_MESSAGE_SEND_TIMES = 3;
// 监听发送来的心跳和消息,用线程来模拟
public void onListening(RtcEvent e) {
// 定时发送心跳的线程
Thread t = new Thread(() -> {
for (int i = 0; !shutdown ; i++) {
RtcMessage heartbeat = new RtcMessage();
heartbeat.key = null;
e.handleMessage(heartbeat);
System.out.println("心跳:"+ i);
// 模拟中间接收到消息
if (i == 10) {
// 正确顺序,0-MOVE、1-SPIN、2-MOVE、3-SPIN、4-MOVE
// 先生成顺序消息
List<RtcMessage> msgList = new ArrayList<>(); // 使用list装
for(int j = 0; j < 5; j++){
RtcMessage cmdMsg = new RtcMessage();
if ((j & 1) == 0){
// 执行命令
cmdMsg.key = "MOVE";
cmdMsg.value = 1000;
}else {
// 执行命令
cmdMsg.key = "SPIN";
cmdMsg.value = 90;
}
cmdMsg.seq = j;
msgList.add(cmdMsg);
}
// 然后开启多线程发送指令
msgList.forEach(msg -> {
new Thread(() -> {
// 多倍发包
for (int j = 0; j < MULTIPLE_MESSAGE_SEND_TIMES; j++) {
e.handleMessage(msg);
}
}).start();
});
}
// 模拟掉线
if (i == 15){
try {
Thread.sleep(2000);
break;
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
});
t.start();
}
执行,虽然这次能保证客户端能接收到每个Seq至少一条消息,但是,有些Seq的消息接收了多遍,如果都执行,那么也不符合预期。如下图,丢失了 【Seq=0、0、1、1、2、3、4、4】 这八条消息,接受到了【Seq=0、1、2、2、3、3、4】这七条消息,执行结果会如下,我们只能正确执行【0、1、2】三条消息的指令,不能继续往下执行了,因为当前队列头部为2,但是我们的lastSeq已经指向3,所以我们不能继续执行2.

所以我们继续修改Car小车接收端,如果命令被执行过,则直接丢掉,不进入阻塞队列,这样我们就可以继续有序的执行命令。只需要对监听 waitingQueue 的方法进行处理即可,让重复的命令包丢掉。下面是对Car类下的 initMsgWaitingQueue() 方法进行改造,在判断队列内消息的开始,先判断是否已经接收过这个Seq,接受过的话直接丢掉。
Car.class
/**
* 初始化命令消费等待队列,开启线程,轮询等待队列,做到有顺序消费(执行命令)
*/
private void initMsgWaitingQueue() {
// 初始化首次消费命令顺序为0,以后每消费一次,序列号都+1,这个在tcp中可以作为ACK
new Thread(() -> {
for (; ; ) {
lock.lock();
System.out.println("wait for command...");
if (waitingQueue.size() == 0) {
// 还没有消息
// System.out.println("null message.");
} else {
RtcMessage receiveMsg = waitingQueue.peek();
// 如果队首的Command的Seq小于当前执行的lastSeq,那么说明这条命令已经被执行了,所以丢掉
if (receiveMsg.seq < lastSeq.get()){
waitingQueue.poll(); // 丢掉
continue; // 直接进行下一轮
}
// 如果本次有消息
// 如果这次的消息序列号不等于上次+1,即 seq != lastSeq + 1,则还需要再把它入队列,再等等
if (receiveMsg.seq != lastSeq.get()) {
// 既然不相等,那就需要再减回去
System.out.printf("receiveMsg seq: %d, last seq: %d\n", receiveMsg.seq, lastSeq.get());
System.out.println("not sequence message, continue wait...");
} else {
// 若 seq = lastSeq + 1,则直接执行命令,进行消费
executeCommand(receiveMsg.key, receiveMsg.value);
lastSeq.addAndGet(1); // 标记上次消费的位置
System.out.println("[ Seq: " + receiveMsg.seq + ", Cmd: " + receiveMsg.key + ", val: " + receiveMsg.value + " ]");
waitingQueue.poll();
continue;
}
}
lock.unlock();
// 等一下消息吧
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// try {
// RtcMessage receiveMsg = waitingQueue.poll(1, TimeUnit.SECONDS);
// if (receiveMsg == null) {
// System.out.println("null message.");
// continue;
// }
// waitingQueue.forEach(c -> System.out.println(c.seq));
// // 如果这次的消息序列号不等于上次+1,即 seq != lastSeq + 1,则还需要再把它入队列,再等等
// if (receiveMsg.seq != lastSeq.get() + 1) {
// addWaitingQueue(receiveMsg); // 再次入队
// Thread.sleep(1000); // 再等待一秒吧
// continue;
// }
// // 若 seq = lastSeq + 1,则直接执行命令
// executeCommand(receiveMsg.key, receiveMsg.value);
// lastSeq.addAndGet(1); // 标记上次消费的位置
// System.out.println("[ Seq: " + receiveMsg.seq + ", Cmd: " + receiveMsg.key + ", val: " + receiveMsg.value + " ]");
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
}
}).start();
}
下面就是暴力发包的效果,即使丢了【1、2、2、3、3】消息,也可以看到后面按顺序执行了每一条指令,而且只执行了一次。

以上,就是对不稳定消息服务的改造全部内容~其中许多地方写的不够严谨,例如客户端重连后,Seq会重置,比较重点不是在这里嘛,而且解决这个问题应该也不是太复杂,大家可以搞一下嘛。
其他文章
- 由遥控车引起的不可靠的消息服务(监听断连、顺序颠倒、消息丢失)的问题及解决背景:最近的项目中,有一个端到端控制的场景(即A发送命令,通过RTC服[…]
- Red Dead Redemption 2: 永远的亚瑟 · 摩根Red Dead Redemption 2 是一个致敬西部牛[…]
- 分享如何低成本DIY组装一台显示器相信有许多动手能力强的朋友,你是否想组装一台属于自己的独一无二的显示器呢?笔者下面分享一下Diy显示器的过程。文中也有一些对于显示器参数的拙见以及显示器最终的测试方式。
- OBS录制全屏黑屏解决方法问题描述:在使用OBS进行录屏时候,只能录制特定窗口,当选屏幕录制时候[…]
- 浅谈ArrayList源码实现本文记录笔者对JDK1.8下ArrayList源码的解读 ArrayL[…]