STORM-2607: Switch OffsetManager to track earliest uncommitted offset instead 
of last committed offset for compatibility with commitSync consumer API


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c9bbd544
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c9bbd544
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c9bbd544

Branch: refs/heads/1.1.x-branch
Commit: c9bbd544449eee98d0427c115ff13a6878706179
Parents: a1ffcb9
Author: Stig Rohde Døssing <stigdoess...@gmail.com>
Authored: Sat Jul 1 17:29:56 2017 +0200
Committer: Stig Rohde Døssing <s...@apache.org>
Committed: Tue Oct 17 07:22:27 2017 +0200

----------------------------------------------------------------------
 .../apache/storm/kafka/spout/KafkaSpout.java    |   6 +-
 .../kafka/spout/internal/OffsetManager.java     |  86 +++++++-----
 .../storm/kafka/spout/KafkaSpoutCommitTest.java |   6 +-
 .../kafka/spout/KafkaSpoutRebalanceTest.java    |  14 +-
 .../kafka/spout/KafkaSpoutRetryLimitTest.java   |   9 +-
 .../kafka/spout/SingleTopicKafkaSpoutTest.java  |   2 +-
 .../kafka/spout/internal/OffsetManagerTest.java | 135 +++++++++++++++----
 7 files changed, 180 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
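For context: the KafkaConsumer.commitSync contract (quoted in the new
OffsetManagerTest below) is that the committed offset should be the next
message the application will consume, i.e. lastProcessedMessageOffset + 1.
The spout previously stored the last processed offset itself and compensated
with +1 adjustments at the call sites. A minimal sketch of the convention
this patch adopts; the class and helper names are illustrative, not part of
the commit:

    import java.util.Collections;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class CommitSyncConvention {
        // Commits the position after the given record, per the commitSync
        // contract: the committed offset names the next offset to consume.
        static void commitProcessed(KafkaConsumer<?, ?> consumer, ConsumerRecord<?, ?> record) {
            TopicPartition tp = new TopicPartition(record.topic(), record.partition());
            consumer.commitSync(Collections.singletonMap(
                tp, new OffsetAndMetadata(record.offset() + 1)));
        }
    }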


http://git-wip-us.apache.org/repos/asf/storm/blob/c9bbd544/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 4d88205..207e9b4 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -181,7 +181,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         }
 
         /**
-         * sets the cursor to the location dictated by the first poll strategy and returns the fetch offset
+         * Sets the cursor to the location dictated by the first poll strategy and returns the fetch offset.
          */
         private long doSeek(TopicPartition tp, OffsetAndMetadata committedOffset) {
             if (committedOffset != null) {             // offset was committed for this TopicPartition
@@ -190,8 +190,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                 } else if (firstPollOffsetStrategy.equals(LATEST)) {
                     kafkaConsumer.seekToEnd(Collections.singleton(tp));
                 } else {
-                    // By default polling starts at the last committed offset. +1 to point fetch to the first uncommitted offset.
-                    kafkaConsumer.seek(tp, committedOffset.offset() + 1);
+                    // By default polling starts at the last committed offset, i.e. the first offset that was not marked as processed.
+                    kafkaConsumer.seek(tp, committedOffset.offset());
                 }
             } else {    // no commits have ever been done, so start at the beginning or end depending on the strategy
                 if (firstPollOffsetStrategy.equals(EARLIEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_EARLIEST)) {
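Under the new convention the committed offset already names the first
uncommitted message, so the seek above no longer needs a +1 adjustment. A
rough standalone sketch of the resulting restart behavior; the class and
method names are illustrative, not the spout's actual code:

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class ResumeAtCommitted {
        // Seeks the consumer to where processing should resume after a restart.
        static void resume(KafkaConsumer<?, ?> consumer, TopicPartition tp) {
            OffsetAndMetadata committed = consumer.committed(tp);
            if (committed != null) {
                // The committed offset is the first uncommitted offset.
                consumer.seek(tp, committed.offset());
            }
        }
    }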

http://git-wip-us.apache.org/repos/asf/storm/blob/c9bbd544/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
index 1c474e3..7dfe7f6 100755
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
@@ -37,7 +37,7 @@ public class OffsetManager {
     /* First offset to be fetched. It is either set to the beginning, end, or to the first uncommitted offset.
     * Initial value depends on offset strategy. See KafkaSpoutConsumerRebalanceListener */
     private final long initialFetchOffset;
-    // Last offset committed to Kafka. Initially it is set to fetchOffset - 1
+    // Committed offset, i.e. the offset where processing will resume if the spout restarts. Initially it is set to fetchOffset.
     private long committedOffset;
     // Emitted Offsets List
     private final NavigableSet<Long> emittedOffsets = new TreeSet<>();
@@ -47,8 +47,8 @@ public class OffsetManager {
     public OffsetManager(TopicPartition tp, long initialFetchOffset) {
         this.tp = tp;
         this.initialFetchOffset = initialFetchOffset;
-        this.committedOffset = initialFetchOffset - 1;
-        LOG.debug("Instantiated {}", this);
+        this.committedOffset = initialFetchOffset;
+        LOG.debug("Instantiated {}", this.toString());
     }
 
     public void addToAckMsgs(KafkaSpoutMessageId msgId) {          // O(Log N)
@@ -60,9 +60,11 @@ public class OffsetManager {
     }
 
     /**
-     * An offset is only committed when all records with lower offset have been
+     * An offset can only be committed when all emitted records with lower offset have been
      * acked. This guarantees that all offsets smaller than the committedOffset
-     * have been delivered.
+     * have been delivered, or that those offsets no longer exist in Kafka.
+     * <p/>
+     * The returned offset points to the earliest uncommitted offset, and matches the semantics of the KafkaConsumer.commitSync API.
      *
      * @return the next OffsetAndMetadata to commit, or null if no offset is
      * ready to commit.
@@ -71,74 +73,78 @@ public class OffsetManager {
         boolean found = false;
         long currOffset;
         long nextCommitOffset = committedOffset;
-        long lastOffMessageOffset = committedOffset;
         KafkaSpoutMessageId nextCommitMsg = null;     // this is a convenience variable to make it faster to create OffsetAndMetadata
 
         for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) {  // complexity is that of a linear scan on a TreeMap
             currOffset = currAckedMsg.offset();
-            if (currOffset == lastOffMessageOffset + 1) {            // found the next offset to commit
+            if (currOffset == nextCommitOffset) {            // found the next offset to commit
                 found = true;
                 nextCommitMsg = currAckedMsg;
-                lastOffMessageOffset = currOffset;
-                nextCommitOffset = lastOffMessageOffset + 1;
-            } else if (currOffset > lastOffMessageOffset + 1) {
-                if (emittedOffsets.contains(lastOffMessageOffset + 1)) {
-                    LOG.debug("topic-partition [{}] has non-continuous offset [{}]. It will be processed in a subsequent batch.", tp, currOffset);
+                nextCommitOffset = currOffset + 1;
+            } else if (currOffset > nextCommitOffset) {
+                if (emittedOffsets.contains(nextCommitOffset)) {
+                    LOG.debug("topic-partition [{}] has non-sequential offset [{}]."
+                        + " It will be processed in a subsequent batch.", tp, currOffset);
                     break;
                 } else {
                     /*
-                        This case will arise in case of non contiguous offset being processed.
-                        So, if the topic doesn't contain offset = committedOffset + 1 (possible
+                        This case will arise in case of non-sequential offset being processed.
+                        So, if the topic doesn't contain offset = nextCommitOffset (possible
                         if the topic is compacted or deleted), the consumer should jump to
                         the next logical point in the topic. Next logical offset should be the
-                        first element after committedOffset in the ascending ordered emitted set.
+                        first element after nextCommitOffset in the ascending ordered emitted set.
                      */
-                    LOG.debug("Processed non contiguous offset. (committedOffset+1) is no longer part of the topic. Committed: [{}], Processed: [{}]", committedOffset, currOffset);
-                    final Long nextEmittedOffset = emittedOffsets.ceiling(lastOffMessageOffset);
+                    LOG.debug("Processed non-sequential offset."
+                        + " The earliest uncommitted offset is no longer part of the topic."
+                        + " Missing offset: [{}], Processed: [{}]", nextCommitOffset, currOffset);
+                    final Long nextEmittedOffset = emittedOffsets.ceiling(nextCommitOffset);
                     if (nextEmittedOffset != null && currOffset == nextEmittedOffset) {
-                        found = true;
+                        LOG.debug("Found committable offset: [{}] after missing offset: [{}], skipping to the committable offset",
+                            currOffset, nextCommitOffset);
                         nextCommitMsg = currAckedMsg;
-                        lastOffMessageOffset = currOffset;
-                        nextCommitOffset = lastOffMessageOffset + 1;
+                        nextCommitOffset = currOffset + 1;
                     } else {
-                        LOG.debug("topic-partition [{}] has non-continuous offset [{}]. Next Offset to commit should be [{}]", tp, currOffset, nextEmittedOffset);
+                        LOG.debug("Topic-partition [{}] has non-sequential offset [{}]."
+                            + " Next offset to commit should be [{}]", tp, currOffset, nextCommitOffset);
                         break;
                     }
                 }
             } else {
-                throw new IllegalStateException("The offset [" + currOffset + "] is below the current committed "
-                    + "offset [" + committedOffset + "] for [" + tp + "]."
+                throw new IllegalStateException("The offset [" + currOffset + "] is below the current nextCommitOffset "
+                    + "[" + nextCommitOffset + "] for [" + tp + "]."
                     + " This should not be possible, and likely indicates a bug in the spout's acking or emit logic.");
             }
         }
 
         OffsetAndMetadata nextCommitOffsetAndMetadata = null;
         if (found) {
-            nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset, nextCommitMsg.getMetadata(Thread.currentThread()));
-            LOG.debug("topic-partition [{}] has offsets [{}-{}] ready to be committed",
-                tp, earliestUncommittedOffset, nextCommitOffsetAndMetadata.offset() - 1);
+            nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset,
+                nextCommitMsg.getMetadata(Thread.currentThread()));
+            LOG.debug("Topic-partition [{}] has offsets [{}-{}] ready to be committed."
+                + " Processing will resume at offset [{}] if the spout restarts",
+                tp, committedOffset, nextCommitOffsetAndMetadata.offset() - 1, nextCommitOffsetAndMetadata.offset());
         } else {
-            LOG.debug("topic-partition [{}] has NO offsets ready to be committed", tp);
+            LOG.debug("Topic-partition [{}] has no offsets ready to be committed", tp);
         }
         LOG.trace("{}", this);
         return nextCommitOffsetAndMetadata;
     }
 
     /**
-     * Marks an offset has committed. This method has side effects - it sets the
+     * Marks an offset as committed. This method has side effects - it sets the
      * internal state in such a way that future calls to
-     * {@link #findNextCommitOffset()} will return offsets greater than the
+     * {@link #findNextCommitOffset()} will return offsets greater than or equal to the
      * offset specified, if any.
      *
-     * @param committedOffset offset to be marked as committed
+     * @param committedOffset The committed offset. All lower offsets are expected to have been committed.
      * @return Number of offsets committed in this commit
      */
     public long commit(OffsetAndMetadata committedOffset) {
-        final long preCommitCommittedOffsets = this.committedOffset;
-        final long numCommittedOffsets = committedOffset.offset() - this.committedOffset - 1;
+        final long preCommitCommittedOffset = this.committedOffset;
+        long numCommittedOffsets = 0;
         this.committedOffset = committedOffset.offset();
         for (Iterator<KafkaSpoutMessageId> iterator = ackedMsgs.iterator(); iterator.hasNext();) {
-            if (iterator.next().offset() <= committedOffset.offset()) {
+            if (iterator.next().offset() < committedOffset.offset()) {
                 iterator.remove();
                 numCommittedOffsets++;
             } else {
@@ -147,7 +153,7 @@ public class OffsetManager {
         }
 
         for (Iterator<Long> iterator = emittedOffsets.iterator(); iterator.hasNext();) {
-            if (iterator.next() <= committedOffset.offset()) {
+            if (iterator.next() < committedOffset.offset()) {
                 iterator.remove();
             } else {
                 break;
@@ -156,8 +162,9 @@ public class OffsetManager {
 
         LOG.trace("{}", this);
         
-        LOG.debug("Committed [{}] offsets in the range [{}-{}] for topic-partition [{}].",
-                numCommittedOffsets, preCommitCommittedOffsets + 1, this.committedOffset, tp);
+        LOG.debug("Committed [{}] offsets in the range [{}-{}] for topic-partition [{}]."
+            + " Processing will resume at [{}] if the spout restarts.",
+                numCommittedOffsets, preCommitCommittedOffset, this.committedOffset - 1, tp, this.committedOffset);
         
         return numCommittedOffsets;
     }
@@ -177,9 +184,14 @@ public class OffsetManager {
     public boolean contains(KafkaSpoutMessageId msgId) {
         return ackedMsgs.contains(msgId);
     }
+    
+    //VisibleForTesting
+    boolean containsEmitted(long offset) {
+        return emittedOffsets.contains(offset);
+    }
 
     @Override
-    public String toString() {
+    public final String toString() {
         return "OffsetManager{"
             + "topic-partition=" + tp
             + ", fetchOffset=" + initialFetchOffset
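Taken together, these changes make committedOffset always name the earliest
uncommitted offset. A small standalone sketch of the OffsetManager API as it
appears in this diff; the scenario and class name are illustrative, and the
sketch assumes the methods shown above are publicly accessible:

    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.storm.kafka.spout.KafkaSpoutMessageId;
    import org.apache.storm.kafka.spout.internal.OffsetManager;

    public class OffsetManagerSemanticsSketch {
        public static void main(String[] args) {
            TopicPartition tp = new TopicPartition("testTopic", 0);
            OffsetManager manager = new OffsetManager(tp, 0); // initial fetch offset 0

            // Emit offsets 0-2, but ack only 0 and 1; offset 2 is still in flight.
            for (long offset = 0; offset <= 2; offset++) {
                manager.addToEmitMsgs(offset);
            }
            manager.addToAckMsgs(new KafkaSpoutMessageId(tp, 0));
            manager.addToAckMsgs(new KafkaSpoutMessageId(tp, 1));

            // Returns offset 2: the earliest uncommitted offset, i.e. where the
            // spout will resume if it restarts.
            OffsetAndMetadata next = manager.findNextCommitOffset();
            System.out.println(next.offset()); // prints 2

            // commit() drops only offsets strictly below the committed offset.
            manager.commit(next);
        }
    }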

http://git-wip-us.apache.org/repos/asf/storm/blob/c9bbd544/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java
index 8710f7b..3981a9a 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java
@@ -42,8 +42,6 @@ import org.mockito.Captor;
 import org.mockito.InOrder;
 import org.mockito.MockitoAnnotations;
 
-import org.mockito.stubbing.OngoingStubbing;
-
 public class KafkaSpoutCommitTest {
 
     private final long offsetCommitPeriodMs = 2_000;
@@ -127,10 +125,10 @@ public class KafkaSpoutCommitTest {
             inOrder.verify(consumerMock).commitSync(commitCapture.capture());
             inOrder.verify(consumerMock).poll(anyLong());
 
-            //verify that Offset 9 was last committed offset
+            //verify that Offset 10 was last committed offset, since this is the offset the spout should resume at
             Map<TopicPartition, OffsetAndMetadata> commits = commitCapture.getValue();
             assertTrue(commits.containsKey(partition));
-            assertEquals(9, commits.get(partition).offset());
+            assertEquals(10, commits.get(partition).offset());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/c9bbd544/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
index 1c7f723..1033e83 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
@@ -19,11 +19,8 @@ import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfigu
 import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.Matchers.hasKey;
 import static org.junit.Assert.assertThat;
-import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyCollection;
 import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.reset;
@@ -34,13 +31,10 @@ import static org.mockito.Mockito.when;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -58,6 +52,12 @@ import org.mockito.Captor;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 
+import static org.mockito.Matchers.eq;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
 public class KafkaSpoutRebalanceTest {
 
     @Captor
@@ -234,6 +234,6 @@ public class KafkaSpoutRebalanceTest {
         //This partition was previously assigned, so the consumer position shouldn't change
         verify(consumerMock, never()).seek(eq(assignedPartition), anyLong());
         //This partition is new, and should start at the committed offset
-        verify(consumerMock).seek(newPartition, committedOffset + 1);
+        verify(consumerMock).seek(newPartition, committedOffset);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/c9bbd544/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
index 831e383..dac4bff 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
@@ -18,7 +18,10 @@ package org.apache.storm.kafka.spout;
 import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.*;
+import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -122,9 +125,9 @@ public class KafkaSpoutRetryLimitTest {
             inOrder.verify(consumerMock).commitSync(committedOffsets.capture());
             inOrder.verify(consumerMock).poll(anyLong());
 
-            //verify that Offset 3 was committed for the given TopicPartition
+            //verify that offset 4 was committed for the given TopicPartition, since processing should resume at 4.
             assertTrue(committedOffsets.getValue().containsKey(partition));
-            assertEquals(lastOffset, ((OffsetAndMetadata) (committedOffsets.getValue().get(partition))).offset());
+            assertEquals(lastOffset + 1, ((OffsetAndMetadata) (committedOffsets.getValue().get(partition))).offset());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/c9bbd544/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
index 5141a8f..436d052 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
@@ -170,7 +170,7 @@ public class SingleTopicKafkaSpoutTest {
             Map<TopicPartition, OffsetAndMetadata> capturedCommit = commitCapture.getValue();
             TopicPartition expectedTp = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0);
             assertThat("Should have committed to the right topic", capturedCommit, Matchers.hasKey(expectedTp));
-            assertThat("Should have committed all the acked messages", capturedCommit.get(expectedTp).offset(), is((long)messageCount - 1));
+            assertThat("Should have committed all the acked messages", capturedCommit.get(expectedTp).offset(), is((long)messageCount));
 
             /* Verify that the following acked (now committed) tuples are not emitted again
              * Since the consumer position was somewhere in the middle of the acked tuples when the commit happened,

http://git-wip-us.apache.org/repos/asf/storm/blob/c9bbd544/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java
index e8896c9..abbacf9 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java
@@ -21,6 +21,8 @@ import static org.hamcrest.CoreMatchers.nullValue;
 import static org.junit.Assert.assertThat;
 
 import java.util.NoSuchElementException;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.kafka.spout.KafkaSpoutMessageId;
 import org.junit.Rule;
@@ -31,54 +33,141 @@ public class OffsetManagerTest {
 
     @Rule
     public ExpectedException expect = ExpectedException.none();
+    
+    private final long initialFetchOffset = 0;
+    private final TopicPartition testTp = new TopicPartition("testTopic", 0);
+    private final OffsetManager manager = new OffsetManager(testTp, initialFetchOffset);
 
     @Test
     public void testSkipMissingOffsetsWhenFindingNextCommitOffsetWithGapInMiddleOfAcked() {
-        /*If topic compaction is enabled in Kafka, we sometimes need to commit past a gap of deleted offsets
+        /* If topic compaction is enabled in Kafka, we sometimes need to commit past a gap of deleted offsets
          * Since the Kafka consumer should return offsets in order, we can assume that if a message is acked
          * then any prior message will have been emitted at least once.
          * If we see an acked message and some of the offsets preceding it were not emitted, they must have been compacted away and should be skipped.
          */
-        
-        TopicPartition tp = new TopicPartition("test", 0);
-        OffsetManager manager = new OffsetManager(tp, 0);
-        
         manager.addToEmitMsgs(0);
         manager.addToEmitMsgs(1);
         manager.addToEmitMsgs(2);
         //3, 4 compacted away
-        manager.addToEmitMsgs(5);
-        manager.addToEmitMsgs(6);
-        manager.addToAckMsgs(new KafkaSpoutMessageId(tp, 0));
-        manager.addToAckMsgs(new KafkaSpoutMessageId(tp, 1));
-        manager.addToAckMsgs(new KafkaSpoutMessageId(tp, 2));
-        manager.addToAckMsgs(new KafkaSpoutMessageId(tp, 6));
+        manager.addToEmitMsgs(initialFetchOffset + 5);
+        manager.addToEmitMsgs(initialFetchOffset + 6);
+        manager.addToAckMsgs(getMessageId(initialFetchOffset));
+        manager.addToAckMsgs(getMessageId(initialFetchOffset + 1));
+        manager.addToAckMsgs(getMessageId(initialFetchOffset + 2));
+        manager.addToAckMsgs(getMessageId(initialFetchOffset + 6));
         
-        assertThat("The offset manager should not skip past offset 5 which is still pending", manager.findNextCommitOffset().offset(), is(2L));
+        assertThat("The offset manager should not skip past offset 5 which is still pending", manager.findNextCommitOffset().offset(), is(initialFetchOffset + 3));
         
-        manager.addToAckMsgs(new KafkaSpoutMessageId(tp, 5));
+        manager.addToAckMsgs(getMessageId(initialFetchOffset + 5));
         
         assertThat("The offset manager should skip past the gap in acked messages, since the messages were not emitted", 
-            manager.findNextCommitOffset().offset(), is(6L));
+            manager.findNextCommitOffset().offset(), is(initialFetchOffset + 7));
     }
     
     @Test
     public void testSkipMissingOffsetsWhenFindingNextCommitOffsetWithGapBeforeAcked() {
-        
-        TopicPartition tp = new TopicPartition("test", 0);
-        OffsetManager manager = new OffsetManager(tp, 0);
-        
         //0-4 compacted away
-        manager.addToEmitMsgs(5);
-        manager.addToEmitMsgs(6);
-        manager.addToAckMsgs(new KafkaSpoutMessageId(tp, 6));
+        manager.addToEmitMsgs(initialFetchOffset + 5);
+        manager.addToEmitMsgs(initialFetchOffset + 6);
+        manager.addToAckMsgs(getMessageId(initialFetchOffset + 6));
         
         assertThat("The offset manager should not skip past offset 5 which is still pending", manager.findNextCommitOffset(), is(nullValue()));
         
-        manager.addToAckMsgs(new KafkaSpoutMessageId(tp, 5));
+        manager.addToAckMsgs(getMessageId(initialFetchOffset + 5));
         
         assertThat("The offset manager should skip past the gap in acked messages, since the messages were not emitted", 
-            manager.findNextCommitOffset().offset(), is(6L));
+            manager.findNextCommitOffset().offset(), is(initialFetchOffset + 7));
+    }
+
+    @Test
+    public void testFindNextCommittedOffsetWithNoAcks() {
+        OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset();
+        assertThat("There shouldn't be a next commit offset when nothing has been acked", nextCommitOffset, is(nullValue()));
+    }
+
+    @Test
+    public void testFindNextCommitOffsetWithOneAck() {
+        /*
+         * The KafkaConsumer commitSync API docs: "The committed offset should be the next message your application will consume, i.e.
+         * lastProcessedMessageOffset + 1. "
+         */
+        emitAndAckMessage(getMessageId(initialFetchOffset));
+        OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset();
+        assertThat("The next commit offset should be one past the processed message offset", nextCommitOffset.offset(), is(initialFetchOffset + 1));
+    }
+
+    @Test
+    public void testFindNextCommitOffsetWithMultipleOutOfOrderAcks() {
+        emitAndAckMessage(getMessageId(initialFetchOffset + 1));
+        emitAndAckMessage(getMessageId(initialFetchOffset));
+        OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset();
+        assertThat("The next commit offset should be one past the processed message offset", nextCommitOffset.offset(), is(initialFetchOffset + 2));
+    }
+
+    @Test
+    public void testFindNextCommitOffsetWithAckedOffsetGap() {
+        emitAndAckMessage(getMessageId(initialFetchOffset + 2));
+        manager.addToEmitMsgs(initialFetchOffset + 1);
+        emitAndAckMessage(getMessageId(initialFetchOffset));
+        OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset();
+        assertThat("The next commit offset should cover the sequential acked offsets", nextCommitOffset.offset(), is(initialFetchOffset + 1));
+    }
+
+    @Test
+    public void testFindNextOffsetWithAckedButNotEmittedOffsetGap() {
+        /**
+         * If topic compaction is enabled in Kafka some offsets may be deleted.
+         * We distinguish this case from regular gaps in the acked offset sequence caused by out of order acking
+         * by checking that offsets in the gap have been emitted at some point previously. 
+         * If they haven't then they can't exist in Kafka, since the spout emits tuples in order.
+         */
+        emitAndAckMessage(getMessageId(initialFetchOffset + 2));
+        emitAndAckMessage(getMessageId(initialFetchOffset));
+        OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset();
+        assertThat("The next commit offset should cover all the acked offsets, since the offset in the gap hasn't been emitted and doesn't exist",
+            nextCommitOffset.offset(), is(initialFetchOffset + 3));
+    }
+    
+    @Test
+    public void testFindNextCommitOffsetWithUnackedOffsetGap() {
+        manager.addToEmitMsgs(initialFetchOffset + 1);
+        emitAndAckMessage(getMessageId(initialFetchOffset));
+        OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset();
+        assertThat("The next commit offset should cover the contiguously acked offsets", nextCommitOffset.offset(), is(initialFetchOffset + 1));
+    }
+    
+    @Test
+    public void testFindNextCommitOffsetWhenTooLowOffsetIsAcked() {
+        OffsetManager startAtHighOffsetManager = new OffsetManager(testTp, 10);
+        emitAndAckMessage(getMessageId(0));
+        OffsetAndMetadata nextCommitOffset = startAtHighOffsetManager.findNextCommitOffset();
+        assertThat("Acking an offset earlier than the committed offset should have no effect", nextCommitOffset, is(nullValue()));
+    }
+    
+    @Test
+    public void testCommit() {
+        emitAndAckMessage(getMessageId(initialFetchOffset));
+        emitAndAckMessage(getMessageId(initialFetchOffset + 1));
+        emitAndAckMessage(getMessageId(initialFetchOffset + 2));
+        
+        long committedMessages = manager.commit(new OffsetAndMetadata(initialFetchOffset + 2));
+        
+        assertThat("Should have committed all messages to the left of the earliest uncommitted offset", committedMessages, is(2L));
+        assertThat("The committed messages should not be in the acked list anymore", manager.contains(getMessageId(initialFetchOffset)), is(false));
+        assertThat("The committed messages should not be in the emitted list anymore", manager.containsEmitted(initialFetchOffset), is(false));
+        assertThat("The committed messages should not be in the acked list anymore", manager.contains(getMessageId(initialFetchOffset + 1)), is(false));
+        assertThat("The committed messages should not be in the emitted list anymore", manager.containsEmitted(initialFetchOffset + 1), is(false));
+        assertThat("The uncommitted message should still be in the acked list", manager.contains(getMessageId(initialFetchOffset + 2)), is(true));
+        assertThat("The uncommitted message should still be in the emitted list", manager.containsEmitted(initialFetchOffset + 2), is(true));
+    }
+
+    private KafkaSpoutMessageId getMessageId(long offset) {
+        return new KafkaSpoutMessageId(testTp, offset);
+    }
+    
+    private void emitAndAckMessage(KafkaSpoutMessageId msgId) {
+        manager.addToEmitMsgs(msgId.offset());
+        manager.addToAckMsgs(msgId);
     }
 
 }
