Github user jianbzhou commented on the pull request:
https://github.com/apache/storm/pull/1131#issuecomment-221465746
@hmcl, sorry for the late reply; I was on leave. I have just sent you the
updated spout, please help review it. Below are the major changes:
1. In the poll method, changed the condition numUncommittedOffsets <
kafkaSpoutConfig.getMaxUncommittedOffsets()
to emitted.size() < kafkaSpoutConfig.getMaxUncommittedOffsets();
2. In the method doSeekRetriableTopicPartitions, your code seems to
contradict its comment, so I changed it
**from:**
else {
    kafkaConsumer.seekToEnd(rtp); // Seek to last committed offset
}
**To:**
else {
    // kafkaConsumer.seekToEnd(rtp); // Seek to last committed offset
    OffsetAndMetadata commitOffset = kafkaConsumer.committed(rtp);
    kafkaConsumer.seek(rtp, commitOffset.offset()); // Seek to last committed offset
}
3. In the ack method, we found that acked.get(msgId.getTopicPartition())
might return null, so we added a defensive null check. This is possibly due
to a Kafka consumer rebalance, after which the partition no longer belongs
to this spout.
4. In the OffsetEntry.add method, we added a condition so that the message
is only added when msgId.offset() > committedOffset. This change was also
applied in the method doSeekRetriableTopicPartitions.
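
To illustrate changes 3 and 4, here is a minimal, self-contained sketch. It is not the actual spout code: the class AckSketch, its assign and revoke helpers, and the use of a plain String key instead of TopicPartition are all hypothetical simplifications, assumed here only to show the null check in ack and the offset > committedOffset guard in OffsetEntry.add.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical, simplified stand-in for the spout's per-partition bookkeeping.
class AckSketch {

    /** Simplified OffsetEntry: tracks acked offsets for one partition. */
    static final class OffsetEntry {
        private final long committedOffset;
        private final TreeSet<Long> ackedOffsets = new TreeSet<>();

        OffsetEntry(long committedOffset) {
            this.committedOffset = committedOffset;
        }

        /** Change 4: only track offsets beyond the last committed offset. */
        boolean add(long offset) {
            if (offset > committedOffset) {
                return ackedOffsets.add(offset); // false if already tracked
            }
            return false; // already committed, nothing to track
        }
    }

    // Keyed by a "topic-partition" string here (an assumption for brevity);
    // the real spout keys this map by TopicPartition.
    private final Map<String, OffsetEntry> acked = new HashMap<>();

    /** Hypothetical helper: called when a partition is assigned to this spout. */
    void assign(String topicPartition, long committedOffset) {
        acked.put(topicPartition, new OffsetEntry(committedOffset));
    }

    /** Hypothetical helper: called when a rebalance takes the partition away. */
    void revoke(String topicPartition) {
        acked.remove(topicPartition);
    }

    /** Change 3: ignore acks for partitions this spout no longer owns. */
    boolean ack(String topicPartition, long offset) {
        OffsetEntry entry = acked.get(topicPartition);
        if (entry == null) {
            return false; // partition was revoked, e.g. by a rebalance
        }
        return entry.add(offset);
    }

    public static void main(String[] args) {
        AckSketch spout = new AckSketch();
        spout.assign("events-0", 10L);
        System.out.println(spout.ack("events-0", 11L)); // true: beyond committed offset
        System.out.println(spout.ack("events-0", 10L)); // false: not beyond committed offset
        spout.revoke("events-0");
        System.out.println(spout.ack("events-0", 12L)); // false: partition no longer assigned
    }
}
```

Returning a boolean from ack is a choice made for this sketch so the outcome is observable; the real method may simply return without doing anything when the entry is missing.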