关于CONSUME_FROM_LAST_OFFSET在新消费组中不起作用的问题,我有个文章详细说明了。
https://mp.weixin.qq.com/s/XjNjE_Hwg9WLPiInSuPMYw 文章中有图片。 以前碰到过一个问题,一个新消费组上线时和我们期望的消费行为有偏差,今天专门研究下这个问题,看看是怎么回事。 在RocketMQ中,假如一个新消费组订阅了几个topic,按正常人或者正常业务的期望,新消费组应该从订阅topic的最后一个消息开始消费,但是实际情形不是如此,有时候新消费组会从这些topic的开头开始消费。这就是新消费组上线的风险点。 这对业务来说就有风险,因为消费端需要关心要不要处理以前的消息。或者以前的消息非常多,都还没有删除,消费端要处理多久才能处理完成。再或者,我的消费者处理不了以前的那些老的消息,处理时都出错,这该怎么办? 下面我们来分析下这个问题的来龙去脉,按照1,2,3,4...来说明前因后果及解决方案。 (一)消费端配置。消费端集群消费时,消费端的默认配置是从topic的最后offset开始消费。具体配置代码在DefaultMQPullConsumerImpl的consumeFromWhere()中: 而CONSUME_FROM_LAST_OFFSET的含义是“一个新的订阅组第一次启动从队列的最后位置开始消费”,RocketMQ 3.2.6版本的代码注释中清晰的说明了,但是实际表现却不是如此。 (二),消费端拉取消息的位置计算逻辑。消费端拉取消息是按照topic下的queue来进行主动拉取的,最关键的是这个拉取的位置offset是怎么计算出来的。消费端拉取消息位置的触发点在RebalanceImpl的updateProcessQueueTableInRebalance()中,如图: 对于集群消费者,真正实现计算拉取消息位置的逻辑是在RebalancePushImpl的computePullFromWhere()方法中,这个方法在消费客户端逻辑中是非常重要的,经常出现问题都是这段逻辑导致的。虽然这段代码没有bug,但是要深刻理解这段代码才能避免各种问题的出现。以后的文章中,会经常提到这块的实现逻辑。 (三)computePullFromWhere()的集群实现。computePullFromWhere()是计算不同配置时的拉取offset,我们只关心CONSUME_FROM_LAST_OFFSET时的实现。如下图箭头处,首先去broker端拉取某个queue的消费进度信息: offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE) 这段实际上执行的是RemoteBrokerOffsetStore的readOffset(), 如下图红框中的逻辑: 其中long brokerOffset = this.fetchConsumeOffsetFromBroker(mq)的实现如下图,主要是通过MQClientAPIImpl的queryConsumerOffset()去broker上拿某个queue的消费进度,发送的是requestCode是QUERY_CONSUMER_OFFSET的请求: (四),broker计算consumerOffser的逻辑。broker收到上面消费客户端的请求,是在ClientManageProcessor的queryConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)中做处理的: 当一个新消费组上线时,会走到订阅组不存在的情况,然后计算当前queue的minOffset,而getMessageStore().checkInDiskByConsumeOffset( requestHeader.getTopic(),requestHeader.getQueueId(), 0) 一般都是返回false,这个方法含义是检查当前queue的第一条消息是否在磁盘中,而不在内存中。 所以这个api的结果是,当minOffset=0时,返回offset=0,当minOffset>0时返回QUERY_NOT_FOUND结果,消费端拿到这个结果时会抛出MQBrokerException异常。 broker源代码的注释中也写的很清楚了,“ 订阅组不存在情况下,如果这个队列的消息最小Offset是0,则表示这个Topic上线时间不长,服务器堆积的数据也不多,那么这个订阅组就从0开始消费。尤其对于Topic队列数动态扩容时,必须要从0开始消费。 ” (五),消费客户端处理broker返回的consumerOffset,查清问题根源。RemoteBrokerOffsetStore的readOffset()中,当minOffset为0时,这个方法返回0;当minOffset大于0时,这个方法返回-1。中间的分析过程省略了,不然贴出来的代码会更多。如下图实现: 上述方法返回的值赋值给lastOffset ,继续往上回到RebalancePushImpl的computePullFromWhere()中,当lastOffset = 0时,会返回0;当lastOffset = -1时,将会返回queue的maxOffset。 这段逻辑对新消费组的意义就是:如果订阅的queue不是从0开始的(minOffset大于0,已经删除过数据了),那么消费端将从maxOffset开始消费,即从最新位置开始消费;如果订阅的queue是从0开始的(minOffset等于0,没有删除过数据),那么消费端将从0开始消费这个queue。 这就是风险点,和我们最初的期望,每次都从最后位置消费消息有偏差!!! (六)RocketMQ为什么这么设计?设计是否合理? 我们先去查看下Apache RocketMQ 4.2.0中broker端的实现,因为主要实现点还是在broker端。虽然计算消费位置offset的逻辑已经挪到了这个类中: org.apache.rocketmq.broker.processor.ConsumerManageProcessor,但是实现逻辑是没有任何改变的。 既然Apache版本都没有做任何改动,说明这不是个bug,就是这么设计的。下面我们来分析下为什么一个queue的minOffset为0时,消费端要从0开始消费这个queue上消息,只有这种情况超出了正常的预期。 我们做个假设,假设新消费组上线时,都是从queue的maxOffset开始消费消息。又如果一个topic在一个broker上面有4个queue,新消费组上线后,开始从这四个queue的最后位置消费消息,这时我突然扩容这个topic到8个queue,那么消费端去namesrv上拿到这8个queue的信息需要一个心跳周期,按默认配置是30秒左右。这个心跳周期内,新扩展的queue上完全可能有新消息进来。 当消费端拿到4个新扩展queue的信息后,去broker端拉取消息时,broker还是把这4个扩容queue当作新queue来处理的。按照我们的假设,最终消费端会从这4个新queue的maxOffset开始消费。这就有可能丢失了这4个扩容queue的前面一些消息,有可能会很多消息,而这些消息完全是在新消费组上线后发送出来的!! 有消息漏消费了!这就是为什么新消费组不能都是从maxOffset开始消费的。 这样原因就清楚了,RocketMQ的设计是合理的,导致了重复消费是不可避免的,但是风险是巨大的。这也体现了RocketMQ的一个重大设计原则:宁可重复消费无数消息,也绝不漏掉一条消息。就跟国民党当年喊得 “宁可错杀千人,也绝不放过一个” 一样。 (七),解决方案。我订阅的topic上面可能有几十万条消息没有删除过,难道新上线消费组时,几十万条消息要重新消费吗? 目前我想到的有两种解决方案,1.可以使用mqadmin中的resetOffsetByTime命令来跳过某一个时间点之前的消息。 2.新消费组的消费者启动时,自己去过滤老的消息或者根据时间忽略以前的消息。当然肯定还有其他的解决方案,诸位看官可以思考下。 (八)最终总结下。新消费组上线时还是要处理好历史消息的,无论怎样处理,要提前做好准备。有可能消费到大量历史消息,这是RocketMQ的本身机制导致的,它的配置有更深层的含义。