Repository: storm
Updated Branches:
  refs/heads/1.1.x-branch cb331d454 -> c9bbd5444


[STORM-2607] Offset consumer + 1


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a1ffcb93
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a1ffcb93
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a1ffcb93

Branch: refs/heads/1.1.x-branch
Commit: a1ffcb936336d71bdf3693108e33e92c8d2a88ef
Parents: cb331d4
Author: Rodolfo Ribeiro <rodolfo.ribe...@b2wdigital.com>
Authored: Thu Jun 29 12:06:20 2017 -0300
Committer: Stig Rohde Døssing <s...@apache.org>
Committed: Mon Oct 16 20:12:33 2017 +0200

----------------------------------------------------------------------
 .../kafka/spout/internal/OffsetManager.java     | 20 ++++++++++++--------
 .../kafka/spout/SingleTopicKafkaSpoutTest.java  |  2 +-
 2 files changed, 13 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/a1ffcb93/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
index b6d36d8..1c474e3 100755
--- 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
@@ -71,16 +71,18 @@ public class OffsetManager {
         boolean found = false;
         long currOffset;
         long nextCommitOffset = committedOffset;
+        long lastOffMessageOffset = committedOffset;
         KafkaSpoutMessageId nextCommitMsg = null;     // this is a convenience 
variable to make it faster to create OffsetAndMetadata
 
         for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) {  // complexity is 
that of a linear scan on a TreeMap
             currOffset = currAckedMsg.offset();
-            if (currOffset == nextCommitOffset + 1) {            // found the 
next offset to commit
+            if (currOffset == lastOffMessageOffset + 1) {            // found 
the next offset to commit
                 found = true;
                 nextCommitMsg = currAckedMsg;
-                nextCommitOffset = currOffset;
-            } else if (currOffset > nextCommitOffset + 1) {
-                if (emittedOffsets.contains(nextCommitOffset + 1)) {
+                lastOffMessageOffset = currOffset;
+                nextCommitOffset = lastOffMessageOffset + 1;
+            } else if (currOffset > lastOffMessageOffset + 1) {
+                if (emittedOffsets.contains(lastOffMessageOffset + 1)) {
                     LOG.debug("topic-partition [{}] has non-continuous offset 
[{}]. It will be processed in a subsequent batch.", tp, currOffset);
                     break;
                 } else {
@@ -92,11 +94,12 @@ public class OffsetManager {
                         first element after committedOffset in the ascending 
ordered emitted set.
                      */
                     LOG.debug("Processed non contiguous offset. 
(committedOffset+1) is no longer part of the topic. Committed: [{}], Processed: 
[{}]", committedOffset, currOffset);
-                    final Long nextEmittedOffset = 
emittedOffsets.ceiling(nextCommitOffset + 1);
+                    final Long nextEmittedOffset = 
emittedOffsets.ceiling(lastOffMessageOffset);
                     if (nextEmittedOffset != null && currOffset == 
nextEmittedOffset) {
                         found = true;
                         nextCommitMsg = currAckedMsg;
-                        nextCommitOffset = currOffset;
+                        lastOffMessageOffset = currOffset;
+                        nextCommitOffset = lastOffMessageOffset + 1;
                     } else {
                         LOG.debug("topic-partition [{}] has non-continuous 
offset [{}]. Next Offset to commit should be [{}]", tp, currOffset, 
nextEmittedOffset);
                         break;
@@ -112,7 +115,8 @@ public class OffsetManager {
         OffsetAndMetadata nextCommitOffsetAndMetadata = null;
         if (found) {
             nextCommitOffsetAndMetadata = new 
OffsetAndMetadata(nextCommitOffset, 
nextCommitMsg.getMetadata(Thread.currentThread()));
-            LOG.debug("topic-partition [{}] has offsets [{}-{}] ready to be 
committed", tp, committedOffset + 1, nextCommitOffsetAndMetadata.offset());
+            LOG.debug("topic-partition [{}] has offsets [{}-{}] ready to be 
committed",
+                tp, earliestUncommittedOffset, 
nextCommitOffsetAndMetadata.offset() - 1);
         } else {
             LOG.debug("topic-partition [{}] has NO offsets ready to be 
committed", tp);
         }
@@ -131,7 +135,7 @@ public class OffsetManager {
      */
     public long commit(OffsetAndMetadata committedOffset) {
         final long preCommitCommittedOffsets = this.committedOffset;
-        long numCommittedOffsets = 0;
+        final long numCommittedOffsets = committedOffset.offset() - 
this.committedOffset - 1;
         this.committedOffset = committedOffset.offset();
         for (Iterator<KafkaSpoutMessageId> iterator = ackedMsgs.iterator(); 
iterator.hasNext();) {
             if (iterator.next().offset() <= committedOffset.offset()) {

http://git-wip-us.apache.org/repos/asf/storm/blob/a1ffcb93/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
index fff6902..5141a8f 100644
--- 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
@@ -129,7 +129,7 @@ public class SingleTopicKafkaSpoutTest {
         Map<TopicPartition, OffsetAndMetadata> commits = 
commitCapture.getValue();
         assertThat("Expected commits for only one topic partition", 
commits.entrySet().size(), is(1));
         OffsetAndMetadata offset = 
commits.entrySet().iterator().next().getValue();
-        assertThat("Expected committed offset to cover all emitted messages", 
offset.offset(), is(messageCount - 1));
+        assertThat("Expected committed offset to cover all emitted messages", 
offset.offset(), is(messageCount));
     }
 
     @Test

Reply via email to