博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RocketMQ源码 — 十、 RocketMQ顺序消息
阅读量:5946 次
发布时间:2019-06-19

本文共 6585 字,大约阅读时间需要 21 分钟。

RocketMQ本身支持顺序消息,在使用上发送顺序消息和非顺序消息有所区别

发送顺序消息

SendResult sendResult = producer.send(msg, new MessageQueueSelector() {    @Override    public MessageQueue select(List
mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); }}, orderId);

send方法带有参数MessageQueueSelector,MessageQueueSelector是让用户自己决定消息发送到哪一个队列,如果是局部消息的话,用来决定消息与队列的对应关系。

顺序消息消费

consumer.registerMessageListener(new MessageListenerOrderly() {    AtomicLong consumeTimes = new AtomicLong(0);    @Override    public ConsumeOrderlyStatus consumeMessage(List
msgs, ConsumeOrderlyContext context) { context.setAutoCommit(false); System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeOrderlyStatus.SUCCESS; }});

从使用上可以推断顺序消息需要从发送到消费整个过程中保证有序,所以顺序消息具体表现为

  • 发送消息是顺序的
  • broker存储消息是顺序的
  • consumer消费是顺序的

下面分别看看rmq怎么实现顺序消息

发送顺序消息

因为broker存储消息有序的前提是producer发送消息是有序的,所以这两个结合在一起说。

消息发布是有序的含义:producer发送消息应该是依次发送的,所以要求发送消息的时候保证:

  • 消息不能异步发送,同步发送的时候才能保证broker收到是有序的。
  • 每次发送选择的是同一个MessageQueue

同步发送

从刚开始的例子中发送消息的时候,会调用下面的方法

public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)    throws MQClientException, RemotingException, MQBrokerException, InterruptedException {    return this.sendSelectImpl(msg, selector, arg, CommunicationMode.SYNC, null, timeout);}

CommunicationMode.SYNC表明了producer发送消息的时候是同步发送的。同步发送表示,producer发送消息之后不会立即返回,会等待broker的response。

broker收到producer的请求之后虽然是启动线程处理的,但是在线程中将消息写入commitLog中以后会发送response给producer,producer在收到broker的response并且是处理成功之后才算是消息发送成功。

同一个MessageQueue

为了保证broker收到消息也是顺序的,所以producer只能向其中一个队列发送消息。因为只有是同一个队列才能保证消息是发往同一个broker,只有同一个broker处理发来的消息才能保证顺序。所以发送顺序消息的时候需要用户指定MessageQueue,在send调用过程中会调用下面的方法,下面的方法中回调了用户指定的select queue的方法

