GitHub user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2480#discussion_r158705487
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java ---
    @@ -57,7 +58,8 @@ public void addToAckMsgs(KafkaSpoutMessageId msgId) {     // O(Log N)
         }
     
         public void addToEmitMsgs(long offset) {
    -        this.emittedOffsets.add(offset);                  // O(Log N)
    +        this.emittedOffsets.add(offset);  // O(Log N)
    +        this.latestEmittedOffset = offset;
    --- End diff --
    
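    I don't think this works. This sets latestEmittedOffset to whichever
    offset was emitted most recently, which is not necessarily the highest
    one: when a failed tuple is retried, a lower offset is emitted after a
    higher one. I think you want the highest offset that has been emitted.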
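
    One possible way to do that, as a rough sketch rather than the actual
    patch: keep a running maximum instead of overwriting the field on every
    emit. The class name OffsetTrackerSketch, the constructor argument, and
    the getter are hypothetical and only there to make the example
    self-contained. The TreeSet is an assumption about how emittedOffsets is
    stored, based on the O(Log N) comment in the diff.

```java
import java.util.NavigableSet;
import java.util.TreeSet;

// Hypothetical stand-in for the relevant part of OffsetManager.
class OffsetTrackerSketch {
    // Assumed to be a sorted set, which matches the O(Log N) comment in the diff.
    private final NavigableSet<Long> emittedOffsets = new TreeSet<>();
    // Highest offset emitted so far; starts just below the first offset to fetch.
    private long latestEmittedOffset;

    OffsetTrackerSketch(long initialFetchOffset) {
        this.latestEmittedOffset = initialFetchOffset - 1;
    }

    public void addToEmitMsgs(long offset) {
        this.emittedOffsets.add(offset);  // O(Log N)
        // Keep the maximum, so a retried (lower) offset does not move it backwards.
        this.latestEmittedOffset = Math.max(this.latestEmittedOffset, offset);
    }

    public long getLatestEmittedOffset() {
        return latestEmittedOffset;
    }

    public static void main(String[] args) {
        OffsetTrackerSketch tracker = new OffsetTrackerSketch(0L);
        tracker.addToEmitMsgs(5L);
        tracker.addToEmitMsgs(7L);
        tracker.addToEmitMsgs(6L);  // e.g. a retry of an earlier offset
        System.out.println(tracker.getLatestEmittedOffset());  // prints 7
    }
}
```

    With plain assignment, the last call above would leave the field at 6
    even though offset 7 has already been emitted.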

