Repository: storm Updated Branches: refs/heads/1.x-branch e83878a7b -> 644f5cb98
STORM-2666: Fix storm-kafka-client spout sometimes emitting messages that were already committed. Expand tests, add some runtime validation, minor refactoring to increase code readability. Ensure OffsetManager commits as many offsets as possible when an offset void (deleted offsets) occurs, rather than just up to the gap. Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9b95f3da Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9b95f3da Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9b95f3da Branch: refs/heads/1.x-branch Commit: 9b95f3da608760b334bb8a9a272e64e0fb9fb3ac Parents: cb2d7e8 Author: Stig Rohde Døssing <s...@apache.org> Authored: Sat Aug 12 16:56:45 2017 +0200 Committer: Stig Rohde Døssing <s...@apache.org> Committed: Wed Oct 4 20:36:40 2017 +0200 ---------------------------------------------------------------------- .../apache/storm/kafka/spout/KafkaSpout.java | 58 ++-- .../KafkaSpoutRetryExponentialBackoff.java | 37 +-- .../kafka/spout/KafkaSpoutRetryService.java | 9 +- .../kafka/spout/internal/OffsetManager.java | 8 +- .../storm/kafka/spout/KafkaSpoutCommitTest.java | 23 +- .../kafka/spout/KafkaSpoutRebalanceTest.java | 56 +++- .../KafkaSpoutRetryExponentialBackoffTest.java | 287 +++++++++++++++++++ .../kafka/spout/SingleTopicKafkaSpoutTest.java | 62 +++- .../kafka/spout/internal/OffsetManagerTest.java | 84 ++++++ 9 files changed, 555 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/9b95f3da/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index fbd869c..4f9dacb 100644 --- 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -34,7 +34,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.TimeUnit; - +import org.apache.commons.lang.Validate; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -147,10 +147,13 @@ public class KafkaSpout<K, V> extends BaseRichSpout { // =========== Consumer Rebalance Listener - On the same thread as the caller =========== private class KafkaSpoutConsumerRebalanceListener implements ConsumerRebalanceListener { + private Collection<TopicPartition> previousAssignment = new HashSet<>(); + @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { LOG.info("Partitions revoked. [consumer-group={}, consumer={}, topic-partitions={}]", - kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions); + kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions); + previousAssignment = partitions; if (isAtLeastOnce() && initialized) { initialized = false; commitOffsetsForAckedTuples(); @@ -175,20 +178,14 @@ public class KafkaSpout<K, V> extends BaseRichSpout { * Emitted messages for partitions that are no longer assigned to this spout can't * be acked and should not be retried, hence remove them from emitted collection. 
*/ - Set<TopicPartition> partitionsSet = new HashSet<>(partitions); - Iterator<KafkaSpoutMessageId> msgIdIterator = emitted.iterator(); - while (msgIdIterator.hasNext()) { - KafkaSpoutMessageId msgId = msgIdIterator.next(); - if (!partitionsSet.contains(msgId.getTopicPartition())) { - msgIdIterator.remove(); - } - } + emitted.removeIf(msgId -> !partitions.contains(msgId.getTopicPartition())); } - for (TopicPartition tp : partitions) { + Set<TopicPartition> newPartitions = new HashSet<>(partitions); + newPartitions.removeAll(previousAssignment); + for (TopicPartition tp : newPartitions) { final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp); final long fetchOffset = doSeek(tp, committedOffset); - // Add offset managers for the new partitions. // If this partition was previously assigned to this spout, leave the acked offsets as they were to resume where it left off if (isAtLeastOnce() && !offsetManagers.containsKey(tp)) { offsetManagers.put(tp, new OffsetManager(tp, fetchOffset)); @@ -202,18 +199,14 @@ public class KafkaSpout<K, V> extends BaseRichSpout { * sets the cursor to the location dictated by the first poll strategy and returns the fetch offset */ private long doSeek(TopicPartition tp, OffsetAndMetadata committedOffset) { - long fetchOffset; if (committedOffset != null) { // offset was committed for this TopicPartition if (firstPollOffsetStrategy.equals(EARLIEST)) { kafkaConsumer.seekToBeginning(Collections.singleton(tp)); - fetchOffset = kafkaConsumer.position(tp); } else if (firstPollOffsetStrategy.equals(LATEST)) { kafkaConsumer.seekToEnd(Collections.singleton(tp)); - fetchOffset = kafkaConsumer.position(tp); } else { // By default polling starts at the last committed offset. +1 to point fetch to the first uncommitted offset. 
- fetchOffset = committedOffset.offset() + 1; - kafkaConsumer.seek(tp, fetchOffset); + kafkaConsumer.seek(tp, committedOffset.offset() + 1); } } else { // no commits have ever been done, so start at the beginning or end depending on the strategy if (firstPollOffsetStrategy.equals(EARLIEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_EARLIEST)) { @@ -221,9 +214,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout { } else if (firstPollOffsetStrategy.equals(LATEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_LATEST)) { kafkaConsumer.seekToEnd(Collections.singleton(tp)); } - fetchOffset = kafkaConsumer.position(tp); } - return fetchOffset; + return kafkaConsumer.position(tp); } } @@ -231,7 +223,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout { @Override public void nextTuple() { try { - if (initialized) { + if (initialized) { if (commit()) { commitOffsetsForAckedTuples(); } @@ -339,12 +331,15 @@ public class KafkaSpout<K, V> extends BaseRichSpout { */ private boolean emitTupleIfNotEmitted(ConsumerRecord<K, V> record) { final TopicPartition tp = new TopicPartition(record.topic(), record.partition()); - final KafkaSpoutMessageId msgId = retryService.getMessageId(record); + final KafkaSpoutMessageId msgId = retryService.getMessageId(tp, record.offset()); if (offsetManagers.containsKey(tp) && offsetManagers.get(tp).contains(msgId)) { // has been acked LOG.trace("Tuple for record [{}] has already been acked. Skipping", record); } else if (emitted.contains(msgId)) { // has been emitted and it's pending ack or fail LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record); } else { + Validate.isTrue(kafkaConsumer.committed(tp) == null || kafkaConsumer.committed(tp).offset() < kafkaConsumer.position(tp), + "The spout is about to emit a message that has already been committed." 
+ + " This should never occur, and indicates a bug in the spout"); final List<Object> tuple = kafkaSpoutConfig.getTranslator().apply(record); if (isEmitTuple(tuple)) { final boolean isScheduled = retryService.isScheduled(msgId); @@ -408,6 +403,23 @@ public class KafkaSpout<K, V> extends BaseRichSpout { for (Map.Entry<TopicPartition, OffsetAndMetadata> tpOffset : nextCommitOffsets.entrySet()) { //Update the OffsetManager for each committed partition, and update numUncommittedOffsets final TopicPartition tp = tpOffset.getKey(); + long position = kafkaConsumer.position(tp); + long committedOffset = tpOffset.getValue().offset(); + if (position < committedOffset) { + /* + * The position is behind the committed offset. This can happen in some cases, e.g. if a message failed, + * lots of (more than max.poll.records) later messages were acked, and the failed message then gets acked. + * The consumer may only be part way through "catching up" to where it was when it went back to retry the failed tuple. + * Skip the consumer forward to the committed offset drop the current waiting to emit list, + * since it'll likely contain committed offsets. + */ + LOG.debug("Consumer fell behind committed offset. Catching up. Position was [{}], skipping to [{}]", + position, committedOffset); + kafkaConsumer.seek(tp, committedOffset); + waitingToEmit = null; + } + + final OffsetManager offsetManager = offsetManagers.get(tp); long numCommittedOffsets = offsetManager.commit(tpOffset.getValue()); numUncommittedOffsets -= numCommittedOffsets; @@ -437,6 +449,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout { LOG.debug("Received direct ack for message [{}], associated with null tuple", msgId); } } else { + Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being acked." 
+ + " This should never occur barring errors in the RetryService implementation or the spout code."); offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId); emitted.remove(msgId); } @@ -456,6 +470,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout { LOG.debug("Received fail for tuple this spout is no longer tracking. Partitions may have been reassigned. Ignoring message [{}]", msgId); return; } + Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being failed." + + " This should never occur barring errors in the RetryService implementation or the spout code."); msgId.incrementNumFails(); if (!retryService.schedule(msgId)) { LOG.debug("Reached maximum number of retries. Message [{}] being marked as acked.", msgId); http://git-wip-us.apache.org/repos/asf/storm/blob/9b95f3da/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java index 6b53779..81ef986 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java @@ -32,7 +32,7 @@ import java.util.Map; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.TimeUnit; -import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.commons.lang.Validate; import org.apache.storm.utils.Time; /** @@ -49,6 +49,7 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService private final TimeInterval maxDelay; private final int maxRetries; + //This class assumes that there 
is at most one retry schedule per message id in this set at a time. private final Set<RetrySchedule> retrySchedules = new TreeSet<>(RETRY_ENTRY_TIME_STAMP_COMPARATOR); private final Set<KafkaSpoutMessageId> toRetryMsgs = new HashSet<>(); // Convenience data structure to speedup lookups @@ -168,7 +169,7 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService this.delayPeriod = delayPeriod; this.maxRetries = maxRetries; this.maxDelay = maxDelay; - LOG.debug("Instantiated {}", this); + LOG.debug("Instantiated {}", this.toStringImpl()); } @Override @@ -196,13 +197,14 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService @Override public boolean isReady(KafkaSpoutMessageId msgId) { boolean retry = false; - if (toRetryMsgs.contains(msgId)) { + if (isScheduled(msgId)) { final long currentTimeNanos = Time.nanoTime(); for (RetrySchedule retrySchedule : retrySchedules) { if (retrySchedule.retry(currentTimeNanos)) { if (retrySchedule.msgId.equals(msgId)) { retry = true; LOG.debug("Found entry to retry {}", retrySchedule); + break; //Stop searching if the message is known to be ready for retry } } else { LOG.debug("Entry to retry not found {}", retrySchedule); @@ -221,14 +223,14 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService @Override public boolean remove(KafkaSpoutMessageId msgId) { boolean removed = false; - if (toRetryMsgs.contains(msgId)) { + if (isScheduled(msgId)) { + toRetryMsgs.remove(msgId); for (Iterator<RetrySchedule> iterator = retrySchedules.iterator(); iterator.hasNext(); ) { final RetrySchedule retrySchedule = iterator.next(); if (retrySchedule.msgId().equals(msgId)) { iterator.remove(); - toRetryMsgs.remove(msgId); removed = true; - break; + break; //There is at most one schedule per message id } } } @@ -261,15 +263,8 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService LOG.debug("Not scheduling [{}] because reached maximum number of 
retries [{}].", msgId, maxRetries); return false; } else { - if (toRetryMsgs.contains(msgId)) { - for (Iterator<RetrySchedule> iterator = retrySchedules.iterator(); iterator.hasNext(); ) { - final RetrySchedule retrySchedule = iterator.next(); - if (retrySchedule.msgId().equals(msgId)) { - iterator.remove(); - toRetryMsgs.remove(msgId); - } - } - } + //Remove existing schedule for the message id + remove(msgId); final RetrySchedule retrySchedule = new RetrySchedule(msgId, nextTime(msgId)); retrySchedules.add(retrySchedule); toRetryMsgs.add(msgId); @@ -294,9 +289,9 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService } @Override - public KafkaSpoutMessageId getMessageId(ConsumerRecord<?, ?> record) { - KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record); - if (toRetryMsgs.contains(msgId)) { + public KafkaSpoutMessageId getMessageId(TopicPartition tp, long offset) { + KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(tp, offset); + if (isScheduled(msgId)) { for (KafkaSpoutMessageId originalMsgId : toRetryMsgs) { if (originalMsgId.equals(msgId)) { return originalMsgId; @@ -308,6 +303,7 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService // if value is greater than Long.MAX_VALUE it truncates to Long.MAX_VALUE private long nextTime(KafkaSpoutMessageId msgId) { + Validate.isTrue(msgId.numFails() > 0, "nextTime assumes the message has failed at least once"); final long currentTimeNanos = Time.nanoTime(); final long nextTimeNanos = msgId.numFails() == 1 // numFails = 1, 2, 3, ... ? 
currentTimeNanos + initialDelay.lengthNanos @@ -317,6 +313,11 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService @Override public String toString() { + return toStringImpl(); + } + + private String toStringImpl() { + //This is here to avoid an overridable call in the constructor return "KafkaSpoutRetryExponentialBackoff{" + "delay=" + initialDelay + ", ratio=" + delayPeriod + http://git-wip-us.apache.org/repos/asf/storm/blob/9b95f3da/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java index 5147752..b70acd7 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java @@ -23,7 +23,7 @@ import org.apache.kafka.common.TopicPartition; import java.io.Serializable; import java.util.Collection; import java.util.Map; -import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; /** * Represents the logic that manages the retrial of failed tuples. @@ -84,9 +84,10 @@ public interface KafkaSpoutRetryService extends Serializable { int readyMessageCount(); /** - * Gets the {@link KafkaSpoutMessageId} for the given record. - * @param record The record to fetch the id for + * Gets the {@link KafkaSpoutMessageId} for the record on the given topic partition and offset. + * @param topicPartition The topic partition of the record + * @param offset The offset of the record * @return The id the record was scheduled for retry with, or a new {@link KafkaSpoutMessageId} if the record was not scheduled for retry. 
*/ - KafkaSpoutMessageId getMessageId(ConsumerRecord<?, ?> record); + KafkaSpoutMessageId getMessageId(TopicPartition topicPartition, long offset); } http://git-wip-us.apache.org/repos/asf/storm/blob/9b95f3da/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java index 5139072..b6d36d8 100755 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java @@ -92,7 +92,7 @@ public class OffsetManager { first element after committedOffset in the ascending ordered emitted set. */ LOG.debug("Processed non contiguous offset. (committedOffset+1) is no longer part of the topic. Committed: [{}], Processed: [{}]", committedOffset, currOffset); - final Long nextEmittedOffset = emittedOffsets.ceiling(nextCommitOffset); + final Long nextEmittedOffset = emittedOffsets.ceiling(nextCommitOffset + 1); if (nextEmittedOffset != null && currOffset == nextEmittedOffset) { found = true; nextCommitMsg = currAckedMsg; @@ -103,9 +103,9 @@ public class OffsetManager { } } } else { - //Received a redundant ack. Ignore and continue processing. - LOG.warn("topic-partition [{}] has unexpected offset [{}]. Current committed Offset [{}]", - tp, currOffset, committedOffset); + throw new IllegalStateException("The offset [" + currOffset + "] is below the current committed " + + "offset [" + committedOffset + "] for [" + tp + "]." 
+ + " This should not be possible, and likely indicates a bug in the spout's acking or emit logic."); } } http://git-wip-us.apache.org/repos/asf/storm/blob/9b95f3da/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java index afc9b82..c9c684f 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java @@ -42,6 +42,8 @@ import org.mockito.MockitoAnnotations; import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder; +import org.mockito.stubbing.OngoingStubbing; + public class KafkaSpoutCommitTest { private final long offsetCommitPeriodMs = 2_000; @@ -94,7 +96,7 @@ public class KafkaSpoutCommitTest { spout.ack(messageId); } - // Advance time and then trigger first call to kafka consumer commit; the commit will progress till offset 4 + // Advance time and then trigger first call to kafka consumer commit; the commit must progress to offset 9 Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs); Map<TopicPartition, List<ConsumerRecord<String, String>>> emptyConsumerRecords = Collections.emptyMap(); when(consumerMock.poll(anyLong())) @@ -105,25 +107,8 @@ public class KafkaSpoutCommitTest { inOrder.verify(consumerMock).commitSync(commitCapture.capture()); inOrder.verify(consumerMock).poll(anyLong()); - //verify that Offset 4 was last committed offset - //the offset void should be bridged in the next commit - Map<TopicPartition, OffsetAndMetadata> commits = commitCapture.getValue(); - assertTrue(commits.containsKey(partition)); - 
assertEquals(4, commits.get(partition).offset()); - - //Trigger second kafka consumer commit - reset(consumerMock); - when(consumerMock.poll(anyLong())) - .thenReturn(new ConsumerRecords<String, String>(emptyConsumerRecords)); - Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs); - spout.nextTuple(); - - inOrder = inOrder(consumerMock); - inOrder.verify(consumerMock).commitSync(commitCapture.capture()); - inOrder.verify(consumerMock).poll(anyLong()); - //verify that Offset 9 was last committed offset - commits = commitCapture.getValue(); + Map<TopicPartition, OffsetAndMetadata> commits = commitCapture.getValue(); assertTrue(commits.containsKey(partition)); assertEquals(9, commits.get(partition).offset()); } http://git-wip-us.apache.org/repos/asf/storm/blob/9b95f3da/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java index ab57052..b16ba5d 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java @@ -35,7 +35,6 @@ import java.util.List; import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; @@ -55,6 +54,11 @@ import org.mockito.MockitoAnnotations; import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder; import static 
org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder; +import static org.mockito.Matchers.eq; + +import java.util.HashSet; +import java.util.Set; +import org.apache.kafka.clients.consumer.ConsumerRecord; public class KafkaSpoutRebalanceTest { @@ -181,7 +185,7 @@ public class KafkaSpoutRebalanceTest { when(retryServiceMock.getMessageId(Mockito.any(ConsumerRecord.class))) .thenReturn(new KafkaSpoutMessageId(partitionThatWillBeRevoked, 0)) .thenReturn(new KafkaSpoutMessageId(assignedPartition, 0)); - + //Emit a message on each partition and revoke the first partition List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition( spout, partitionThatWillBeRevoked, assignedPartition, rebalanceListenerCapture); @@ -197,4 +201,52 @@ public class KafkaSpoutRebalanceTest { verify(retryServiceMock, never()).schedule(emittedMessageIds.get(0)); verify(retryServiceMock).schedule(emittedMessageIds.get(1)); } + + @Test + public void testReassignPartitionSeeksForOnlyNewPartitions() { + /* + * When partitions are reassigned, the spout should seek with the first poll offset strategy for new partitions. + * Previously assigned partitions should be left alone, since the spout keeps the emitted and acked state for those. 
+ */ + + ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class); + Subscription subscriptionMock = mock(Subscription.class); + doNothing() + .when(subscriptionMock) + .subscribe(any(), rebalanceListenerCapture.capture(), any()); + KafkaSpout<String, String> spout = new KafkaSpout<>(createKafkaSpoutConfigBuilder(subscriptionMock, -1) + .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST) + .build(), consumerFactory); + String topic = SingleTopicKafkaSpoutConfiguration.TOPIC; + TopicPartition assignedPartition = new TopicPartition(topic, 1); + TopicPartition newPartition = new TopicPartition(topic, 2); + + //Setup spout with mock consumer so we can get at the rebalance listener + spout.open(conf, contextMock, collectorMock); + spout.activate(); + + //Assign partitions to the spout + ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue(); + Set<TopicPartition> assignedPartitions = new HashSet<>(); + assignedPartitions.add(assignedPartition); + consumerRebalanceListener.onPartitionsAssigned(assignedPartitions); + reset(consumerMock); + + //Set up committed so it looks like some messages have been committed on each partition + long committedOffset = 500; + when(consumerMock.committed(assignedPartition)).thenReturn(new OffsetAndMetadata(committedOffset)); + when(consumerMock.committed(newPartition)).thenReturn(new OffsetAndMetadata(committedOffset)); + + //Now rebalance and add a new partition + consumerRebalanceListener.onPartitionsRevoked(assignedPartitions); + Set<TopicPartition> newAssignedPartitions = new HashSet<>(); + newAssignedPartitions.add(assignedPartition); + newAssignedPartitions.add(newPartition); + consumerRebalanceListener.onPartitionsAssigned(newAssignedPartitions); + + //This partition was previously assigned, so the consumer position shouldn't change + verify(consumerMock, never()).seek(eq(assignedPartition), 
anyLong()); + //This partition is new, and should start at the committed offset + verify(consumerMock).seek(newPartition, committedOffset + 1); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/9b95f3da/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoffTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoffTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoffTest.java new file mode 100644 index 0000000..c543f8b --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoffTest.java @@ -0,0 +1,287 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.kafka.spout; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.sameInstance; +import static org.junit.Assert.assertThat; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Time.SimulatedTime; +import org.junit.Test; + +public class KafkaSpoutRetryExponentialBackoffTest { + + private final TopicPartition testTopic = new TopicPartition("topic", 0); + private final TopicPartition testTopic2 = new TopicPartition("other-topic", 0); + + private KafkaSpoutRetryExponentialBackoff createNoWaitRetryService() { + return new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.seconds(0), 1, TimeInterval.seconds(0)); + } + + private KafkaSpoutRetryExponentialBackoff createOneSecondWaitRetryService() { + return new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(1), TimeInterval.seconds(0), 1, TimeInterval.seconds(1)); + } + + @Test + public void testCanScheduleRetry() { + KafkaSpoutRetryExponentialBackoff retryService = createNoWaitRetryService(); + long offset = 0; + KafkaSpoutMessageId msgId = retryService.getMessageId(testTopic, offset); + msgId.incrementNumFails(); + + boolean scheduled = retryService.schedule(msgId); + + assertThat("The service must schedule the message for retry", scheduled, is(true)); + KafkaSpoutMessageId retrievedMessageId = retryService.getMessageId(testTopic, offset); + assertThat("The service should return the original message id when asked for the same tp/offset twice", retrievedMessageId, sameInstance(msgId)); + assertThat(retryService.isScheduled(msgId), is(true)); + assertThat(retryService.isReady(msgId), is(true)); + assertThat(retryService.readyMessageCount(), is(1)); + 
assertThat(retryService.earliestRetriableOffsets(), is(Collections.singletonMap(testTopic, msgId.offset()))); + } +
+ @Test + public void testCanRescheduleRetry() { + try (SimulatedTime time = new SimulatedTime()) { + + KafkaSpoutRetryExponentialBackoff retryService = createOneSecondWaitRetryService(); + long offset = 0; + KafkaSpoutMessageId msgId = retryService.getMessageId(testTopic, offset); + msgId.incrementNumFails(); + + retryService.schedule(msgId); + Time.advanceTime(500); + boolean scheduled = retryService.schedule(msgId); + + assertThat("The service must be able to reschedule an already scheduled id", scheduled, is(true)); + Time.advanceTime(500); + assertThat("The message should not be ready for retry yet since it was rescheduled", retryService.isReady(msgId), is(false)); + assertThat(retryService.isScheduled(msgId), is(true)); + assertThat(retryService.earliestRetriableOffsets(), is(Collections.emptyMap())); + assertThat(retryService.readyMessageCount(), is(0)); + Time.advanceTime(500); + assertThat("The message should be ready for retry once the full delay has passed", retryService.isReady(msgId), is(true)); + assertThat(retryService.isScheduled(msgId), is(true)); + assertThat(retryService.earliestRetriableOffsets(), is(Collections.singletonMap(testTopic, msgId.offset()))); + assertThat(retryService.readyMessageCount(), is(1)); + } + } +
+ @Test + public void testCannotContainMultipleSchedulesForId() { + try (SimulatedTime time = new SimulatedTime()) { + //Scheduling an id twice and then removing it once must leave no schedule behind for that id + KafkaSpoutRetryExponentialBackoff retryService = createOneSecondWaitRetryService(); + long offset = 0; + KafkaSpoutMessageId msgId = retryService.getMessageId(testTopic, offset); + msgId.incrementNumFails(); + + retryService.schedule(msgId); + Time.advanceTime(500); + assertThat("The service must be able to reschedule an already scheduled id", retryService.schedule(msgId), is(true)); + + retryService.remove(msgId); + assertThat("The message should no longer be scheduled", retryService.isScheduled(msgId), is(false)); + Time.advanceTime(500); + assertThat("The message should not be ready for retry because it isn't scheduled", retryService.isReady(msgId), is(false)); + } + } +
+ @Test + public void testCanRemoveRetry() { + KafkaSpoutRetryExponentialBackoff retryService = createNoWaitRetryService(); + long offset = 0; + KafkaSpoutMessageId msgId = retryService.getMessageId(testTopic, offset); + msgId.incrementNumFails(); + + retryService.schedule(msgId); + boolean removed = retryService.remove(msgId); + + assertThat(removed, is(true)); + assertThat(retryService.isScheduled(msgId), is(false)); + assertThat(retryService.isReady(msgId), is(false)); + assertThat(retryService.earliestRetriableOffsets(), is(Collections.emptyMap())); + assertThat(retryService.readyMessageCount(), is(0)); + } +
+ @Test + public void testCanHandleMultipleTopics() { + try (SimulatedTime time = new SimulatedTime()) { + //Tests that isScheduled, isReady and earliestRetriableOffsets are mutually consistent when there are messages from multiple partitions scheduled + KafkaSpoutRetryExponentialBackoff retryService = createOneSecondWaitRetryService(); + long offset = 0; + + KafkaSpoutMessageId msgIdTp1 = retryService.getMessageId(testTopic, offset); + KafkaSpoutMessageId msgIdTp2 = retryService.getMessageId(testTopic2, offset); + msgIdTp1.incrementNumFails(); + msgIdTp2.incrementNumFails(); + + boolean scheduledOne = retryService.schedule(msgIdTp1); + Time.advanceTime(500); + boolean scheduledTwo = retryService.schedule(msgIdTp2); + + //The retry schedules for two messages should be unrelated + assertThat(scheduledOne, is(true)); + assertThat(retryService.isScheduled(msgIdTp1), is(true)); + assertThat(scheduledTwo, is(true)); + assertThat(retryService.isScheduled(msgIdTp2), is(true)); + assertThat(retryService.isReady(msgIdTp1), is(false)); + assertThat(retryService.isReady(msgIdTp2), is(false)); + + Time.advanceTime(500); + assertThat(retryService.isReady(msgIdTp1), is(true)); + assertThat(retryService.isReady(msgIdTp2), is(false)); + assertThat(retryService.earliestRetriableOffsets(), is(Collections.singletonMap(testTopic, offset))); + + Time.advanceTime(500); + assertThat(retryService.isReady(msgIdTp2), is(true)); + Map<TopicPartition, Long> earliestOffsets = new HashMap<>(); + earliestOffsets.put(testTopic, offset); + earliestOffsets.put(testTopic2, offset); + assertThat(retryService.earliestRetriableOffsets(), is(earliestOffsets)); + + //The service must be able to remove retry schedules for unnecessary partitions + retryService.retainAll(Collections.singleton(testTopic2)); + assertThat(retryService.isScheduled(msgIdTp1), is(false)); + assertThat(retryService.isScheduled(msgIdTp2), is(true)); + assertThat(retryService.isReady(msgIdTp1), is(false)); + assertThat(retryService.isReady(msgIdTp2), is(true)); + assertThat(retryService.earliestRetriableOffsets(), is(Collections.singletonMap(testTopic2, offset))); + } + } +
+ @Test + public void testCanHandleMultipleMessagesOnPartition() { + try (SimulatedTime time = new SimulatedTime()) { + //Tests that isScheduled, isReady and earliestRetriableOffsets are mutually consistent when there are multiple messages scheduled on a partition + KafkaSpoutRetryExponentialBackoff retryService = createOneSecondWaitRetryService(); + long offset = 0; + + KafkaSpoutMessageId msgIdEarliest = retryService.getMessageId(testTopic, offset); + KafkaSpoutMessageId msgIdLatest = retryService.getMessageId(testTopic, offset + 1); + msgIdEarliest.incrementNumFails(); + msgIdLatest.incrementNumFails(); + + retryService.schedule(msgIdEarliest); + Time.advanceTime(500); + retryService.schedule(msgIdLatest); + + assertThat(retryService.isScheduled(msgIdEarliest), is(true)); + assertThat(retryService.isScheduled(msgIdLatest), is(true)); + + Time.advanceTime(500); + assertThat(retryService.isReady(msgIdEarliest), is(true)); + assertThat(retryService.isReady(msgIdLatest), is(false)); + assertThat(retryService.earliestRetriableOffsets(), is(Collections.singletonMap(testTopic, msgIdEarliest.offset()))); + + Time.advanceTime(500); + assertThat(retryService.isReady(msgIdEarliest), is(true)); + assertThat(retryService.isReady(msgIdLatest), is(true)); + assertThat(retryService.earliestRetriableOffsets(), is(Collections.singletonMap(testTopic, msgIdEarliest.offset()))); + + retryService.remove(msgIdEarliest); + assertThat(retryService.earliestRetriableOffsets(), is(Collections.singletonMap(testTopic, msgIdLatest.offset()))); + } + } +
+ @Test + public void testMaxRetries() { + try (SimulatedTime time = new SimulatedTime()) { + int maxRetries = 3; + KafkaSpoutRetryExponentialBackoff retryService = new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.seconds(0), maxRetries, TimeInterval.seconds(0)); + long offset = 0; + + KafkaSpoutMessageId msgId = retryService.getMessageId(testTopic, offset); + for (int i = 0; i < maxRetries; i++) { + msgId.incrementNumFails(); + } + + //Should be allowed to retry 3 times, in addition to original try + boolean scheduled = retryService.schedule(msgId); + + assertThat(scheduled, is(true)); + assertThat(retryService.isScheduled(msgId), is(true)); + + retryService.remove(msgId); + msgId.incrementNumFails(); + boolean rescheduled = retryService.schedule(msgId); + + assertThat("The message should not be allowed to retry once the limit is reached", rescheduled, is(false)); + assertThat(retryService.isScheduled(msgId), is(false)); + } + } +
+ @Test + public void testMaxDelay() { + try (SimulatedTime time = new SimulatedTime()) { + int maxDelaySecs = 2; + KafkaSpoutRetryExponentialBackoff retryService = new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(500), TimeInterval.seconds(0), 1, TimeInterval.seconds(maxDelaySecs)); + long offset = 0; + + KafkaSpoutMessageId msgId = retryService.getMessageId(testTopic, offset); + msgId.incrementNumFails(); + + retryService.schedule(msgId); + + //The 500 second initial delay must be capped by the max delay, so the message becomes ready exactly when maxDelaySecs have elapsed + validateBackoff(maxDelaySecs, msgId, retryService); + } + }
+ /** Advances time to one second before the expected backoff expires and checks the message is not ready yet, then advances one more second and checks that it is ready. */ + private void validateBackoff(int expectedBackoffSeconds, KafkaSpoutMessageId msgId, KafkaSpoutRetryExponentialBackoff retryService) { + Time.advanceTimeSecs(expectedBackoffSeconds - 1); + assertThat("The message should not be ready for retry until the backoff has expired", retryService.isReady(msgId), is(false)); + Time.advanceTimeSecs(1); + assertThat(retryService.isReady(msgId), is(true)); + } +
+ @Test + public void testExponentialBackoff() { + try (SimulatedTime time = new SimulatedTime()) { + KafkaSpoutRetryExponentialBackoff retryService = new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.seconds(4), Integer.MAX_VALUE, TimeInterval.seconds(Integer.MAX_VALUE)); + long offset = 0; + + KafkaSpoutMessageId msgId = retryService.getMessageId(testTopic, offset); + msgId.incrementNumFails(); + msgId.incrementNumFails(); //First failure is the initial delay, so not interesting + + //Expecting 4*2^(failCount-1) + List<Integer> expectedBackoffsSecs = Arrays.asList(8, 16, 32); + + for (Integer expectedBackoffSecs : expectedBackoffsSecs) { + retryService.schedule(msgId); + + Time.advanceTimeSecs(expectedBackoffSecs - 1); + assertThat("The message should not be ready for retry until backoff " + expectedBackoffSecs + " has expired", retryService.isReady(msgId), is(false)); + Time.advanceTimeSecs(1); + assertThat("The message should be ready for retry once backoff " + expectedBackoffSecs + " has expired", retryService.isReady(msgId), is(true)); + + msgId.incrementNumFails(); + retryService.remove(msgId); + } + } + } + +}
http://git-wip-us.apache.org/repos/asf/storm/blob/9b95f3da/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java index cbbb391..8d7da7f 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java @@ -59,6 +59,11 @@ import org.mockito.Captor; import org.mockito.MockitoAnnotations; import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder; +import static org.mockito.Matchers.anyList; +import static org.mockito.Matchers.anyString; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.hamcrest.Matchers; public class SingleTopicKafkaSpoutTest { @@ -76,6 +81,7 @@ public class SingleTopicKafkaSpoutTest { private KafkaConsumer<String, String> consumerSpy; private KafkaConsumerFactory<String, String> consumerFactory; private KafkaSpout<String, String> spout; + private final int maxPollRecords = 10; //Kept small so testSeekToCommittedOffsetIfConsumerPositionIsBehindWhenCommitting can emit more records than a single poll returns
@Before public void setUp() { @@ -84,6 +90,7 @@ public class SingleTopicKafkaSpoutTest { .setOffsetCommitPeriodMs(commitOffsetPeriodMs) .setRetry(new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), maxRetries, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0))) + .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords) .build(); this.consumerSpy = spy(new KafkaConsumerFactoryDefault<String, String>().createConsumer(spoutConfig)); this.consumerFactory = new KafkaConsumerFactory<String, String>() { @@ -115,6 +122,59 @@ public class SingleTopicKafkaSpoutTest { } @Test + public void testSeekToCommittedOffsetIfConsumerPositionIsBehindWhenCommitting() throws Exception { + try (SimulatedTime simulatedTime = new SimulatedTime()) { + int messageCount = maxPollRecords * 2; + prepareSpout(messageCount); + + //Emit all messages and fail the first one while acking the rest + for (int i = 0; i < messageCount; i++) { + spout.nextTuple(); + } + ArgumentCaptor<KafkaSpoutMessageId> messageIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); + verify(collector, times(messageCount)).emit(anyObject(), anyObject(), messageIdCaptor.capture()); + List<KafkaSpoutMessageId> messageIds = messageIdCaptor.getAllValues(); + for (int i = 1; i < messageIds.size(); i++) { + spout.ack(messageIds.get(i)); + } + KafkaSpoutMessageId failedTuple = messageIds.get(0); + spout.fail(failedTuple); + + //Replay the failed tuple. The retry service configured in setUp uses zero delays, so no time needs to pass before the replay. + reset(collector); + spout.nextTuple(); + ArgumentCaptor<KafkaSpoutMessageId> failedIdReplayCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); + verify(collector).emit(anyObject(), anyObject(), failedIdReplayCaptor.capture()); + + assertThat("Expected replay of failed tuple", failedIdReplayCaptor.getValue(), is(failedTuple)); + + /* Ack the tuple, and commit. + * Since the tuple is more than max poll records behind the most recent emitted tuple, the consumer won't catch up in this poll.
+ */ + Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + commitOffsetPeriodMs); + spout.ack(failedIdReplayCaptor.getValue()); + spout.nextTuple(); + verify(consumerSpy).commitSync(commitCapture.capture()); + + Map<TopicPartition, OffsetAndMetadata> capturedCommit = commitCapture.getValue(); + TopicPartition expectedTp = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0); + assertThat("Should have committed to the right topic", capturedCommit, Matchers.hasKey(expectedTp)); + assertThat("Should have committed all the acked messages", capturedCommit.get(expectedTp).offset(), is((long)messageCount - 1)); + + /* Verify that the following acked (now committed) tuples are not emitted again + * Since the consumer position was somewhere in the middle of the acked tuples when the commit happened, + * this verifies that the spout keeps the consumer position ahead of the committed offset when committing + */ + reset(collector); + //Just do a few polls to check that nothing more is emitted + for(int i = 0; i < 3; i++) { + spout.nextTuple(); + } + verify(collector, never()).emit(anyString(), anyList(), anyObject()); + } + } + + @Test public void shouldContinueWithSlowDoubleAcks() throws Exception { try (SimulatedTime simulatedTime = new SimulatedTime()) { int messageCount = 20; @@ -271,7 +331,7 @@ public class SingleTopicKafkaSpoutTest { verifyAllMessagesCommitted(messageCount); } } - + @Test public void shouldReplayAllFailedTuplesWhenFailedOutOfOrder() throws Exception { //The spout must reemit retriable tuples, even if they fail out of order.
http://git-wip-us.apache.org/repos/asf/storm/blob/9b95f3da/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java new file mode 100644 index 0000000..e8896c9 --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java @@ -0,0 +1,84 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.storm.kafka.spout.internal; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.junit.Assert.assertThat; + +import java.util.NoSuchElementException; //NOTE(review): not referenced anywhere in this class; candidate for removal +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.KafkaSpoutMessageId; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +/** Tests how OffsetManager.findNextCommitOffset() handles partitions where some offsets were never emitted, e.g. because topic compaction removed them. */ +public class OffsetManagerTest { + + @Rule + public ExpectedException expect = ExpectedException.none(); //NOTE(review): no test in this class sets an expectation on this rule; candidate for removal + + @Test + public void testSkipMissingOffsetsWhenFindingNextCommitOffsetWithGapInMiddleOfAcked() { + /*If topic compaction is enabled in Kafka, we sometimes need to commit past a gap of deleted offsets. + * Since the Kafka consumer should return offsets in order, we can assume that if a message is acked + * then any prior message will have been emitted at least once. + * If we see an acked message and some of the offsets preceding it were not emitted, they must have been compacted away and should be skipped.
+ */ + + TopicPartition tp = new TopicPartition("test", 0); + OffsetManager manager = new OffsetManager(tp, 0); + + manager.addToEmitMsgs(0); + manager.addToEmitMsgs(1); + manager.addToEmitMsgs(2); + //3, 4 compacted away + manager.addToEmitMsgs(5); + manager.addToEmitMsgs(6); + manager.addToAckMsgs(new KafkaSpoutMessageId(tp, 0)); + manager.addToAckMsgs(new KafkaSpoutMessageId(tp, 1)); + manager.addToAckMsgs(new KafkaSpoutMessageId(tp, 2)); + manager.addToAckMsgs(new KafkaSpoutMessageId(tp, 6)); + + assertThat("The offset manager should not skip past offset 5 which is still pending", manager.findNextCommitOffset().offset(), is(2L)); + + manager.addToAckMsgs(new KafkaSpoutMessageId(tp, 5)); + + assertThat("The offset manager should skip past the gap in acked messages, since the messages were not emitted", + manager.findNextCommitOffset().offset(), is(6L)); + } + + @Test + public void testSkipMissingOffsetsWhenFindingNextCommitOffsetWithGapBeforeAcked() { + //Like the previous test, but the gap (0-4) precedes the first emitted offset: findNextCommitOffset() must return null until 5 is acked, then reach offset 6 + TopicPartition tp = new TopicPartition("test", 0); + OffsetManager manager = new OffsetManager(tp, 0); + + //0-4 compacted away + manager.addToEmitMsgs(5); + manager.addToEmitMsgs(6); + manager.addToAckMsgs(new KafkaSpoutMessageId(tp, 6)); + + assertThat("The offset manager should not skip past offset 5 which is still pending", manager.findNextCommitOffset(), is(nullValue())); + + manager.addToAckMsgs(new KafkaSpoutMessageId(tp, 5)); + + assertThat("The offset manager should skip past the gap in acked messages, since the messages were not emitted", + manager.findNextCommitOffset().offset(), is(6L)); + } + +}