Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2454#discussion_r156220249
  
    --- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
    @@ -395,7 +387,7 @@ private boolean emitOrRetryTuple(ConsumerRecord<K, V> 
record) {
             } else if (emitted.contains(msgId)) {   // has been emitted and it 
is pending ack or fail
                 LOG.trace("Tuple for record [{}] has already been emitted. 
Skipping", record);
             } else {
    -            if (kafkaConsumer.committed(tp) != null && 
(kafkaConsumer.committed(tp).offset() >= kafkaConsumer.position(tp))) {
    +            if (kafkaConsumer.committed(tp) != null && 
(kafkaConsumer.committed(tp).offset() > kafkaConsumer.position(tp))) {
    --- End diff --
    
    Is this change to address https://issues.apache.org/jira/browse/STORM-2844 ?


---

Reply via email to