[2/2] storm git commit: STORM-2607: Switch OffsetManager to track earliest uncommitted offset instead of last committed offset for compatibility with commitSync consumer API

2017-10-16 Thread srdo
STORM-2607: Switch OffsetManager to track earliest uncommitted offset instead 
of last committed offset for compatibility with commitSync consumer API


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c9bbd544
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c9bbd544
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c9bbd544

Branch: refs/heads/1.1.x-branch
Commit: c9bbd59eee98d0427c115ff13a6878706179
Parents: a1ffcb9
Author: Stig Rohde Døssing 
Authored: Sat Jul 1 17:29:56 2017 +0200
Committer: Stig Rohde Døssing 
Committed: Tue Oct 17 07:22:27 2017 +0200

--
 .../apache/storm/kafka/spout/KafkaSpout.java|   6 +-
 .../kafka/spout/internal/OffsetManager.java |  86 +++-
 .../storm/kafka/spout/KafkaSpoutCommitTest.java |   6 +-
 .../kafka/spout/KafkaSpoutRebalanceTest.java|  14 +-
 .../kafka/spout/KafkaSpoutRetryLimitTest.java   |   9 +-
 .../kafka/spout/SingleTopicKafkaSpoutTest.java  |   2 +-
 .../kafka/spout/internal/OffsetManagerTest.java | 135 +++
 7 files changed, 180 insertions(+), 78 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/storm/blob/c9bbd544/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
--
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 4d88205..207e9b4 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -181,7 +181,7 @@ public class KafkaSpout extends BaseRichSpout {
 }
 
 /**
- * sets the cursor to the location dictated by the first poll strategy and returns the fetch offset
+ * Sets the cursor to the location dictated by the first poll strategy and returns the fetch offset.
  */
 private long doSeek(TopicPartition tp, OffsetAndMetadata committedOffset) {
 if (committedOffset != null) { // offset was committed for this TopicPartition
@@ -190,8 +190,8 @@ public class KafkaSpout extends BaseRichSpout {
 } else if (firstPollOffsetStrategy.equals(LATEST)) {
 kafkaConsumer.seekToEnd(Collections.singleton(tp));
 } else {
-// By default polling starts at the last committed offset. +1 to point fetch to the first uncommitted offset.
-kafkaConsumer.seek(tp, committedOffset.offset() + 1);
+// By default polling starts at the last committed offset, i.e. the first offset that was not marked as processed.
+kafkaConsumer.seek(tp, committedOffset.offset());
 }
 } else {// no commits have ever been done, so start at the beginning or end depending on the strategy
 if (firstPollOffsetStrategy.equals(EARLIEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_EARLIEST)) {

http://git-wip-us.apache.org/repos/asf/storm/blob/c9bbd544/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
--
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
index 1c474e3..7dfe7f6 100755
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
@@ -37,7 +37,7 @@ public class OffsetManager {
 /* First offset to be fetched. It is either set to the beginning, end, or to the first uncommitted offset.
 * Initial value depends on offset strategy. See KafkaSpoutConsumerRebalanceListener */
 private final long initialFetchOffset;
-// Last offset committed to Kafka. Initially it is set to fetchOffset - 1
+// Committed offset, i.e. the offset where processing will resume if the spout restarts. Initially it is set to fetchOffset.
 private long committedOffset;
 // Emitted Offsets List
 private final NavigableSet emittedOffsets = new TreeSet<>();
@@ -47,8 +47,8 @@ public class OffsetManager {
 public OffsetManager(TopicPartition tp, long initialFetchOffset) {
 this.tp = tp;
 this.initialFetchOffset = initialFetchOffset;
-this.committedOffset = initialFetchOffset - 1;
-

[2/2] storm git commit: STORM-2607: Switch OffsetManager to track earliest uncommitted offset instead of last committed offset for compatibility with commitSync consumer API

2017-10-16 Thread srdo
STORM-2607: Switch OffsetManager to track earliest uncommitted offset instead 
of last committed offset for compatibility with commitSync consumer API


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/148ee660
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/148ee660
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/148ee660

Branch: refs/heads/1.x-branch
Commit: 148ee6609557d7436ce826da29ff3ec301458586
Parents: 4fc47b2
Author: Stig Rohde Døssing 
Authored: Sat Jul 1 17:29:56 2017 +0200
Committer: Stig Rohde Døssing 
Committed: Tue Oct 17 07:20:39 2017 +0200

--
 .../apache/storm/kafka/spout/KafkaSpout.java|   6 +-
 .../kafka/spout/internal/OffsetManager.java |  86 +++-
 .../storm/kafka/spout/KafkaSpoutCommitTest.java |   6 +-
 .../kafka/spout/KafkaSpoutRebalanceTest.java|   4 +-
 .../kafka/spout/KafkaSpoutRetryLimitTest.java   |   8 +-
 .../kafka/spout/SingleTopicKafkaSpoutTest.java  |   2 +-
 .../kafka/spout/internal/OffsetManagerTest.java | 135 +++
 7 files changed, 174 insertions(+), 73 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/storm/blob/148ee660/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
--
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 3582bdb..68bce11 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -202,7 +202,7 @@ public class KafkaSpout extends BaseRichSpout {
 }
 
 /**
- * sets the cursor to the location dictated by the first poll strategy and returns the fetch offset
+ * Sets the cursor to the location dictated by the first poll strategy and returns the fetch offset.
  */
 private long doSeek(TopicPartition tp, OffsetAndMetadata committedOffset) {
 if (committedOffset != null) { // offset was committed for this TopicPartition
@@ -211,8 +211,8 @@ public class KafkaSpout extends BaseRichSpout {
 } else if (firstPollOffsetStrategy.equals(LATEST)) {
 kafkaConsumer.seekToEnd(Collections.singleton(tp));
 } else {
-// By default polling starts at the last committed offset. +1 to point fetch to the first uncommitted offset.
-kafkaConsumer.seek(tp, committedOffset.offset() + 1);
+// By default polling starts at the last committed offset, i.e. the first offset that was not marked as processed.
+kafkaConsumer.seek(tp, committedOffset.offset());
 }
 } else {// no commits have ever been done, so start at the beginning or end depending on the strategy
 if (firstPollOffsetStrategy.equals(EARLIEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_EARLIEST)) {

http://git-wip-us.apache.org/repos/asf/storm/blob/148ee660/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
--
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
index 1c474e3..7dfe7f6 100755
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
@@ -37,7 +37,7 @@ public class OffsetManager {
 /* First offset to be fetched. It is either set to the beginning, end, or to the first uncommitted offset.
 * Initial value depends on offset strategy. See KafkaSpoutConsumerRebalanceListener */
 private final long initialFetchOffset;
-// Last offset committed to Kafka. Initially it is set to fetchOffset - 1
+// Committed offset, i.e. the offset where processing will resume if the spout restarts. Initially it is set to fetchOffset.
 private long committedOffset;
 // Emitted Offsets List
 private final NavigableSet emittedOffsets = new TreeSet<>();
@@ -47,8 +47,8 @@ public class OffsetManager {
 public OffsetManager(TopicPartition tp, long initialFetchOffset) {
 this.tp = tp;
 this.initialFetchOffset = initialFetchOffset;
-this.committedOffset = initialFetchOffset - 1;
-