Github user jianbzhou commented on the pull request:

https://github.com/apache/storm/pull/1131#issuecomment-217905885

Thanks, Hmcl. We just found that the log below shows up constantly. The spout seems to keep trying to commit an offset that has already been committed to Kafka. It might be caused by a group rebalance, so that an offset no larger than the committed offset is acked back late.

For example (this is our assumption, kindly correct me if it is wrong): one consumer commits offset 1000, polls and emits messages 1001~1050, and receives acks for 1001~1009. Then a rebalance happens; another consumer polls messages 1000 to 1025 and commits offset 1010. After that, the ack for message 1010, which was emitted before the rebalance, comes back. Because this offset was already committed to Kafka successfully, the logic in the findNextCommitOffset method will never commit 1010, and the entry stays in the acked set.

The log is:

    2016-05-09 03:02:14 io.ebay.rheos.KafkaSpout [INFO] Unexpected offset found [37137]. OffsetEntry{topic-partition=oradb.core4-lcr.caty.ebay-bids-3, fetchOffset=37138, committedOffset=37137, ackedMsgs={topic-partition=oradb.core4-lcr.caty.ebay-bids-3, offset=37137, numFails=0}|{topic-partition=oradb.core4-lcr.caty.ebay-bids-3, offset=37137, numFails=0}}

We applied the fix below. In the OffsetEntry.add(KafkaSpoutMessageId msgId) method, we changed the code so that an acked message is only added when its offset is bigger than the committed offset:

    public void add(KafkaSpoutMessageId msgId) {          // O(Log N)
        if (msgId.offset() > committedOffset)             // this line is newly added
            ackedMsgs.add(msgId);
    }

Could you please take a look at the above and let me know your thoughts? Thanks.
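To make the intended behavior of the guard concrete, here is a minimal standalone sketch. It is not the actual Storm code: `OffsetEntrySketch`, its fields, and `pendingAcks()` are hypothetical stand-ins, and plain `long` offsets replace `KafkaSpoutMessageId`. It only shows that a late ack at or below the committed offset is dropped instead of sitting in the acked set forever.

```java
import java.util.TreeSet;

// Hypothetical stand-in for the spout's per-partition OffsetEntry (not the real Storm class).
class OffsetEntrySketch {
    private final long committedOffset;                           // last offset committed to Kafka
    private final TreeSet<Long> ackedOffsets = new TreeSet<>();   // sorted acked offsets, O(log N) insert

    OffsetEntrySketch(long committedOffset) {
        this.committedOffset = committedOffset;
    }

    // Records an ack only if it is newer than the committed offset.
    // Returns false when the ack is stale (or a duplicate) and was not recorded.
    boolean add(long offset) {
        if (offset > committedOffset) {
            return ackedOffsets.add(offset);
        }
        return false;
    }

    // Number of acked offsets still waiting to be committed.
    int pendingAcks() {
        return ackedOffsets.size();
    }

    public static void main(String[] args) {
        // Values taken from the log line above: committedOffset=37137.
        OffsetEntrySketch entry = new OffsetEntrySketch(37137L);
        System.out.println(entry.add(37137L));    // false: late ack for an already committed offset
        System.out.println(entry.add(37138L));    // true: new ack above the committed offset
        System.out.println(entry.pendingAcks());  // 1
    }
}
```

With the committed offset from the log (37137), the late ack for 37137 is ignored, so it can no longer trigger the "Unexpected offset found" message on every commit cycle.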