[ 
https://issues.apache.org/jira/browse/ROCKETMQ-313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jaskey Lam updated ROCKETMQ-313:
--------------------------------
    Description: 
It has been reported that for a new consumer group, when user uses 
CONSUME_FROM_LAST_OFFSET , the consumer still consumes the old messages.

If a new consumer group starts, most of the case CONSUME_FROM_LAST_OFFSET will 
work as expected, the code base is in queryConsumerOffset in 
ConsumerManageProcessor.java.

But sometimes It will return the min offset, which will be confusing. This 
happens only when the topic is still quite new(minOffset==0) and no large 
accumulation (checkInDiskByConsumeOffset=false). 


{code}
        if (offset >= 0) {
            responseHeader.setOffset(offset);
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
        } else {
            long minOffset =
                
this.brokerController.getMessageStore().getMinOffsetInQuque(requestHeader.getTopic(),
                    requestHeader.getQueueId());
            if (minOffset <= 0
                && 
!this.brokerController.getMessageStore().checkInDiskByConsumeOffset(
                requestHeader.getTopic(), requestHeader.getQueueId(), 0)) { 
//here is why sometims consume_from_last does not work as expected
                responseHeader.setOffset(0L);
                response.setCode(ResponseCode.SUCCESS);
                response.setRemark(null);
            } else {
                response.setCode(ResponseCode.QUERY_NOT_FOUND);
                response.setRemark("Not found, V3_0_6_SNAPSHOT maybe this group 
consumer boot first");
            }
        }
{code}

Apparently this is to ensure that if a new queue is scalely created, messages 
should still be consumed in the newly created queue.

But this makes CONSUME_FROM_LAST_OFFSET  not work as expected.

Similar bug is reported in https://issues.apache.org/jira/browse/ROCKETMQ-112 
but it is closed as WON'T FIX.



  was:
It has been reported that for a new consumer group, when user uses 
CONSUME_FROM_LAST_OFFSET , the consumer still consumes the old messages.

If a new consumer group starts, most of the case CONSUME_FROM_LAST_OFFSET will 
work as expected, the code base is in queryConsumerOffset in 
ConsumerManageProcessor.java.

But sometimes It will return the min offset, which will be confusing. This 
happens only when the topic is still quite new(minOffset==0) and no large 
accumulation(checkInDiskByConsumeOffset=false). Apparently this is to ensure 
that if a new queue is scalely created, messages should still be consumed in 
the newly created queue.

But this makes CONSUME_FROM_LAST_OFFSET  not work as expected.

Similar bug is reported in https://issues.apache.org/jira/browse/ROCKETMQ-112 
but it is closed as WON'T FIX


> CONSUME_FROM_LAST_OFFSET doesn't work when queue is new
> -------------------------------------------------------
>
>                 Key: ROCKETMQ-313
>                 URL: https://issues.apache.org/jira/browse/ROCKETMQ-313
>             Project: Apache RocketMQ
>          Issue Type: Bug
>          Components: rocketmq-broker, rocketmq-client
>    Affects Versions: 4.0.0-incubating, 4.1.0-incubating
>            Reporter: Jaskey Lam
>            Assignee: yukon
>
> It has been reported that for a new consumer group, when user uses 
> CONSUME_FROM_LAST_OFFSET , the consumer still consumes the old messages.
> If a new consumer group starts, most of the case CONSUME_FROM_LAST_OFFSET 
> will work as expected, the code base is in queryConsumerOffset in 
> ConsumerManageProcessor.java.
> But sometimes It will return the min offset, which will be confusing. This 
> happens only when the topic is still quite new(minOffset==0) and no large 
> accumulation (checkInDiskByConsumeOffset=false). 
> {code}
>         if (offset >= 0) {
>             responseHeader.setOffset(offset);
>             response.setCode(ResponseCode.SUCCESS);
>             response.setRemark(null);
>         } else {
>             long minOffset =
>                 
> this.brokerController.getMessageStore().getMinOffsetInQuque(requestHeader.getTopic(),
>                     requestHeader.getQueueId());
>             if (minOffset <= 0
>                 && 
> !this.brokerController.getMessageStore().checkInDiskByConsumeOffset(
>                 requestHeader.getTopic(), requestHeader.getQueueId(), 0)) { 
> //here is why sometims consume_from_last does not work as expected
>                 responseHeader.setOffset(0L);
>                 response.setCode(ResponseCode.SUCCESS);
>                 response.setRemark(null);
>             } else {
>                 response.setCode(ResponseCode.QUERY_NOT_FOUND);
>                 response.setRemark("Not found, V3_0_6_SNAPSHOT maybe this 
> group consumer boot first");
>             }
>         }
> {code}
> Apparently this is to ensure that if a new queue is scalely created, messages 
> should still be consumed in the newly created queue.
> But this makes CONSUME_FROM_LAST_OFFSET  not work as expected.
> Similar bug is reported in https://issues.apache.org/jira/browse/ROCKETMQ-112 
> but it is closed as WON'T FIX.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to