GitHub user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2454#discussion_r156222931
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
@@ -395,7 +387,7 @@ private boolean emitOrRetryTuple(ConsumerRecord<K, V> record) {
         } else if (emitted.contains(msgId)) { // has been emitted and it is pending ack or fail
             LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
         } else {
-            if (kafkaConsumer.committed(tp) != null && (kafkaConsumer.committed(tp).offset() >= kafkaConsumer.position(tp))) {
+            if (kafkaConsumer.committed(tp) != null && (kafkaConsumer.committed(tp).offset() > kafkaConsumer.position(tp))) {
--- End diff --
No, this change is only there so that a test that happened to hit this case
passes. 2844 is still an issue.
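
For reference, a minimal sketch of how the operator change in the diff above behaves at the boundary where the committed offset equals the consumer position. It uses plain `long` values rather than a real `KafkaConsumer`, and the class and method names (`OffsetCheckSketch`, `oldCheck`, `newCheck`) are hypothetical, chosen only for illustration; they are not part of `KafkaSpout`. It shows only when each condition evaluates to true, not what the spout does inside the branch.

```java
public class OffsetCheckSketch {
    // Condition before the change: true when committed offset >= position.
    // A null committed offset models kafkaConsumer.committed(tp) returning null.
    static boolean oldCheck(Long committed, long position) {
        return committed != null && committed >= position;
    }

    // Condition after the change: true only when committed offset > position.
    static boolean newCheck(Long committed, long position) {
        return committed != null && committed > position;
    }

    public static void main(String[] args) {
        long position = 5L;

        // Boundary case: committed offset equals the position.
        System.out.println("old, committed == position: " + oldCheck(5L, position));
        System.out.println("new, committed == position: " + newCheck(5L, position));

        // Committed offset strictly ahead of the position: both conditions agree.
        System.out.println("old, committed > position:  " + oldCheck(6L, position));
        System.out.println("new, committed > position:  " + newCheck(6L, position));

        // Nothing committed yet: both conditions are false.
        System.out.println("old, nothing committed:     " + oldCheck(null, position));
        System.out.println("new, nothing committed:     " + newCheck(null, position));
    }
}
```

The two conditions differ only in the equality case, which appears to be the case the test mentioned above happened to hit.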
---