Github user jianbzhou commented on the pull request:

    https://github.com/apache/storm/pull/1131#issuecomment-221465746
  
    @hmcl, sorry for the late reply. I was on leave and have only just sent the updated spout to you; please help review it. The major changes are below:
    1. In the poll method, changed the condition numUncommittedOffsets < kafkaSpoutConfig.getMaxUncommittedOffsets()
    to emitted.size() < kafkaSpoutConfig.getMaxUncommittedOffsets().
    2. In the method doSeekRetriableTopicPartitions, your code seems to contradict its comment (it seeks to the end, while the comment says to seek to the last committed offset), so I changed it
    **from:**
        else {
            kafkaConsumer.seekToEnd(rtp);    // Seek to last committed offset
        }
    **to:**
        else {
            // kafkaConsumer.seekToEnd(rtp);    // Seek to last committed offset
            OffsetAndMetadata commitOffset = kafkaConsumer.committed(rtp);
            kafkaConsumer.seek(rtp, commitOffset.offset());  // Seek to last committed offset
        }
    
    3. In the ack method, we found that acked.get(msgId.getTopicPartition()) might return null, so we added a defensive null check. This is possibly due to a Kafka consumer rebalance, after which the partition no longer belongs to this spout.
    4. In the OffsetEntry.add method, we added a condition so that the message is only added when it is met: if (msgId.offset() > committedOffset). This change was also applied in the method doSeekRetriableTopicPartitions.
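
To make items 3 and 4 concrete, here is a minimal sketch of the two guards. It does not use the real spout classes: AckGuardSketch, assign, and the String partition key are hypothetical stand-ins for illustration only, and the actual OffsetEntry in the PR tracks more state than this.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

public class AckGuardSketch {

    // Hypothetical, simplified stand-in for the spout's per-partition OffsetEntry.
    static final class OffsetEntry {
        private final long committedOffset;
        private final TreeSet<Long> ackedOffsets = new TreeSet<>();

        OffsetEntry(long committedOffset) {
            this.committedOffset = committedOffset;
        }

        // Item 4: only track offsets strictly above the last committed offset.
        boolean add(long offset) {
            if (offset > committedOffset) {
                return ackedOffsets.add(offset);
            }
            return false;
        }
    }

    // Keyed by a "topic-partition" string here; the spout keys by TopicPartition.
    private final Map<String, OffsetEntry> acked = new HashMap<>();

    // Simulates a partition being assigned to this spout instance.
    void assign(String topicPartition, long committedOffset) {
        acked.put(topicPartition, new OffsetEntry(committedOffset));
    }

    // Item 3: the entry may be missing (for example after a rebalance),
    // so check for null before using it.
    boolean ack(String topicPartition, long offset) {
        OffsetEntry entry = acked.get(topicPartition);
        if (entry == null) {
            return false; // partition is not tracked by this instance; ignore the ack
        }
        return entry.add(offset);
    }
}
```

With a partition assigned at committed offset 10, acking offset 11 is tracked, while acking offset 10 or acking an unassigned partition is ignored rather than throwing a NullPointerException.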
    
    


