Hi, 非常欢迎撰写RocketMQ相关的技术文章,建议放到Google DOC上并共享出来方便大家进行Comment。
Regards, yukon 2018-04-02 12:09 GMT+08:00 404828407 <404828...@qq.com>: > RocketMQ中新消费组上线时有什么风险? > 作者:卢松 > > > > > 以前碰到过一个问题,一个新消费组上线时和我们期望的消费行为有偏差,今天专门研究下这个问题,看看是怎么回事。 > > > 在RocketMQ中,假如一个新消费组订阅了几个topic,按正常人或者正常业务的期望,新消费组应该从订阅topic的最后一个消息开始消费, > *但是实际情形不是如此,有时候新消费组会从这些topic的开头开始消费。这就是新消费组上线的风险点。* > > > 这对业务来说就有风险,因为消费端需要关心要不要处理以前的消息。或者以前的消息非常多,都还没有删除,消费端要处理多久才能处理完成。再或者, > 我的消费者处理不了以前的那些老的消息,处理时都出错,这该怎么办? > > > 下面我们来分析下这个问题的来龙去脉,按照1,2,3,4...来说明前因后果及解决方案。 > > > *(一)消费端配置。*消费端集群消费时,消费端的默认配置是从topic的最后offset开始消费。 > 具体配置代码在DefaultMQPullConsumerImpl的consumeFromWhere()中: > > > > 而CONSUME_FROM_LAST_OFFSET的含义是“一个新的订阅组第一次启动从队列的最后位置开始消费”,RocketMQ > 3.2.6版本的代码注释中清晰的说明了,但是实际表现却不是如此。 > > > > *(二),消费端拉取消息的位置计算逻辑。*消费端拉取消息是按照topic下的queue来进行主动拉取的, > 最关键的是这个拉取的位置offset是怎么计算出来的。消费端拉取消息位置的触发点在RebalanceImpl的up > dateProcessQueueTableInRebalance()中,如图: > > > > 对于集群消费者,真正实现计算拉取消息位置的逻辑是在 > *RebalancePushImpl的computePullFromWhere()方法中,这个方法在消费客户端逻辑中是非常重要的*, > 经常出现问题都是这段逻辑导致的。虽然这段代码没有bug,但是要深刻理解这段代码才能避免各种问题的出现。以后的文章中,会经常提到这块的实现逻辑。 > > > > *(三)computePullFromWhere()的集群实现*。computePullFromWhere()是计算不同配置时的拉取offset, > 我们只关心CONSUME_FROM_LAST_OFFSET时的实现。如下图箭头处,首先去broker端拉取某个queue的消费进度信息: > > > > offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE) > 这段实际上执行的是RemoteBrokerOffsetStore的readOffset(), 如下图红框中的逻辑: > > > > 其中long brokerOffset = this.fetchConsumeOffsetFromBroker(mq)的实现如下图, > 主要是通过MQClientAPIImpl的queryConsumerOffset()去broker上拿某个queue的消费进度, > 发送的是requestCode是QUERY_CONSUMER_OFFSET的请求: > > > > *(四),broker计算consumerOffser的逻辑。*broker收到上面消费客户端的请求, > 是在ClientManageProcessor的queryConsumerOffset(ChannelHandlerContext ctx, > RemotingCommand request)中做处理的: > > > > 当一个新消费组上线时,会走到订阅组不存在的情况,然后计算当前queue的minOffset,而getMessageStore(). > checkInDiskByConsumeOffset( > > requestHeader.getTopic(),requestHeader.getQueueId(), 0) > > 一般都是返回false,这个方法含义是检查当前queue的第一条消息是否在磁盘中,而不在内存中。 > > > > *所以这个api的结果是,当minOffset=0时,返回offset=0,当minOffset>0时返回QUERY_NOT_FOUND结果,消费端拿到这个结果时会抛出MQBrokerException异常。* > > > broker源代码的注释中也写的很清楚了,“ > > > *订阅组不存在情况下,如果这个队列的消息最小Offset是0,则表示这个Topic上线时间不长,服务器堆积的数据也不多,那么这个订阅组就从0开始消费。尤其对于Topic队列数动态扩容时,必须要从0开始消费。* > > ” > > *(五),消费客户端处理broker返回的consumerOffset,查清问题根源。*RemoteBrokerOffsetStor > e的readOffset()中,*当minOffset为0时,这个方法返回0;当minOffset大于0时,这个方法返回-1* > 。中间的分析过程省略了,不然贴出来的代码会更多。如下图实现: > > > > 上述方法返回的值赋值给*lastOffset ,*继续往上回到 > *RebalancePushImpl的computePullFromWhere()中,*当*lastOffset = > 0时,会返回0;当lastOffset = -1时,将会返回queue的maxOffset。* > > > > *这段逻辑对新消费组的意义就是:* > *如果订阅的queue不是从0开始的(minOffset大于0,已经删除过数据了),那么消费端将从maxOffset开始消费,即从最新位置开始消费;如果订阅的queue是从0开始的(minOffset等于0,没有删除过数据),那么消费端将从0开始消费这个queue。* > > > *这就是风险点,和我们最初的期望,每次都从最后位置消费消息有偏差!!!* > > > *(六)RocketMQ为什么这么设计?设计是否合理?* > > > 我们先去查看下Apache RocketMQ 4.2.0中broker端的实现,因为主要实现点还是在broker端。 > 虽然计算消费位置offset的逻辑已经挪到了这个类中: > > > *org.apache.rocketmq.broker.processor.ConsumerManageProcessor,但是实现逻辑是没有任何改变的。* > > > 既然Apache版本都没有做任何改动,说明这不是个bug,就是这么设计的。下面我们来分析下 > *为什么一个queue的minOffset为0时,消费端要从0开始消费这个queue上消息,只有这种情况超出了正常的预期。* > > > 我们做个假设,假设新消费组上线时,都是从queue的maxOffset开始消费消息。又如果一个topic在一个broker上面有4个queue, > 新消费组上线后,开始从这四个queue的最后位置消费消息,这时我突然扩容这个topic到8个queue, > 那么消费端去namesrv上拿到这8个queue的信息需要一个心跳周期,按默认配置是30秒左右。这个心跳周期内, > 新扩展的queue上完全可能有新消息进来。 > > > 当消费端拿到4个新扩展queue的信息后,去broker端拉取消息时, > *broker还是把这4个扩容queue当作新queue来处理的。按照我们的假设,最终消费端会从这4个新queue的maxOffset开始消费。* > *这就有可能丢失了这4个扩容queue的前面一些消息,有可能会很多消息,而这些消息完全是在新消费组上线后发送出来的!!* > > > *有消息漏消费了!这就是为什么新消费组不能都是从maxOffset开始消费的。* > > > 这样原因就清楚了,RocketMQ的设计是合理的,导致了重复消费是不可避免的,但是风险是巨大的。*这也体现了RocketMQ的一个重大设计原则:* > *宁可重复消费无数消息,也绝不漏掉一条消息。*就跟国民党当年喊得 “宁可错杀千人,也绝不放过一个” 一样。 > > > *(七),解决方案*。我订阅的topic上面可能有几十万条消息没有删除过,难道新上线消费组时,几十万条消息要重新消费吗? > > > 目前我想到的有两种解决方案,1.可以使用mqadmin中的resetOffsetByTime命令来跳过某一个时间点之前的消息。 > 2.新消费组的消费者启动时,自己去过滤老的消息或者根据时间忽略以前的消息。当然肯定还有其他的解决方案,诸位看官可以思考下。 > > > *(八)最终总结下。*新消费组上线时还是要处理好历史消息的,无论怎样处理,要提前做好准备。有可能消费到大量历史消息, > 这是RocketMQ的本身机制导致的,它的配置有更深层的含义。 > > > 本篇结束,请听下回分解。 >