Github user hmcl commented on a diff in the pull request:
https://github.com/apache/storm/pull/2454#discussion_r156224126
--- Diff:
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
---
@@ -395,7 +387,7 @@ private boolean emitOrRetryTuple(ConsumerRecord<K, V>
record) {
} else if (emitted.contains(msgId)) { // has been emitted and it
is pending ack or fail
LOG.trace("Tuple for record [{}] has already been emitted.
Skipping", record);
} else {
- if (kafkaConsumer.committed(tp) != null &&
(kafkaConsumer.committed(tp).offset() >= kafkaConsumer.position(tp))) {
+ if (kafkaConsumer.committed(tp) != null &&
(kafkaConsumer.committed(tp).offset() > kafkaConsumer.position(tp))) {
--- End diff --
OK, I agree. Because initially I thought that this could be a potential
fix, but then found out that it wouldn't work. I was wondering if I had missed
anything. Thanks.
---