由遥控车引起的不可靠的消息服务(监听断连、顺序颠倒、消息丢失)的问题及解决

遥控车

背景:最近的项目中,有一个端到端控制的场景(即A发送命令,通过RTC服务,使B接收到消息并执行命令),其中为了开发方便使用了RTC服务作为两端通信的“桥梁“。每一组端,都连接到同一个RTC Room,这样一来,多组端都会互不影响。

上述业务可以简单抽象成下图,手柄 发送消息 “前进5米” 到RTC服务器,然后 遥控车 接受到该rtc消息后,执行 命令(前进5米)。抽象模型和执行流程如下:

业务抽象表示

执行流程

All right!这一切看起来都是那么美好~~~

但是But

这个RTC服务是某不知名厂商提供的极其不稳定的服务,具备以下特点:

  1. 会掉线。
  2. 时序不能保证。
  3. 可能会丢失消息(根据网络质量,进行多倍发包)

可谓是条条致命啊!在刚开始的业务中,由于使用并不频繁,所以rtc服务也就相对较”稳定”,上述问题并没有浮现出来。但是在业务扩张后,真的是招招致命啊!

既然使用了rtc,那么就要对她负责!(F**K (╯‵□′)╯︵┻━┻),So, 我们来针对这几个问题分析一下,是否可解?(当然是可解,不然就不会有下文了)

首先,为什么会出现上述问题呢?

通过查阅资料,了解到:WebRTC使用了流控传输协议(SCTP),这个协议啊,其实是很棒的一个协议,同时兼备了TCP和UDP的功能,当然也比较复杂。具有多路复用等优点。通信是先建立四次握手建立SCTP连接,通过channel以流的形式发送消息(如下图),该通道有多路,数据互不干涉,但是当一条通道堵塞后,会导致该连接整体中断。

但是其服务提供者大多场景是在音视频通话,所以并没有启用可靠传输。没错,正是因此,导致时序错误、丢失消息。上面说到的通道阻塞又会导致SCTP连接中断则是掉线的罪魁祸首。

SCTP通道,其中msg可以以有序(绿色)发送,也可以以无序(黄色)发送

一、掉线问题

首先解决掉线问题,掉线是个相对不可预测的事件。所以我们可以考虑通过心跳探活和重连机制来处理。

举个例子:小明和小O在通过某信打语音电话,但是小明的网络并不好,所以小明会每隔两分钟向小O问:能听到我讲话嘛?小O回复:可以的。来保证网络正常,通话仍在继续。

以下是实现心跳的简单实现代码,其他细节则可忽略(例如小车移动)。

首先,我们模拟出RTC的实现,主要有RtcEvent接口用来实现事件监听、RtcMessage类统一消息体、RtcClient类则是Rtc的主要操作。

package rtc;
// 实现消息监听
public interface RtcEvent {
    void handleMessage(RtcMessage msg);
}
package rtc;
// 统一化消息体
public class RtcMessage {
    public String key; // 消息的Key
    public int value; // 消息的内容
    public long time; // 消息创建的时间戳

    public RtcMessage() {
        this.time = System.currentTimeMillis();
    }
}

RtcClient也是模拟出来的,这部分主要模拟接收消息、掉线的情况。

package rtc;
// 模拟Rtc功能SDK的实现
public class RtcClient {

    private boolean shutdown = true;

    public RtcClient(String uuid) {
        shutdown = false;
    }

