GitHub user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2480#discussion_r158705487
--- Diff:
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
---
@@ -57,7 +58,8 @@ public void addToAckMsgs(KafkaSpoutMessageId msgId) {
// O(Log N)
}
public void addToEmitMsgs(long offset) {
- this.emittedOffsets.add(offset); // O(Log N)
+ this.emittedOffsets.add(offset); // O(Log N)
+ this.latestEmittedOffset = offset;
--- End diff --
I don't think this works. This sets latestEmittedOffset to whichever offset
was emitted most recently, which is not necessarily the largest one (for
example, if an earlier offset is emitted again after a later one). I think
you want the highest offset that has been emitted.
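
A minimal sketch of the distinction, not the actual OffsetManager code: the class name `EmittedOffsetTracker`, the getter, the `-1` initial value, and the use of a `TreeSet<Long>` for `emittedOffsets` are assumptions made for illustration. Only the `addToEmitMsgs`, `emittedOffsets`, and `latestEmittedOffset` names come from the diff. The idea is to keep a running maximum rather than overwrite the field on every emit.

```java
import java.util.TreeSet;

// Hypothetical stand-in for the relevant part of OffsetManager.
class EmittedOffsetTracker {
    // Assumed to be a sorted set, matching the O(Log N) comment in the diff.
    private final TreeSet<Long> emittedOffsets = new TreeSet<>();
    // Assumed sentinel: -1 means nothing has been emitted yet.
    private long latestEmittedOffset = -1L;

    public void addToEmitMsgs(long offset) {
        this.emittedOffsets.add(offset); // O(Log N)
        // Keep the highest offset seen so far, so emitting a lower offset
        // after a higher one does not move the value backwards.
        this.latestEmittedOffset = Math.max(this.latestEmittedOffset, offset);
    }

    public long getLatestEmittedOffset() {
        return latestEmittedOffset;
    }
}
```

With this version, emitting offsets 5, 9, and then 6 leaves the tracked value at 9, whereas plain assignment would leave it at 6.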
---