GitHub user jianbzhou commented on the pull request:

    https://github.com/apache/storm/pull/1131#issuecomment-217905885
  
    Thanks, hmcl.
    
    We just noticed the log line below showing up constantly. It seems the spout keeps trying to commit an offset that has actually already been committed to Kafka. This might be caused by a consumer group rebalance, after which an offset smaller than the committed offset is acked back late.
    
    For example (this is our assumption, so please correct me if it is wrong): one consumer has committed offset 1000, has polled and emitted messages 1001 to 1050, and messages 1001 to 1009 have been acked. Then a rebalance happens: another consumer polls messages 1000 to 1025 and commits offset 1010. After that, message 1010, which was emitted before the rebalance, is acked back. Because of the logic in the findNextCommitOffset method, offset 1010 will then never be committed, since it was already committed to Kafka successfully.
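    To make the scenario concrete, here is a minimal Java sketch. It is a hypothetical, simplified model, not the actual storm-kafka-client (or io.ebay.rheos) code: acked messages are plain `long` offsets, and the class name StuckCommitDemo is made up; only committedOffset, ackedMsgs, add and findNextCommitOffset echo names used in this thread. It assumes findNextCommitOffset logs "Unexpected offset found" and stops when it meets an acked offset at or below the last committable one.

```java
import java.util.TreeSet;

// Hypothetical, simplified model of per-partition offset bookkeeping.
// NOT the real storm-kafka-client OffsetEntry: acked messages are plain
// longs, and the loop in findNextCommitOffset is an assumption.
public class StuckCommitDemo {
    static class OffsetEntry {
        long committedOffset;                            // last offset committed to Kafka
        final TreeSet<Long> ackedMsgs = new TreeSet<>(); // acked offsets, smallest first

        OffsetEntry(long committedOffset) {
            this.committedOffset = committedOffset;
        }

        // Unguarded add: every acked offset is stored, even stale ones.
        void add(long offset) {
            ackedMsgs.add(offset);
        }

        // Returns the highest offset that can be committed contiguously
        // after committedOffset, or -1 if nothing can be committed.
        long findNextCommitOffset() {
            long next = committedOffset;
            long found = -1;
            for (long curr : ackedMsgs) {
                if (curr == next + 1) {         // contiguous: advance
                    next = curr;
                    found = curr;
                } else if (curr > next + 1) {   // gap: wait for missing acks
                    break;
                } else {                        // curr <= next: already committed
                    System.out.println("Unexpected offset found [" + curr + "]");
                    break;
                }
            }
            return found;
        }
    }

    public static void main(String[] args) {
        OffsetEntry entry = new OffsetEntry(1010); // 1010 already committed
        entry.add(1010);                           // stale ack from before the rebalance
        entry.add(1011);
        entry.add(1012);
        // The stale 1010 stays at the head of the sorted set, so every call
        // hits the "unexpected offset" branch and returns -1.
        System.out.println(entry.findNextCommitOffset());
        System.out.println(entry.findNextCommitOffset());
    }
}
```

In this model the stale 1010 is never removed, so each call prints the warning and returns -1, and the later acks 1011 and 1012 are held back as well.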
    
    The log line is:
    2016-05-09 03:02:14 io.ebay.rheos.KafkaSpout [INFO] Unexpected offset found [37137]. OffsetEntry{topic-partition=oradb.core4-lcr.caty.ebay-bids-3, fetchOffset=37138, committedOffset=37137, ackedMsgs={topic-partition=oradb.core4-lcr.caty.ebay-bids-3, offset=37137, numFails=0}|{topic-partition=oradb.core4-lcr.caty.ebay-bids-3, offset=37137, numFails=0}}
    
    We applied the following fix to the OffsetEntry.add(KafkaSpoutMessageId msgId) method: an acked message is added only when its offset is greater than the committed offset.
    
    public void add(KafkaSpoutMessageId msgId) {          // O(log N)
        // Newly added check: ignore acks for offsets already committed to Kafka
        if (msgId.offset() > committedOffset) {
            ackedMsgs.add(msgId);
        }
    }
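    Below is a minimal Java sketch of the effect of this guard, using a hypothetical, simplified model rather than the real KafkaSpout code: offsets are plain `long` values, and the class name GuardedAddDemo and the loop inside findNextCommitOffset are our assumptions.

```java
import java.util.TreeSet;

// Hypothetical, simplified model (not the real storm-kafka-client class)
// showing the effect of the proposed guard in add().
public class GuardedAddDemo {
    static class OffsetEntry {
        long committedOffset;                            // last offset committed to Kafka
        final TreeSet<Long> ackedMsgs = new TreeSet<>(); // acked offsets, smallest first

        OffsetEntry(long committedOffset) {
            this.committedOffset = committedOffset;
        }

        // Guarded add: acks for offsets already committed are dropped.
        void add(long offset) {
            if (offset > committedOffset) {
                ackedMsgs.add(offset);
            }
        }

        // Highest contiguous offset after committedOffset, or -1 if none.
        long findNextCommitOffset() {
            long next = committedOffset;
            long found = -1;
            for (long curr : ackedMsgs) {
                if (curr != next + 1) {   // gap: stop and wait for missing acks
                    break;
                }
                next = curr;
                found = curr;
            }
            return found;
        }
    }

    public static void main(String[] args) {
        OffsetEntry entry = new OffsetEntry(1010); // 1010 already committed
        entry.add(1010);                           // stale ack: ignored by the guard
        entry.add(1011);
        entry.add(1012);
        System.out.println(entry.ackedMsgs);              // [1011, 1012]
        System.out.println(entry.findNextCommitOffset()); // 1012
    }
}
```

Filtering at insertion time keeps stale acks out of ackedMsgs entirely; in this model it only works if committedOffset already reflects the offset stored in Kafka when the late ack arrives.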
    
    Could you please take a look at the above and let me know your thoughts? Thanks.


