[ https://issues.apache.org/jira/browse/ROCKETMQ-112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15889595#comment-15889595 ]
Jaskey Lam edited comment on ROCKETMQ-112 at 3/1/17 6:37 AM:
-------------------------------------------------------------

[~zhaoziyan] please reformat your comment according to https://jira.atlassian.com/secure/WikiRendererHelpAction.jspa?section=all

When a new consumer group starts, CONSUME_FROM_LAST_OFFSET works as expected in most cases; please refer to queryConsumerOffset in ConsumerManageProcessor.java.

It returns the min offset only when the topic is still quite new (minOffset == 0) and there is no large accumulation (checkInDiskByConsumeOffset == false). This ensures that if a new queue is created, the messages in that newly created queue are still consumed.

But indeed, this is a bit confusing. I suggest we strictly respect CONSUME_FROM_LAST_OFFSET and deal with newly created queues more precisely.

{code}
if (offset >= 0) {
    responseHeader.setOffset(offset);
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
} else {
    long minOffset = this.brokerController.getMessageStore().getMinOffsetInQuque(
        requestHeader.getTopic(), requestHeader.getQueueId());
    if (minOffset <= 0
        && !this.brokerController.getMessageStore().checkInDiskByConsumeOffset(
            requestHeader.getTopic(), requestHeader.getQueueId(), 0)) {
        responseHeader.setOffset(0L);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
    } else {
        response.setCode(ResponseCode.QUERY_NOT_FOUND);
        response.setRemark("Not found, V3_0_6_SNAPSHOT maybe this group consumer boot first");
    }
}
{code}

was (Author: jaskey):

[~zhaoziyan] please reformat your comment according to https://jira.atlassian.com/secure/WikiRendererHelpAction.jspa?section=all

When a new consumer group starts, CONSUME_FROM_LAST_OFFSET works most of the time; please refer to queryConsumerOffset in ConsumerManageProcessor.java.
It returns the min offset only when the topic is still quite new (minOffset == 0) and there is no large accumulation (checkInDiskByConsumeOffset == false). This ensures that if a new queue is created, the messages in that newly created queue are still consumed.

But indeed, this is a bit confusing. I suggest we strictly respect CONSUME_FROM_LAST_OFFSET and deal with newly created queues more precisely.

{code}
if (offset >= 0) {
    responseHeader.setOffset(offset);
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
} else {
    long minOffset = this.brokerController.getMessageStore().getMinOffsetInQuque(
        requestHeader.getTopic(), requestHeader.getQueueId());
    if (minOffset <= 0
        && !this.brokerController.getMessageStore().checkInDiskByConsumeOffset(
            requestHeader.getTopic(), requestHeader.getQueueId(), 0)) {
        responseHeader.setOffset(0L);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
    } else {
        response.setCode(ResponseCode.QUERY_NOT_FOUND);
        response.setRemark("Not found, V3_0_6_SNAPSHOT maybe this group consumer boot first");
    }
}
{code}

> MQ client CONSUME_FROM_LAST_OFFSET dont work
> --------------------------------------------
>
>                 Key: ROCKETMQ-112
>                 URL: https://issues.apache.org/jira/browse/ROCKETMQ-112
>             Project: Apache RocketMQ
>          Issue Type: Bug
>          Components: rocketmq-client
>            Reporter: zhaoziyan
>            Assignee: Xiaorui Wang
>
> case CONSUME_FROM_LAST_OFFSET: {
>     long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
>     if (lastOffset >= 0) {
>         result = lastOffset;
>     }
>     // First start, no offset
>     else if (-1 == lastOffset) {
>         if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
>             result = 0L;
>         }
>         else {
>             try {
>                 result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
>             }
>             catch (MQClientException e) {
>                 result = -1;
>             }
>         }
>     }
>     else {
>         result = -1;
>     }
>     break;
> }
> offsetStore.readOffset returns minOffset, not maxOffset, so
> CONSUME_FROM_LAST_OFFSET doesn't work

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
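
For readers following the thread, the interaction between the broker branch in the comment and the client branch in the issue description can be sketched in isolation. This is only a simplified model: the class and method names below are hypothetical and are not RocketMQ APIs, the retry-topic branch is omitted, and it assumes that a QUERY_NOT_FOUND response reaches the client as "no stored offset" (the `-1 == lastOffset` case).

```java
import java.util.OptionalLong;

// Hypothetical, simplified model of the two code paths quoted in this thread.
// None of these names exist in RocketMQ; they only mirror the control flow.
final class OffsetQuerySketch {

    /**
     * Broker side, mirroring the branch quoted from queryConsumerOffset.
     *
     * @param storedOffset offset recorded for the consumer group, or -1 if none
     * @param minOffset    smallest offset still available in the queue
     * @param inDisk       stand-in for checkInDiskByConsumeOffset(topic, queueId, 0)
     * @return the offset to report, or empty to model QUERY_NOT_FOUND
     */
    static OptionalLong queryConsumerOffset(long storedOffset, long minOffset, boolean inDisk) {
        if (storedOffset >= 0) {
            return OptionalLong.of(storedOffset);
        }
        // "Quite new" queue with no large accumulation: answer 0.
        if (minOffset <= 0 && !inDisk) {
            return OptionalLong.of(0L);
        }
        return OptionalLong.empty();
    }

    /**
     * Client side, mirroring CONSUME_FROM_LAST_OFFSET for a non-retry topic:
     * use the broker's answer if there is one, otherwise fall back to maxOffset.
     */
    static long consumeFromLastOffset(OptionalLong brokerAnswer, long maxOffset) {
        return brokerAnswer.orElse(maxOffset);
    }

    public static void main(String[] args) {
        // New group, young queue holding 5 messages: the broker answers 0.
        long young = consumeFromLastOffset(queryConsumerOffset(-1L, 0L, false), 5L);
        // New group, old queue with a backlog on disk: the client uses maxOffset.
        long old = consumeFromLastOffset(queryConsumerOffset(-1L, 1000L, true), 5000L);
        System.out.println("young queue start offset: " + young);
        System.out.println("old queue start offset:   " + old);
    }
}
```

Under these assumptions, the first case is the one the reporter observed: a brand-new group on a young queue starts at 0 rather than at the queue's max offset, because the broker has already supplied an answer before the client's maxOffset fallback is reached.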