    // 监听发送来的心跳和消息,用线程来模拟
    public void onListening(RtcEvent e) {

        // 模拟定时接收心跳的线程
        Thread t = new Thread(() -> {
            for (int i = 0; !shutdown ; i++) {
                RtcMessage heartbeat = new RtcMessage();
                heartbeat.key = null;
                e.handleMessage(heartbeat);
                System.out.println("心跳:"+ i);

                // 1、模拟中间接收到消息
                if (i == 10) {
                    // 执行命令
                    RtcMessage cmdMsg = new RtcMessage();
                    cmdMsg.key = "MOVE";
                    cmdMsg.value = 1000;
                    e.handleMessage(cmdMsg);
                }

                // 2、模拟掉线
                if (i == 15){
                    try {
                        Thread.sleep(2000);
                        break;
                    } catch (InterruptedException ex) {
                        ex.printStackTrace();
                    }
                }
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ex) {
                    ex.printStackTrace();
                }
            }
        });
        t.start();

    }

    // 模拟发送出去心跳
    public void send(RtcMessage msg) {
        System.out.println("send heartbeat: " + msg.time);
        //        e.handleMessage();
    }

    // 停止
    public void close() {
        // 让线程停止
        this.shutdown = true;
    }
}

然后我们可以去开发小车端的功能,利用心跳机制,避免断连。

import rtc.RtcClient;
import rtc.RtcMessage;

public class Car {

    // SDK提供的 rtc 客户端
    private static RtcClient rtcClient;
    // 判断小车是否已经开启
    public static volatile boolean isOn;
    // 心跳线程
    private static Thread heartbeatThread;
    // 连接 的 rtc room 的 id
    private String uuid;
    // 上一次心跳时间
    private long lastHeartbeatTime = -1;
    // 死亡沟壑时间,如果超过2000毫秒未接受到心跳,则说明挂了,需要重启client
    public static final int DEAD_GAP_TIME = 2000;

