GitHub user hmcl commented on a diff in the pull request:
https://github.com/apache/storm/pull/2465#discussion_r157349862
--- Diff:
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
---
@@ -399,14 +429,16 @@ private void emitIfWaitingNotEmitted() {
*/
private boolean emitOrRetryTuple(ConsumerRecord<K, V> record) {
final TopicPartition tp = new TopicPartition(record.topic(),
record.partition());
- final KafkaSpoutMessageId msgId =
retryService.getMessageId(record);
+ final KafkaSpoutMessageId msgId =
retryService.getMessageId(record, context);
if (offsetManagers.containsKey(tp) &&
offsetManagers.get(tp).contains(msgId)) { // has been acked
LOG.trace("Tuple for record [{}] has already been acked.
Skipping", record);
} else if (emitted.contains(msgId)) { // has been emitted and it
is pending ack or fail
LOG.trace("Tuple for record [{}] has already been emitted.
Skipping", record);
} else {
- if (kafkaConsumer.committed(tp) != null &&
(kafkaConsumer.committed(tp).offset() >= kafkaConsumer.position(tp))) {
+ final OffsetAndMetadata committedOffset =
kafkaConsumer.committed(tp);
+ if (isOffsetCommittedByThisTopology(committedOffset) &&
committedOffset.offset() > kafkaConsumer.position(tp)) {
+ // this check should pass if commit was done by another
topology such that FirstPollOffsetStrategy == EARLIEST is honored (STORM-2844)
--- End diff --
Done
---