private SendResult sendSelectImpl(    Message msg,    MessageQueueSelector selector,    Object arg,    final CommunicationMode communicationMode,    final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {    this.makeSureStateOK();    Validators.checkMessage(msg, this.defaultMQProducer);    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());    if (topicPublishInfo != null && topicPublishInfo.ok()) {        MessageQueue mq = null;        try {            // 调用用户指定的select方法来选出一个queue,如果是全局顺序,用户必须保证自己选出的queue是同一个            mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);        } catch (Throwable e) {            throw new MQClientException("select message queue throwed exception.", e);        }        if (mq != null) {            return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout);        } else {            throw new MQClientException("select message queue return null.", null);        }    }    throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);}

这样每次发送的消息都是同一个MessageQueue,也就是都会发送到同一个broker,这个发送消息的过程都保证了顺序,也就保证了broker存储在CommitLog中的消息也是顺序的。

顺序消费

保证了broker中物理存储的消息是顺序的,只要保证消息消费是顺序的就能保证整个过程是顺序消息了。

还是开始的例子中,顺序消费和普通消费的listener是不一样的,顺序消费需要实现的是下面这个接口

org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly

在consumer启动的时候会根据listener的类型判断应该使用哪一个service来消费

// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#startif (this.getMessageListenerInner() instanceof MessageListenerOrderly) {    // 顺序消息    this.consumeOrderly = true;    this.consumeMessageService =        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {    // 非顺序消息    this.consumeOrderly = false;    this.consumeMessageService =        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());}

consumer拉取消息是按照offset拉取的,所以consumer能保证拉取到consumer的消息是连续有序的,但是consumer拉取到消息后又启动了线程去处理消息,所以就不能保证先拉取到的消息先处理

// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage// org.apache.rocketmq.client.consumer.PullCallback#onSuccess// 将拉取到的消息放入ProcessQueueboolean dispathToConsume = processQueue.putMessage(pullResult.getMsgFoundList());// 这里的consumeMessageService就是ConsumeMessageOrderlyServiceDefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(    pullResult.getMsgFoundList(),    processQueue,    pullRequest.getMessageQueue(),    dispathToConsume);// org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService#submitConsumeRequestpublic void submitConsumeRequest(    final List
msgs, final ProcessQueue processQueue, final MessageQueue messageQueue, final boolean dispathToConsume) { if (dispathToConsume) { // 拉取到的消息构造ConsumeRequest,然后放入线程池等待执行 ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue); this.consumeExecutor.submit(consumeRequest); }}

上面将请求放入线程池了,所以线程执行的顺序又不确定了,那么consumer消费就变成无序的了吗?

答案显然是不会变成无序的,因为上面有一行关键的代码

processQueue.putMessage(pullResult.getMsgFoundList());

先说一下ProcessQueue这个关键的数据结构。一个MessageQueue对应一个ProcessQueue,是一个有序队列,该队列记录一个queueId下所有从brokerpull回来的消息,如果消费成功了就会从队列中删除。ProcessQueue有序的原因是维护了一个TreeMap

private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();

msgTreeMap:里面维护了从broker pull回来的所有消息,TreeMap是有序的,key是Long类型的,没有指定comparator,默认是将key强转为Comparable,然后进行比较,因为key是当前消息的offset,而Long实现了Comparable接口,所以msgTreeMap里面的消息是按照offset排序的。

所以是ProcessQueue保证了拉取回来的消息是有序的,继续上面说到的启动线程执行ConsumeRequest.run方法来消费消息

Overridepublic void run() {    // 保证一个队列只有一个线程访问,因为顺序消息只有一个队列,也就保证了只有一个线程消费    final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);    synchronized (objLock) {        if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())            || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {            final long beginTime = System.currentTimeMillis();            for (boolean continueConsume = true; continueConsume; ) {                // 从ProcessQueue中获取消息进行消费,获取出来的消息也是有序的                final int consumeBatchSize =                    ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();                List
msgs = this.processQueue.takeMessags(consumeBatchSize); // 省略中间代码}

从ProcessQueue中获取出来的消息是有序的,consumer保证了消费的有序性。

总结

rmq从发送消息、broker存储消息到consumer消费消息整个过程中,环环相扣,最终保证消息是有序的。

转载于:https://www.cnblogs.com/sunshine-2015/p/9074956.html

你可能感兴趣的文章
Firefox 密码管理器 Lockbox 推出 Android 版
查看>>
视频点播-资源用量
查看>>
好程序员分享大势所趋 HTML5成Web开发者最关心的技术 ...
查看>>
北汽集团荣辉:抓不住自动驾驶 就抓不住车企的命脉 | 自动驾驶这十年 ...
查看>>
豆瓣评分8.8,这本程序员案头必备宝典,10年沉淀,新版再现 ...
查看>>
运行 Spring Boot 应用的 3 种方式!
查看>>
【内容安全】虚拟化及云环境下数据库审计优缺点分析
查看>>
crmeb电商系统
查看>>
xttprep.tmpl
查看>>
mycat垂直分库
查看>>
无需停机,手把手教您将 Docker CE 切换为 Docker EE
查看>>
Ubuntu 14.04 Web服务器,Apache的安装和配置
查看>>
MaxCompute 图计算用户手册(上)
查看>>
自带科技基因,打造纯原创IP,“燃烧小宇宙”获数千万A轮融资
查看>>
未能加载文件或程序集&quot;Newtonsoft.Json, Version=4.5.0.0
查看>>
C#多线程编程系列(二)- 线程基础
查看>>
Jenkins 内置变量(学习笔记二十四)
查看>>
PostgreSQL 10.1 手册_部分 II. SQL 语言_第 13 章 并发控制_13.2. 事务隔离
查看>>
虚拟机概念
查看>>
【云周刊】第195期:全球首家!阿里云获GNTC2018 网络创新大奖 成唯一获奖云服务商...
查看>>