    public Car(String uuid){
        this.uuid = uuid;
        rtcClient = new RtcClient(uuid);

        // 定时发送心跳的线程
        heartbeatThread = new Thread(() -> {
            for(;;){
                // 探活,是否可以按时接收到心跳,如果不可以,则说明client可能断开连接了,需要重新连接
                checkAndReset();
                System.out.println("last heartbeat: " + this.lastHeartbeatTime);
                rtcClient.send(new RtcMessage());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        // 开启心跳探活线程
        heartbeatThread.start();
    }

    /**
     * 重置 rtc 连接
     */
    private synchronized void checkAndReset() {
        if (lastHeartbeatTime == -1){
            // 刚初始化,跳过
            return;
        }
        // 如果超过2000毫秒未接受到心跳,则说明挂了,需要重启client
        if (System.currentTimeMillis() - lastHeartbeatTime > DEAD_GAP_TIME){
            if(rtcClient != null){
                rtcClient.close();
            }
            // 重新建立连接
            rtcClient = new RtcClient(this.uuid);
            startUp();// 重新监听
            System.out.println("reset: "+ rtcClient);
        }
    }

    /**
     * 开启小车
     */
    public synchronized void startUp(){

        // 开始监听 uuid room 的 rtc的消息
        rtcClient.onListening(receiveMsg -> {

            // 判断如果是心跳,则更新上一次探活时间
            if (receiveMsg.key == null){
                lastHeartbeatTime = receiveMsg.time;
            }else {
                // 执行rtc消息传来的命令,让小车进行相应的动作
                executeCommand(receiveMsg.key, receiveMsg.value);
                System.out.println("[ Cmd: " + receiveMsg.key + ", val: " + receiveMsg.value + " ]");

            }
        });
    }

    /**
     * 执行命令
     * @param cmd 对应的指令
     * @param v 值
     */
    private void executeCommand(String cmd, int v){
        switch (cmd) {
            case "MOVE":
                move(v);
                break;
            case "SPIN":
                spin(v);
                break;
        }
    }


    /**
     * 移动距离
     * @param distance 距离
     */
    private void move(int distance){
        System.out.println("move " + distance);
    }

    /**
     * 旋转角度
     * @param angle 角度
     */
    private void spin(int angle){
        System.out.println("spin " + angle);
    }

}

通过上面的代码,我们可以发现,对于连接断开的问题,我们通过心跳机制顺利解决~即在心跳间隔大于一定时间时候则判定断开连接,这个时候就要重新连接。至此,我们已经解决了第一个问题。

我们通过一个Demo程序,看一下心跳效果。

public class DemoMain {

    public static void main(String[] args) throws InterruptedException {
        Car c = new Car("10086");
        c.startUp();
        Thread.sleep(10000);
    }
}

二、消息时序问题

我们上面的代码是在Client中开启线程,使用for循环,来模拟心跳(heartbeat)以及命令(Command)。在实际生产环境中,心跳可以使用for循环来实现;但是命令是随时可能发送的,然而我们使用的SCTP协议的时候没有保证消息接受的顺序,所以会产生什么问题呢?看下面的gif,蓝色表示木板,黄色箭头表示小车和车头方向。

正常情况下,控制器发送三条命令,依次是 移动1、旋转90°、移动1。小车按顺序接收命令,则可以顺利到达指定位置(图2-1)。

图2-1 正常情况

但是,如果消息并不是按顺序到达,而是乱序,就会出现下面的情况(图2-2),先接收到 SPIN,然后接收到 MOVE,就会导致小车从木板上掉落下去。

图2-2 乱序情况

显然,我们应该避免(图2-2)这样的情况,所以如何解决呢?我们可以借鉴TCP的机制。

既然大家看到了这篇文章,想必大家都或多或少了解TCP协议吧,在TCP协议中,通过SEQ和ACK的机制来确保报文顺序。具体是指发送方将要发送的数据分割成合适大小的报文段,然后在每个报文段标上序号,这个序号就是SEQ,接收端在接收到报文后,发送ACK来反馈自己已经接受到的数据报文,以及通知发送方下次发送的报文序列号。这个期间,接收方可能以乱序的形式接受到这些报文,那么在接收后,对报文进行排序,即可达到数据顺序正确的目的。

图源自网络(https://www.cnblogs.com/silyvin/p/11927398.html)

分析到这里,我们可以试一试走Seq这条路,给每个消息打上Seq,然后通过Seq来保证时序,这里先以简单的实现方式来出发,不考虑消息丢失,只考虑消息乱序。

首先,改造RtcMessage,使其支持消息序号

package rtc;

public class RtcMessage implements Comparable<RtcMessage> {
    public String key;
    public int value;
    public long time;
    public long seq;

    public RtcMessage() {
        this.time = System.currentTimeMillis();
    }


    @Override // 实现比较,为了让消息自排序,后面会讲为什么要实现这个方法
    public int compareTo(RtcMessage o) {
        if (this.seq > o.seq) {
            return -1;
        }
        return 1;
    }
}

然后改造RtcClient,模拟发送多条指令,以多线程的方式发送,并携带序号。

package rtc;

import java.util.ArrayList;
import java.util.List;

public class RtcClient {

    // 使用标志位来控制线程的停止
    private boolean shutdown = true;

    public RtcClient(String uuid) {
        shutdown = false;
    }

    // 监听发送来的心跳和消息,用线程来模拟
    public void onListening(RtcEvent e) {

        // 定时发送心跳的线程
        Thread t = new Thread(() -> {
            for (int i = 0; !shutdown ; i++) {
                RtcMessage heartbeat = new RtcMessage();
                heartbeat.key = null;
                e.handleMessage(heartbeat);
                System.out.println("心跳:"+ i);

                // 模拟中间接收到消息
                if (i == 10) {

                    // 正确顺序,0-MOVE、1-SPIN、2-MOVE、3-SPIN、4-MOVE
                    // 先生成顺序消息
                    List<RtcMessage> msgList = new ArrayList<>(); // 使用list装
                    for(int j = 0; j < 5; j++){
                        RtcMessage cmdMsg = new RtcMessage();
                        if ((j & 1) == 0){
                            // 执行命令
                            cmdMsg.key = "MOVE";
                            cmdMsg.value = 1000;
                        }else {
                            // 执行命令
                            cmdMsg.key = "SPIN";
                            cmdMsg.value = 90;
                        }
                        cmdMsg.seq = j;
                        msgList.add(cmdMsg);
                    }
                    // 然后开启多线程发送指令
                    msgList.forEach(msg -> {
                        new Thread(() -> {
                            e.handleMessage(msg);
                        }).start();
                    });


                }

                // 模拟掉线
                if (i == 15){
                    try {
                        Thread.sleep(2000);
                        break;
                    } catch (InterruptedException ex) {
                        ex.printStackTrace();
                    }
                }
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ex) {
                    ex.printStackTrace();
                }
            }
        });
        t.start();

    }

    // 模拟发送出去心跳
    public void send(RtcMessage msg) {
        System.out.println("send heartbeat: " + msg.time);
        //        e.handleMessage();
    }

    // 停止
    public void close() {
        // 让线程停止
        this.shutdown = true;
    }
}

然后,我们在Car中打印出执行的指令和序列号,出现了下面的情况,小车接收到的指令顺序并不正确,但是如果我们直接按照这个顺序执行(转180°,再走3000米),就会出大问题;而我们期望的是(走1000米,转90°,走1000米,转90°,走1000米)。

我们上文中引入了Seq的概念,所以接下来,我们在小车接收端(Car)也使用以下吧~

import rtc.RtcClient;
import rtc.RtcMessage;

import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

public class Car {

    // SDK提供的 rtc 客户端
    private static RtcClient rtcClient;
    // 判断小车是否已经开启
    public static volatile boolean isOn;
    // 心跳线程
    private static Thread heartbeatThread;
    // 连接 的 rtc room 的 id
    private String uuid;
    // 上一次心跳时间
    private long lastHeartbeatTime = -1;
    // 死亡沟壑时间,如果超过2000毫秒未接受到心跳,则说明挂了,需要重启client
    private static final int DEAD_GAP_TIME = 2000;
    // 记录上一次消费的命令Seq
    private static final AtomicLong lastSeq = new AtomicLong();
    // 顺序队列
    private static final PriorityBlockingQueue<RtcMessage> waitingQueue = new PriorityBlockingQueue<>();
    // 锁
    private static final ReentrantLock lock = new ReentrantLock();

    public Car(String uuid) {
        this.uuid = uuid;
        rtcClient = new RtcClient(uuid);

        // 定时发送心跳的线程
        heartbeatThread = new Thread(() -> {
            for (; ; ) {
                // 探活,是否可以按时接收到心跳,如果不可以,则说明client可能断开连接了,需要重新连接
                checkAndReset();
                System.out.println("last heartbeat: " + this.lastHeartbeatTime);
                rtcClient.send(new RtcMessage());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        // 开启心跳探活线程
        heartbeatThread.start();
        initMsgWaitingQueue();
    }

    /**
     * 初始化命令消费等待队列,开启线程,轮询等待队列,做到有顺序消费(执行命令)
     */
    private void initMsgWaitingQueue() {
        // 初始化首次消费命令顺序为0,以后每消费一次,序列号都+1,这个在tcp中可以作为ACK
        new Thread(() -> {
            for (; ; ) {
                lock.lock();
                System.out.println("wait for command...");
                if (waitingQueue.size() == 0) {
                    // 还没有消息
//                    System.out.println("null message.");
                } else {
                    RtcMessage receiveMsg = waitingQueue.peek();

                    // 如果本次有消息
                    // 如果这次的消息序列号不等于上次+1,即 seq != lastSeq + 1,则还需要再把它入队列,再等等
                    if (receiveMsg.seq != lastSeq.get()) {
                        // 既然不相等,那就需要再减回去
                        System.out.printf("receiveMsg seq: %d, last seq: %d\n", receiveMsg.seq, lastSeq.get());
                        System.out.println("not sequence message, continue wait...");
                    } else {
                        // 若 seq = lastSeq + 1,则直接执行命令,进行消费
                        executeCommand(receiveMsg.key, receiveMsg.value);
                        lastSeq.addAndGet(1); // 标记上次消费的位置
                        System.out.println("[ Seq: " + receiveMsg.seq + ", Cmd: " + receiveMsg.key + ", val: " + receiveMsg.value + " ]");
                        waitingQueue.poll();
                        continue;
                    }
                }
                lock.unlock();

                // 等一下消息吧
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

        }).start();
    }

    /**
     * 重置 rtc 连接
     */
    private synchronized void checkAndReset() {
        if (lastHeartbeatTime == -1){
            // 刚初始化,跳过
            return;
        }
        // 如果超过2000毫秒未接受到心跳,则说明挂了,需要重启client
        if (System.currentTimeMillis() - lastHeartbeatTime > DEAD_GAP_TIME) {
            if (rtcClient != null) {
                rtcClient.close();
            }
            // 重新建立连接
            rtcClient = new RtcClient(this.uuid);
            startUp();// 重新监听
            System.out.println("reset: " + rtcClient);
        }
    }

    /**
     * 开启小车
     */
    public synchronized void startUp() {

        // 开始监听 uuid room 的 rtc的消息
        rtcClient.onListening(receiveMsg -> {

            // 判断如果是心跳,则更新上一次探活时间
            if (receiveMsg.key == null) {
                lastHeartbeatTime = receiveMsg.time;
            } else {
                // 执行rtc消息传来的命令,让小车进行相应的动作
//                executeCommand(receiveMsg.key, receiveMsg.value);
//                System.out.println("[ Seq: " + receiveMsg.seq + ", Cmd: " + receiveMsg.key + ", val: " + receiveMsg.value + " ]");
                addWaitingQueue(receiveMsg); // 添加等待队列,来代替直接消费,这里使用优先队列保证顺序
            }
        });
    }

    /**
     * @param receiveMsg 接收到的消息
     */
    private void addWaitingQueue(RtcMessage receiveMsg) {
        // 加个锁吧~
        lock.lock();
        waitingQueue.offer(receiveMsg);
        lock.unlock();
    }


    /**
     * 执行命令
     *
     * @param cmd 对应的指令
     * @param v   值
     */
    private void executeCommand(String cmd, int v) {
        switch (cmd) {
            case "MOVE":
                move(v);
                break;
            case "SPIN":
                spin(v);
                break;
        }
    }


    /**
     * 移动距离
     *
     * @param distance 距离
     */
    private void move(int distance) {
        System.out.println("move " + distance);
    }

    /**
     * 旋转角度
     *
     * @param angle 角度
     */
    private void spin(int angle) {
        System.out.println("spin " + angle);
    }

    public void close() {
        synchronized (waitingQueue) {
            waitingQueue.forEach(c -> System.out.print("*****************" + c.seq + ", "));
            System.out.println();
        }

    }
}
嘻嘻,解决啦~
上面通过对 小车(Car) 的改造,将【收到命令立即执行命令】改为【收到命令后先放入优先队列】,然后【对命令进行有序执行】,顺利的解决了命令乱序的问题,真的是 Awesome ~

三、消息丢失问题

按顺序看下来的小伙伴们可以从上面了解到,在处理命令乱序问题时候,我们默认消息是不丢失的;但是呢,这个消息可并不是那么乐观,可能存在丢失的情况,这可就不好了,既然消息丢失,那我们的命令很可能无法接收到,也会出现很严重的后果!

当然,这个我们也是有解滴~有小伙伴可能会讲了:这不就用上面说到的TCP的可靠传输的方案来解决不就可以了嘛?没错,可以!但是笔者在这里小小偷了个懒,使用了一种不是很好但能快速解决问题的方案:多倍发包。至于TCP的可靠传输,可以下来自己尝试实现一下~

我们看一下,什么是多倍发包。

举个栗子:快要过年了,小明想要抢票回家,但是呢,一票难求,大家都懂。所以小明在放票前,打开了10个相同的买票窗口,在到点时,快速的把每个窗口都点了一遍买票。由于小明的“多倍发包”,成功抢到了回家的票~

对于这个消息丢失,我们可以通过每次执行命令,发多个相同的指令,在小车接收端,对于相同序号的指令只执行一遍,这样就可以在即使部分指令丢了的情况下,也最大程度保证消息的完整。下面我们来实现一下吧~

首先,模拟消息丢失,在 Car 中的 startUp() 开始监听消息的方法中,加上随机丢失的代码,如下

// Car.class
    /**
     * 开启小车
     */
    public synchronized void startUp() {

        // 开始监听 uuid room 的 rtc的消息
        rtcClient.onListening(receiveMsg -> {
            // 判断如果是心跳,则更新上一次探活时间
            if (receiveMsg.key == null) {
                lastHeartbeatTime = receiveMsg.time;
            } else {
                // 执行rtc消息传来的命令,让小车进行相应的动作

                if(new Random().nextBoolean()){
                    // 模拟随机丢包
                    System.out.println("消息丢失啦【" + receiveMsg.seq + ", "+receiveMsg.key + ": " + receiveMsg.value + "】");
                    return;
                }
//                executeCommand(receiveMsg.key, receiveMsg.value);
//                System.out.println("[ Seq: " + receiveMsg.seq + ", Cmd: " + receiveMsg.key + ", val: " + receiveMsg.value + " ]");
                addWaitingQueue(receiveMsg); // 添加等待队列,来代替直接消费,这里使用优先队列保证顺序
            }
        });
    }

我们执行一下,下图就是模拟在接收数据时,1、4两个命令都丢失了,导致程序无法正常执行命令。

丢包情况

接下来,添加多倍发包,在RtcClient中模拟,在OnListening()方法中,同样的指令发送多次,如下:

// RtcClient.class
    // 这里可以设置多倍发包的倍数
    private static final int MULTIPLE_MESSAGE_SEND_TIMES = 3;
// 监听发送来的心跳和消息,用线程来模拟
    public void onListening(RtcEvent e) {

        // 定时发送心跳的线程
        Thread t = new Thread(() -> {
            for (int i = 0; !shutdown ; i++) {
                RtcMessage heartbeat = new RtcMessage();
                heartbeat.key = null;
                e.handleMessage(heartbeat);
                System.out.println("心跳:"+ i);

                // 模拟中间接收到消息
                if (i == 10) {

                    // 正确顺序,0-MOVE、1-SPIN、2-MOVE、3-SPIN、4-MOVE
                    // 先生成顺序消息
                    List<RtcMessage> msgList = new ArrayList<>(); // 使用list装
                    for(int j = 0; j < 5; j++){
                        RtcMessage cmdMsg = new RtcMessage();
                        if ((j & 1) == 0){
                            // 执行命令
                            cmdMsg.key = "MOVE";
                            cmdMsg.value = 1000;
                        }else {
                            // 执行命令
                            cmdMsg.key = "SPIN";
                            cmdMsg.value = 90;
                        }
                        cmdMsg.seq = j;
                        msgList.add(cmdMsg);
                    }
                    // 然后开启多线程发送指令
                    msgList.forEach(msg -> {
                        new Thread(() -> {
                            // 多倍发包
                            for (int j = 0; j < MULTIPLE_MESSAGE_SEND_TIMES; j++) {
                                e.handleMessage(msg);
                            }
                        }).start();
                    });


                }

                // 模拟掉线
                if (i == 15){
                    try {
                        Thread.sleep(2000);
                        break;
                    } catch (InterruptedException ex) {
                        ex.printStackTrace();
                    }
                }
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ex) {
                    ex.printStackTrace();
                }
            }
        });
        t.start();

    }

执行,虽然这次能保证客户端能接收到每个Seq至少一条消息,但是,有些Seq的消息接收了多遍,如果都执行,那么也不符合预期。如下图,丢失了 【Seq=0、0、1、1、2、3、4、4】 这八条消息,接受到了【Seq=0、1、2、2、3、3、4】这七条消息,执行结果会如下,我们只能正确执行【0、1、2】三条消息的指令,不能继续往下执行了,因为当前队列头部为2,但是我们的lastSeq已经指向3,所以我们不能继续执行2.

多倍发包

所以我们继续修改Car小车接收端,如果命令被执行过,则直接丢掉,不进入阻塞队列,这样我们就可以继续有序的执行命令。只需要对监听 waitingQueue 的方法进行处理即可,让重复的命令包丢掉。下面是对Car类下的 initMsgWaitingQueue() 方法进行改造,在判断队列内消息的开始,先判断是否已经接收过这个Seq,接受过的话直接丢掉。

Car.class
    /**
     * 初始化命令消费等待队列,开启线程,轮询等待队列,做到有顺序消费(执行命令)
     */
    private void initMsgWaitingQueue() {
        // 初始化首次消费命令顺序为0,以后每消费一次,序列号都+1,这个在tcp中可以作为ACK
        new Thread(() -> {
            for (; ; ) {
                lock.lock();
                System.out.println("wait for command...");
                if (waitingQueue.size() == 0) {
                    // 还没有消息
//                    System.out.println("null message.");
                } else {
                    RtcMessage receiveMsg = waitingQueue.peek();
                    // 如果队首的Command的Seq小于当前执行的lastSeq,那么说明这条命令已经被执行了,所以丢掉
                    if (receiveMsg.seq < lastSeq.get()){
                        waitingQueue.poll(); // 丢掉
                        continue; // 直接进行下一轮
                    }

                    // 如果本次有消息
                    // 如果这次的消息序列号不等于上次+1,即 seq != lastSeq + 1,则还需要再把它入队列,再等等
                    if (receiveMsg.seq != lastSeq.get()) {
                        // 既然不相等,那就需要再减回去
                        System.out.printf("receiveMsg seq: %d, last seq: %d\n", receiveMsg.seq, lastSeq.get());
                        System.out.println("not sequence message, continue wait...");
                    } else {
                        // 若 seq = lastSeq + 1,则直接执行命令,进行消费
                        executeCommand(receiveMsg.key, receiveMsg.value);
                        lastSeq.addAndGet(1); // 标记上次消费的位置
                        System.out.println("[ Seq: " + receiveMsg.seq + ", Cmd: " + receiveMsg.key + ", val: " + receiveMsg.value + " ]");
                        waitingQueue.poll();
                        continue;
                    }
                }
                lock.unlock();

                // 等一下消息吧
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }


//                    try {
//                        RtcMessage receiveMsg = waitingQueue.poll(1, TimeUnit.SECONDS);
//                        if (receiveMsg == null) {
//                            System.out.println("null message.");
//                            continue;
//                        }
//                        waitingQueue.forEach(c -> System.out.println(c.seq));
//                        // 如果这次的消息序列号不等于上次+1,即 seq != lastSeq + 1,则还需要再把它入队列,再等等
//                        if (receiveMsg.seq != lastSeq.get() + 1) {
//                            addWaitingQueue(receiveMsg); // 再次入队
//                            Thread.sleep(1000); // 再等待一秒吧
//                            continue;
//                        }
//                        // 若 seq = lastSeq + 1,则直接执行命令
//                        executeCommand(receiveMsg.key, receiveMsg.value);
//                        lastSeq.addAndGet(1); // 标记上次消费的位置
//                        System.out.println("[ Seq: " + receiveMsg.seq + ", Cmd: " + receiveMsg.key + ", val: " + receiveMsg.value + " ]");
//                    } catch (InterruptedException e) {
//                        e.printStackTrace();
//                    }
            }

        }).start();
    }

下面就是暴力发包的效果,即使丢了【1、2、2、3、3】消息,也可以看到后面按顺序执行了每一条指令,而且只执行了一次。

最终结果

以上,就是对不稳定消息服务的改造全部内容~其中许多地方写的不够严谨,例如客户端重连后,Seq会重置,比较重点不是在这里嘛,而且解决这个问题应该也不是太复杂,大家可以搞一下嘛。

其他文章

发表评论

电子邮件地址不会被公开。 必填项已用*标注