[2/2] storm git commit: STORM-2607: Switch OffsetManager to track earliest uncommitted offset instead of last committed offset for compatibility with commitSync consumer API
STORM-2607: Switch OffsetManager to track earliest uncommitted offset instead of last committed offset for compatibility with commitSync consumer API Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c9bbd544 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c9bbd544 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c9bbd544 Branch: refs/heads/1.1.x-branch Commit: c9bbd59eee98d0427c115ff13a6878706179 Parents: a1ffcb9 Author: Stig Rohde Døssing Authored: Sat Jul 1 17:29:56 2017 +0200 Committer: Stig Rohde Døssing Committed: Tue Oct 17 07:22:27 2017 +0200 -- .../apache/storm/kafka/spout/KafkaSpout.java| 6 +- .../kafka/spout/internal/OffsetManager.java | 86 +++- .../storm/kafka/spout/KafkaSpoutCommitTest.java | 6 +- .../kafka/spout/KafkaSpoutRebalanceTest.java| 14 +- .../kafka/spout/KafkaSpoutRetryLimitTest.java | 9 +- .../kafka/spout/SingleTopicKafkaSpoutTest.java | 2 +- .../kafka/spout/internal/OffsetManagerTest.java | 135 +++ 7 files changed, 180 insertions(+), 78 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/c9bbd544/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java -- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index 4d88205..207e9b4 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -181,7 +181,7 @@ public class KafkaSpout extends BaseRichSpout { } /** - * sets the cursor to the location dictated by the first poll strategy and returns the fetch offset + * Sets the cursor to the location dictated by the first poll strategy and returns the fetch offset. 
*/ private long doSeek(TopicPartition tp, OffsetAndMetadata committedOffset) { if (committedOffset != null) { // offset was committed for this TopicPartition @@ -190,8 +190,8 @@ public class KafkaSpout extends BaseRichSpout { } else if (firstPollOffsetStrategy.equals(LATEST)) { kafkaConsumer.seekToEnd(Collections.singleton(tp)); } else { -// By default polling starts at the last committed offset. +1 to point fetch to the first uncommitted offset. -kafkaConsumer.seek(tp, committedOffset.offset() + 1); +// By default polling starts at the last committed offset, i.e. the first offset that was not marked as processed. +kafkaConsumer.seek(tp, committedOffset.offset()); } } else {// no commits have ever been done, so start at the beginning or end depending on the strategy if (firstPollOffsetStrategy.equals(EARLIEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_EARLIEST)) { http://git-wip-us.apache.org/repos/asf/storm/blob/c9bbd544/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java -- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java index 1c474e3..7dfe7f6 100755 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java @@ -37,7 +37,7 @@ public class OffsetManager { /* First offset to be fetched. It is either set to the beginning, end, or to the first uncommitted offset. * Initial value depends on offset strategy. See KafkaSpoutConsumerRebalanceListener */ private final long initialFetchOffset; -// Last offset committed to Kafka. Initially it is set to fetchOffset - 1 +// Committed offset, i.e. the offset where processing will resume if the spout restarts. Initially it is set to fetchOffset. 
private long committedOffset; // Emitted Offsets List private final NavigableSet emittedOffsets = new TreeSet<>(); @@ -47,8 +47,8 @@ public class OffsetManager { public OffsetManager(TopicPartition tp, long initialFetchOffset) { this.tp = tp; this.initialFetchOffset = initialFetchOffset; -this.committedOffset = initialFetchOffset - 1; -
[2/2] storm git commit: STORM-2607: Switch OffsetManager to track earliest uncommitted offset instead of last committed offset for compatibility with commitSync consumer API
STORM-2607: Switch OffsetManager to track earliest uncommitted offset instead of last committed offset for compatibility with commitSync consumer API Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/148ee660 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/148ee660 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/148ee660 Branch: refs/heads/1.x-branch Commit: 148ee6609557d7436ce826da29ff3ec301458586 Parents: 4fc47b2 Author: Stig Rohde Døssing Authored: Sat Jul 1 17:29:56 2017 +0200 Committer: Stig Rohde Døssing Committed: Tue Oct 17 07:20:39 2017 +0200 -- .../apache/storm/kafka/spout/KafkaSpout.java| 6 +- .../kafka/spout/internal/OffsetManager.java | 86 +++- .../storm/kafka/spout/KafkaSpoutCommitTest.java | 6 +- .../kafka/spout/KafkaSpoutRebalanceTest.java| 4 +- .../kafka/spout/KafkaSpoutRetryLimitTest.java | 8 +- .../kafka/spout/SingleTopicKafkaSpoutTest.java | 2 +- .../kafka/spout/internal/OffsetManagerTest.java | 135 +++ 7 files changed, 174 insertions(+), 73 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/148ee660/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java -- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index 3582bdb..68bce11 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -202,7 +202,7 @@ public class KafkaSpout extends BaseRichSpout { } /** - * sets the cursor to the location dictated by the first poll strategy and returns the fetch offset + * Sets the cursor to the location dictated by the first poll strategy and returns the fetch offset. 
*/ private long doSeek(TopicPartition tp, OffsetAndMetadata committedOffset) { if (committedOffset != null) { // offset was committed for this TopicPartition @@ -211,8 +211,8 @@ public class KafkaSpout extends BaseRichSpout { } else if (firstPollOffsetStrategy.equals(LATEST)) { kafkaConsumer.seekToEnd(Collections.singleton(tp)); } else { -// By default polling starts at the last committed offset. +1 to point fetch to the first uncommitted offset. -kafkaConsumer.seek(tp, committedOffset.offset() + 1); +// By default polling starts at the last committed offset, i.e. the first offset that was not marked as processed. +kafkaConsumer.seek(tp, committedOffset.offset()); } } else {// no commits have ever been done, so start at the beginning or end depending on the strategy if (firstPollOffsetStrategy.equals(EARLIEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_EARLIEST)) { http://git-wip-us.apache.org/repos/asf/storm/blob/148ee660/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java -- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java index 1c474e3..7dfe7f6 100755 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java @@ -37,7 +37,7 @@ public class OffsetManager { /* First offset to be fetched. It is either set to the beginning, end, or to the first uncommitted offset. * Initial value depends on offset strategy. See KafkaSpoutConsumerRebalanceListener */ private final long initialFetchOffset; -// Last offset committed to Kafka. Initially it is set to fetchOffset - 1 +// Committed offset, i.e. the offset where processing will resume if the spout restarts. Initially it is set to fetchOffset. 
private long committedOffset; // Emitted Offsets List private final NavigableSet emittedOffsets = new TreeSet<>(); @@ -47,8 +47,8 @@ public class OffsetManager { public OffsetManager(TopicPartition tp, long initialFetchOffset) { this.tp = tp; this.initialFetchOffset = initialFetchOffset; -this.committedOffset = initialFetchOffset - 1; -