STORM-2607: Switch OffsetManager to track earliest uncommitted offset instead of last committed offset for compatibility with commitSync consumer API
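
The change is driven by the contract of KafkaConsumer.commitSync: the offset handed to
commitSync must be the offset of the next record the application will consume
(lastProcessedMessageOffset + 1), not the offset of the last record it processed.
A minimal sketch of a caller that follows that contract (illustrative names, not code
from this commit):

    import java.util.Collections;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    class CommitSyncContract {
        static void pollAndCommit(KafkaConsumer<String, String> consumer, TopicPartition tp) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            long lastProcessed = -1;
            for (ConsumerRecord<String, String> record : records.records(tp)) {
                lastProcessed = record.offset(); // process the record here
            }
            if (lastProcessed >= 0) {
                // Commit the NEXT offset to consume, i.e. lastProcessed + 1. This is the
                // value OffsetManager now tracks directly as its committedOffset.
                consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(lastProcessed + 1)));
            }
        }
    }

Before this change, OffsetManager tracked the last committed offset and callers
compensated with scattered +1 / -1 adjustments; tracking the earliest uncommitted
offset instead lets the stored value be passed to commitSync and seek unmodified.
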
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/148ee660
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/148ee660
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/148ee660

Branch: refs/heads/1.x-branch
Commit: 148ee6609557d7436ce826da29ff3ec301458586
Parents: 4fc47b2
Author: Stig Rohde Døssing <stigdoess...@gmail.com>
Authored: Sat Jul 1 17:29:56 2017 +0200
Committer: Stig Rohde Døssing <s...@apache.org>
Committed: Tue Oct 17 07:20:39 2017 +0200

----------------------------------------------------------------------
 .../apache/storm/kafka/spout/KafkaSpout.java    |   6 +-
 .../kafka/spout/internal/OffsetManager.java     |  86 +++++++-----
 .../storm/kafka/spout/KafkaSpoutCommitTest.java |   6 +-
 .../kafka/spout/KafkaSpoutRebalanceTest.java    |   4 +-
 .../kafka/spout/KafkaSpoutRetryLimitTest.java   |   8 +-
 .../kafka/spout/SingleTopicKafkaSpoutTest.java  |   2 +-
 .../kafka/spout/internal/OffsetManagerTest.java | 135 +++++++++++++++----
 7 files changed, 174 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/148ee660/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 3582bdb..68bce11 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -202,7 +202,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     }
 
     /**
-     * sets the cursor to the location dictated by the first poll strategy and returns the fetch offset
+     * Sets the cursor to the location dictated by the first poll strategy and returns the fetch offset.
      */
     private long doSeek(TopicPartition tp, OffsetAndMetadata committedOffset) {
         if (committedOffset != null) {             // offset was committed for this TopicPartition
@@ -211,8 +211,8 @@
             } else if (firstPollOffsetStrategy.equals(LATEST)) {
                 kafkaConsumer.seekToEnd(Collections.singleton(tp));
             } else {
-                // By default polling starts at the last committed offset. +1 to point fetch to the first uncommitted offset.
-                kafkaConsumer.seek(tp, committedOffset.offset() + 1);
+                // By default polling starts at the last committed offset, i.e. the first offset that was not marked as processed.
+                kafkaConsumer.seek(tp, committedOffset.offset());
             }
         } else {    // no commits have ever been done, so start at the beginning or end depending on the strategy
             if (firstPollOffsetStrategy.equals(EARLIEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_EARLIEST)) {
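
Because the committed offset now already names the first unprocessed record, the
restart path can seek to it directly, where the old code had to add 1 to the stored
value. A rough sketch of the resulting resume logic (assumed setup and names, not a
verbatim excerpt from the spout):

    import java.util.Collections;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    class ResumeSketch {
        static void resume(KafkaConsumer<String, String> consumer, TopicPartition tp) {
            OffsetAndMetadata committed = consumer.committed(tp);
            if (committed != null) {
                // The committed offset is already the first record to process.
                consumer.seek(tp, committed.offset()); // previously committed.offset() + 1
            } else {
                // No commit exists yet; the real spout picks a position from its first poll strategy.
                consumer.seekToBeginning(Collections.singleton(tp));
            }
        }
    }
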
http://git-wip-us.apache.org/repos/asf/storm/blob/148ee660/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
index 1c474e3..7dfe7f6 100755
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
@@ -37,7 +37,7 @@ public class OffsetManager {
     /* First offset to be fetched. It is either set to the beginning, end, or to the first uncommitted offset.
      * Initial value depends on offset strategy. See KafkaSpoutConsumerRebalanceListener */
     private final long initialFetchOffset;
-    // Last offset committed to Kafka. Initially it is set to fetchOffset - 1
+    // Committed offset, i.e. the offset where processing will resume if the spout restarts. Initially it is set to fetchOffset.
     private long committedOffset;
     // Emitted Offsets List
     private final NavigableSet<Long> emittedOffsets = new TreeSet<>();
@@ -47,8 +47,8 @@ public class OffsetManager {
     public OffsetManager(TopicPartition tp, long initialFetchOffset) {
         this.tp = tp;
         this.initialFetchOffset = initialFetchOffset;
-        this.committedOffset = initialFetchOffset - 1;
-        LOG.debug("Instantiated {}", this);
+        this.committedOffset = initialFetchOffset;
+        LOG.debug("Instantiated {}", this.toString());
     }
 
     public void addToAckMsgs(KafkaSpoutMessageId msgId) {          // O(Log N)
@@ -60,9 +60,11 @@
     }
 
     /**
-     * An offset is only committed when all records with lower offset have been
+     * An offset can only be committed when all emitted records with lower offset have been
      * acked. This guarantees that all offsets smaller than the committedOffset
-     * have been delivered.
+     * have been delivered, or that those offsets no longer exist in Kafka.
+     * <p/>
+     * The returned offset points to the earliest uncommitted offset, and matches the semantics of the KafkaConsumer.commitSync API.
      *
      * @return the next OffsetAndMetadata to commit, or null if no offset is
      * ready to commit.
@@ -71,74 +73,78 @@
         boolean found = false;
         long currOffset;
         long nextCommitOffset = committedOffset;
-        long lastOffMessageOffset = committedOffset;
        KafkaSpoutMessageId nextCommitMsg = null;     // this is a convenience variable to make it faster to create OffsetAndMetadata
 
         for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) {  // complexity is that of a linear scan on a TreeMap
             currOffset = currAckedMsg.offset();
-            if (currOffset == lastOffMessageOffset + 1) {            // found the next offset to commit
+            if (currOffset == nextCommitOffset) {            // found the next offset to commit
                 found = true;
                 nextCommitMsg = currAckedMsg;
-                lastOffMessageOffset = currOffset;
-                nextCommitOffset = lastOffMessageOffset + 1;
-            } else if (currOffset > lastOffMessageOffset + 1) {
-                if (emittedOffsets.contains(lastOffMessageOffset + 1)) {
-                    LOG.debug("topic-partition [{}] has non-continuous offset [{}]. It will be processed in a subsequent batch.", tp, currOffset);
+                nextCommitOffset = currOffset + 1;
+            } else if (currOffset > nextCommitOffset) {
+                if (emittedOffsets.contains(nextCommitOffset)) {
+                    LOG.debug("topic-partition [{}] has non-sequential offset [{}]."
+                        + " It will be processed in a subsequent batch.", tp, currOffset);
                     break;
                 } else {
                     /*
-                        This case will arise in case of non contiguous offset being processed.
-                        So, if the topic doesn't contain offset = committedOffset + 1 (possible
+                        This case will arise in case of non-sequential offset being processed.
+                        So, if the topic doesn't contain offset = nextCommitOffset (possible
                         if the topic is compacted or deleted), the consumer should jump to the
                         next logical point in the topic. Next logical offset should be the
-                        first element after committedOffset in the ascending ordered emitted set.
+                        first element after nextCommitOffset in the ascending ordered emitted set.
                     */
-                    LOG.debug("Processed non contiguous offset. (committedOffset+1) is no longer part of the topic. Committed: [{}], Processed: [{}]", committedOffset, currOffset);
-                    final Long nextEmittedOffset = emittedOffsets.ceiling(lastOffMessageOffset);
+                    LOG.debug("Processed non-sequential offset."
+                        + " The earliest uncommitted offset is no longer part of the topic."
+                        + " Missing offset: [{}], Processed: [{}]", nextCommitOffset, currOffset);
+                    final Long nextEmittedOffset = emittedOffsets.ceiling(nextCommitOffset);
                     if (nextEmittedOffset != null && currOffset == nextEmittedOffset) {
-                        found = true;
+                        LOG.debug("Found committable offset: [{}] after missing offset: [{}], skipping to the committable offset",
+                            currOffset, nextCommitOffset);
                         nextCommitMsg = currAckedMsg;
-                        lastOffMessageOffset = currOffset;
-                        nextCommitOffset = lastOffMessageOffset + 1;
+                        nextCommitOffset = currOffset + 1;
                     } else {
-                        LOG.debug("topic-partition [{}] has non-continuous offset [{}]. Next Offset to commit should be [{}]", tp, currOffset, nextEmittedOffset);
+                        LOG.debug("Topic-partition [{}] has non-sequential offset [{}]."
+                            + " Next offset to commit should be [{}]", tp, currOffset, nextCommitOffset);
                         break;
                     }
                 }
             } else {
-                throw new IllegalStateException("The offset [" + currOffset + "] is below the current committed "
-                    + "offset [" + committedOffset + "] for [" + tp + "]."
+                throw new IllegalStateException("The offset [" + currOffset + "] is below the current nextCommitOffset "
+                    + "[" + nextCommitOffset + "] for [" + tp + "]."
                    + " This should not be possible, and likely indicates a bug in the spout's acking or emit logic.");
             }
         }
 
         OffsetAndMetadata nextCommitOffsetAndMetadata = null;
         if (found) {
-            nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset, nextCommitMsg.getMetadata(Thread.currentThread()));
-            LOG.debug("topic-partition [{}] has offsets [{}-{}] ready to be committed",
-                tp, earliestUncommittedOffset, nextCommitOffsetAndMetadata.offset() - 1);
+            nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset,
+                nextCommitMsg.getMetadata(Thread.currentThread()));
+            LOG.debug("Topic-partition [{}] has offsets [{}-{}] ready to be committed."
+                + " Processing will resume at offset [{}] if the spout restarts",
+                tp, committedOffset, nextCommitOffsetAndMetadata.offset() - 1, nextCommitOffsetAndMetadata.offset());
         } else {
-            LOG.debug("topic-partition [{}] has NO offsets ready to be committed", tp);
+            LOG.debug("Topic-partition [{}] has no offsets ready to be committed", tp);
         }
         LOG.trace("{}", this);
         return nextCommitOffsetAndMetadata;
     }
 
     /**
-     * Marks an offset has committed. This method has side effects - it sets the
+     * Marks an offset as committed. This method has side effects - it sets the
      * internal state in such a way that future calls to
-     * {@link #findNextCommitOffset()} will return offsets greater than the
+     * {@link #findNextCommitOffset()} will return offsets greater than or equal to the
      * offset specified, if any.
      *
-     * @param committedOffset offset to be marked as committed
+     * @param committedOffset The committed offset. All lower offsets are expected to have been committed.
      * @return Number of offsets committed in this commit
      */
     public long commit(OffsetAndMetadata committedOffset) {
-        final long preCommitCommittedOffsets = this.committedOffset;
-        final long numCommittedOffsets = committedOffset.offset() - this.committedOffset - 1;
+        final long preCommitCommittedOffset = this.committedOffset;
+        long numCommittedOffsets = 0;
         this.committedOffset = committedOffset.offset();
         for (Iterator<KafkaSpoutMessageId> iterator = ackedMsgs.iterator(); iterator.hasNext();) {
-            if (iterator.next().offset() <= committedOffset.offset()) {
+            if (iterator.next().offset() < committedOffset.offset()) {
                 iterator.remove();
                 numCommittedOffsets++;
             } else {
@@ -147,7 +153,7 @@ public class OffsetManager {
         }
 
         for (Iterator<Long> iterator = emittedOffsets.iterator(); iterator.hasNext();) {
-            if (iterator.next() <= committedOffset.offset()) {
+            if (iterator.next() < committedOffset.offset()) {
                 iterator.remove();
             } else {
                 break;
@@ -156,8 +162,9 @@
 
         LOG.trace("{}", this);
 
-        LOG.debug("Committed [{}] offsets in the range [{}-{}] for topic-partition [{}].",
-            numCommittedOffsets, preCommitCommittedOffsets + 1, this.committedOffset, tp);
+        LOG.debug("Committed [{}] offsets in the range [{}-{}] for topic-partition [{}]."
+            + " Processing will resume at [{}] if the spout restarts.",
+            numCommittedOffsets, preCommitCommittedOffset, this.committedOffset - 1, tp, this.committedOffset);
         return numCommittedOffsets;
     }
 
@@ -177,9 +184,14 @@
     public boolean contains(KafkaSpoutMessageId msgId) {
         return ackedMsgs.contains(msgId);
     }
+
+    //VisibleForTesting
+    boolean containsEmitted(long offset) {
+        return emittedOffsets.contains(offset);
+    }
 
     @Override
-    public String toString() {
+    public final String toString() {
         return "OffsetManager{" +
             "topic-partition=" + tp +
             ", fetchOffset=" + initialFetchOffset
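
To make the new semantics concrete, here is a short walkthrough of the reworked class
(a sketch against the package-internal API shown above; OffsetManager is not a public
extension point, and the asserts require running with -ea):

    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.storm.kafka.spout.KafkaSpoutMessageId;
    import org.apache.storm.kafka.spout.internal.OffsetManager;

    class OffsetManagerWalkthrough {
        public static void main(String[] args) {
            TopicPartition tp = new TopicPartition("demo", 0);
            OffsetManager manager = new OffsetManager(tp, 0); // committedOffset starts at 0, no longer -1

            manager.addToEmitMsgs(0);
            manager.addToEmitMsgs(1);
            manager.addToAckMsgs(new KafkaSpoutMessageId(tp, 0));
            manager.addToAckMsgs(new KafkaSpoutMessageId(tp, 1));

            // With offsets 0 and 1 acked, the next commit offset is 2: the earliest
            // uncommitted offset, which is exactly what commitSync expects.
            OffsetAndMetadata toCommit = manager.findNextCommitOffset();
            assert toCommit.offset() == 2;

            // commit() now counts offsets strictly below the committed offset,
            // so both acked messages are cleaned up and 2 is returned.
            assert manager.commit(toCommit) == 2;
        }
    }
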
+ + " Processing will resume at [{}] if the spout restarts.", + numCommittedOffsets, preCommitCommittedOffset, this.committedOffset - 1, tp, this.committedOffset); return numCommittedOffsets; } @@ -177,9 +184,14 @@ public class OffsetManager { public boolean contains(KafkaSpoutMessageId msgId) { return ackedMsgs.contains(msgId); } + + //VisibleForTesting + boolean containsEmitted(long offset) { + return emittedOffsets.contains(offset); + } @Override - public String toString() { + public final String toString() { return "OffsetManager{" + "topic-partition=" + tp + ", fetchOffset=" + initialFetchOffset http://git-wip-us.apache.org/repos/asf/storm/blob/148ee660/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java index c9c684f..eb498d4 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java @@ -42,8 +42,6 @@ import org.mockito.MockitoAnnotations; import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder; -import org.mockito.stubbing.OngoingStubbing; - public class KafkaSpoutCommitTest { private final long offsetCommitPeriodMs = 2_000; @@ -107,10 +105,10 @@ public class KafkaSpoutCommitTest { inOrder.verify(consumerMock).commitSync(commitCapture.capture()); inOrder.verify(consumerMock).poll(anyLong()); - //verify that Offset 9 was last committed offset + //verify that Offset 10 was last committed offset, since this is the offset the spout should resume at Map<TopicPartition, OffsetAndMetadata> commits = commitCapture.getValue(); assertTrue(commits.containsKey(partition)); - assertEquals(9, commits.get(partition).offset()); + assertEquals(10, commits.get(partition).offset()); } } http://git-wip-us.apache.org/repos/asf/storm/blob/148ee660/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java index 3e50506..010433a 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java @@ -53,7 +53,7 @@ import org.mockito.Mockito; import org.mockito.MockitoAnnotations; import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder; -import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder; +import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.eq; import java.util.HashSet; @@ -247,6 +247,6 @@ public class KafkaSpoutRebalanceTest { //This partition was previously assigned, so the consumer position shouldn't change verify(consumerMock, never()).seek(eq(assignedPartition), anyLong()); //This partition is new, and should start at the committed offset - 
verify(consumerMock).seek(newPartition, committedOffset + 1); + verify(consumerMock).seek(newPartition, committedOffset); } } http://git-wip-us.apache.org/repos/asf/storm/blob/148ee660/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java index 39fa42c..21997f8 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java @@ -17,7 +17,6 @@ package org.apache.storm.kafka.spout; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.*; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -45,6 +44,9 @@ import org.mockito.InOrder; import org.mockito.MockitoAnnotations; import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder; +import static org.mockito.Matchers.anyList; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyString; public class KafkaSpoutRetryLimitTest { @@ -108,9 +110,9 @@ public class KafkaSpoutRetryLimitTest { inOrder.verify(consumerMock).commitSync(commitCapture.capture()); inOrder.verify(consumerMock).poll(anyLong()); - //verify that Offset 3 was committed for the given TopicPartition + //verify that offset 4 was committed for the given TopicPartition, since processing should resume at 4. 
assertTrue(commitCapture.getValue().containsKey(partition)); - assertEquals(lastOffset, ((OffsetAndMetadata) (commitCapture.getValue().get(partition))).offset()); + assertEquals(lastOffset + 1, ((OffsetAndMetadata) (commitCapture.getValue().get(partition))).offset()); } } http://git-wip-us.apache.org/repos/asf/storm/blob/148ee660/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java index 59921eb..211ae97 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java @@ -159,7 +159,7 @@ public class SingleTopicKafkaSpoutTest { Map<TopicPartition, OffsetAndMetadata> capturedCommit = commitCapture.getValue(); TopicPartition expectedTp = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0); assertThat("Should have committed to the right topic", capturedCommit, Matchers.hasKey(expectedTp)); - assertThat("Should have committed all the acked messages", capturedCommit.get(expectedTp).offset(), is((long)messageCount - 1)); + assertThat("Should have committed all the acked messages", capturedCommit.get(expectedTp).offset(), is((long)messageCount)); /* Verify that the following acked (now committed) tuples are not emitted again * Since the consumer position was somewhere in the middle of the acked tuples when the commit happened, http://git-wip-us.apache.org/repos/asf/storm/blob/148ee660/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java index e8896c9..abbacf9 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java @@ -21,6 +21,8 @@ import static org.hamcrest.CoreMatchers.nullValue; import static org.junit.Assert.assertThat; import java.util.NoSuchElementException; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.storm.kafka.spout.KafkaSpoutMessageId; import org.junit.Rule; @@ -31,54 +33,141 @@ public class OffsetManagerTest { @Rule public ExpectedException expect = ExpectedException.none(); + + private final long initialFetchOffset = 0; + private final TopicPartition testTp = new TopicPartition("testTopic", 0); + private final OffsetManager manager = new OffsetManager(testTp, initialFetchOffset); @Test public void testSkipMissingOffsetsWhenFindingNextCommitOffsetWithGapInMiddleOfAcked() { - /*If topic compaction is enabled in Kafka, we sometimes need to commit past a gap of deleted offsets + /* If topic compaction is enabled in Kafka, we sometimes need to commit past a gap of deleted offsets * Since the Kafka consumer should return offsets in order, we can assume 

http://git-wip-us.apache.org/repos/asf/storm/blob/148ee660/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java
index e8896c9..abbacf9 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java
@@ -21,6 +21,8 @@ import static org.hamcrest.CoreMatchers.nullValue;
 import static org.junit.Assert.assertThat;
 
 import java.util.NoSuchElementException;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.kafka.spout.KafkaSpoutMessageId;
 import org.junit.Rule;
@@ -31,54 +33,141 @@ public class OffsetManagerTest {
 
     @Rule
     public ExpectedException expect = ExpectedException.none();
+
+    private final long initialFetchOffset = 0;
+    private final TopicPartition testTp = new TopicPartition("testTopic", 0);
+    private final OffsetManager manager = new OffsetManager(testTp, initialFetchOffset);
 
     @Test
     public void testSkipMissingOffsetsWhenFindingNextCommitOffsetWithGapInMiddleOfAcked() {
-        /*If topic compaction is enabled in Kafka, we sometimes need to commit past a gap of deleted offsets
+        /* If topic compaction is enabled in Kafka, we sometimes need to commit past a gap of deleted offsets
          * Since the Kafka consumer should return offsets in order, we can assume that if a message is acked
          * then any prior message will have been emitted at least once.
         * If we see an acked message and some of the offsets preceding it were not emitted, they must have been compacted away and should be skipped.
         */
-
-        TopicPartition tp = new TopicPartition("test", 0);
-        OffsetManager manager = new OffsetManager(tp, 0);
-
         manager.addToEmitMsgs(0);
         manager.addToEmitMsgs(1);
         manager.addToEmitMsgs(2);
         //3, 4 compacted away
-        manager.addToEmitMsgs(5);
-        manager.addToEmitMsgs(6);
-        manager.addToAckMsgs(new KafkaSpoutMessageId(tp, 0));
-        manager.addToAckMsgs(new KafkaSpoutMessageId(tp, 1));
-        manager.addToAckMsgs(new KafkaSpoutMessageId(tp, 2));
-        manager.addToAckMsgs(new KafkaSpoutMessageId(tp, 6));
+        manager.addToEmitMsgs(initialFetchOffset + 5);
+        manager.addToEmitMsgs(initialFetchOffset + 6);
+        manager.addToAckMsgs(getMessageId(initialFetchOffset));
+        manager.addToAckMsgs(getMessageId(initialFetchOffset + 1));
+        manager.addToAckMsgs(getMessageId(initialFetchOffset + 2));
+        manager.addToAckMsgs(getMessageId(initialFetchOffset + 6));
 
-        assertThat("The offset manager should not skip past offset 5 which is still pending", manager.findNextCommitOffset().offset(), is(2L));
+        assertThat("The offset manager should not skip past offset 5 which is still pending", manager.findNextCommitOffset().offset(), is(initialFetchOffset + 3));
 
-        manager.addToAckMsgs(new KafkaSpoutMessageId(tp, 5));
+        manager.addToAckMsgs(getMessageId(initialFetchOffset + 5));
 
         assertThat("The offset manager should skip past the gap in acked messages, since the messages were not emitted",
-            manager.findNextCommitOffset().offset(), is(6L));
+            manager.findNextCommitOffset().offset(), is(initialFetchOffset + 7));
     }
 
     @Test
     public void testSkipMissingOffsetsWhenFindingNextCommitOffsetWithGapBeforeAcked() {
-
-        TopicPartition tp = new TopicPartition("test", 0);
-        OffsetManager manager = new OffsetManager(tp, 0);
-
         //0-4 compacted away
-        manager.addToEmitMsgs(5);
-        manager.addToEmitMsgs(6);
-        manager.addToAckMsgs(new KafkaSpoutMessageId(tp, 6));
+        manager.addToEmitMsgs(initialFetchOffset + 5);
+        manager.addToEmitMsgs(initialFetchOffset + 6);
+        manager.addToAckMsgs(getMessageId(initialFetchOffset + 6));
 
         assertThat("The offset manager should not skip past offset 5 which is still pending", manager.findNextCommitOffset(), is(nullValue()));
 
-        manager.addToAckMsgs(new KafkaSpoutMessageId(tp, 5));
+        manager.addToAckMsgs(getMessageId(initialFetchOffset + 5));
 
         assertThat("The offset manager should skip past the gap in acked messages, since the messages were not emitted",
-            manager.findNextCommitOffset().offset(), is(6L));
+            manager.findNextCommitOffset().offset(), is(initialFetchOffset + 7));
+    }
+
+    @Test
+    public void testFindNextCommittedOffsetWithNoAcks() {
+        OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset();
+        assertThat("There shouldn't be a next commit offset when nothing has been acked", nextCommitOffset, is(nullValue()));
+    }
+
+    @Test
+    public void testFindNextCommitOffsetWithOneAck() {
+        /*
+         * The KafkaConsumer commitSync API docs: "The committed offset should be the next message your application will consume, i.e.
+         * lastProcessedMessageOffset + 1. "
+         */
+        emitAndAckMessage(getMessageId(initialFetchOffset));
+        OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset();
+        assertThat("The next commit offset should be one past the processed message offset", nextCommitOffset.offset(), is(initialFetchOffset + 1));
+    }
+
+    @Test
+    public void testFindNextCommitOffsetWithMultipleOutOfOrderAcks() {
+        emitAndAckMessage(getMessageId(initialFetchOffset + 1));
+        emitAndAckMessage(getMessageId(initialFetchOffset));
+        OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset();
+        assertThat("The next commit offset should be one past the processed message offset", nextCommitOffset.offset(), is(initialFetchOffset + 2));
+    }
+
+    @Test
+    public void testFindNextCommitOffsetWithAckedOffsetGap() {
+        emitAndAckMessage(getMessageId(initialFetchOffset + 2));
+        manager.addToEmitMsgs(initialFetchOffset + 1);
+        emitAndAckMessage(getMessageId(initialFetchOffset));
+        OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset();
+        assertThat("The next commit offset should cover the sequential acked offsets", nextCommitOffset.offset(), is(initialFetchOffset + 1));
+    }
+
+    @Test
+    public void testFindNextOffsetWithAckedButNotEmittedOffsetGap() {
+        /**
+         * If topic compaction is enabled in Kafka some offsets may be deleted.
+         * We distinguish this case from regular gaps in the acked offset sequence caused by out of order acking
+         * by checking that offsets in the gap have been emitted at some point previously.
+         * If they haven't then they can't exist in Kafka, since the spout emits tuples in order.
+         */
+        emitAndAckMessage(getMessageId(initialFetchOffset + 2));
+        emitAndAckMessage(getMessageId(initialFetchOffset));
+        OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset();
+        assertThat("The next commit offset should cover all the acked offsets, since the offset in the gap hasn't been emitted and doesn't exist",
+            nextCommitOffset.offset(), is(initialFetchOffset + 3));
+    }
+
+    @Test
+    public void testFindNextCommitOffsetWithUnackedOffsetGap() {
+        manager.addToEmitMsgs(initialFetchOffset + 1);
+        emitAndAckMessage(getMessageId(initialFetchOffset));
+        OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset();
+        assertThat("The next commit offset should cover the contiguously acked offsets", nextCommitOffset.offset(), is(initialFetchOffset + 1));
+    }
+
+    @Test
+    public void testFindNextCommitOffsetWhenTooLowOffsetIsAcked() {
+        OffsetManager startAtHighOffsetManager = new OffsetManager(testTp, 10);
+        emitAndAckMessage(getMessageId(0));
+        OffsetAndMetadata nextCommitOffset = startAtHighOffsetManager.findNextCommitOffset();
+        assertThat("Acking an offset earlier than the committed offset should have no effect", nextCommitOffset, is(nullValue()));
+    }
+
+    @Test
+    public void testCommit() {
+        emitAndAckMessage(getMessageId(initialFetchOffset));
+        emitAndAckMessage(getMessageId(initialFetchOffset + 1));
+        emitAndAckMessage(getMessageId(initialFetchOffset + 2));
+
+        long committedMessages = manager.commit(new OffsetAndMetadata(initialFetchOffset + 2));
+
+        assertThat("Should have committed all messages to the left of the earliest uncommitted offset", committedMessages, is(2L));
+        assertThat("The committed messages should not be in the acked list anymore", manager.contains(getMessageId(initialFetchOffset)), is(false));
+        assertThat("The committed messages should not be in the emitted list anymore", manager.containsEmitted(initialFetchOffset), is(false));
+        assertThat("The committed messages should not be in the acked list anymore", manager.contains(getMessageId(initialFetchOffset + 1)), is(false));
+        assertThat("The committed messages should not be in the emitted list anymore", manager.containsEmitted(initialFetchOffset + 1), is(false));
+        assertThat("The uncommitted message should still be in the acked list", manager.contains(getMessageId(initialFetchOffset + 2)), is(true));
+        assertThat("The uncommitted message should still be in the emitted list", manager.containsEmitted(initialFetchOffset + 2), is(true));
+    }
+
+    private KafkaSpoutMessageId getMessageId(long offset) {
+        return new KafkaSpoutMessageId(testTp, offset);
+    }
+
+    private void emitAndAckMessage(KafkaSpoutMessageId msgId) {
+        manager.addToEmitMsgs(msgId.offset());
+        manager.addToAckMsgs(msgId);
     }
 }