Repository: storm
Updated Branches:
  refs/heads/1.x-branch e83878a7b -> 644f5cb98


STORM-2666: Fix storm-kafka-client spout sometimes emitting messages that were
already committed. Expand tests, add some runtime validation, and do minor
refactoring to increase code readability. Ensure OffsetManager commits as many
offsets as possible when an offset void (deleted offsets) occurs, rather than
committing only up to the gap.
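
To make the offset-void behavior concrete: a minimal sketch of the commit rule,
with illustrative names rather than the actual OffsetManager code. Contiguous
acked offsets are committed one by one; on a gap, the commit may only skip
ahead if the missing offsets were never emitted (i.e. they were deleted by
compaction):

    import java.util.TreeSet;

    // Illustrative sketch, not the patched class itself.
    final class OffsetVoidSketch {
        static long findNextCommitOffset(TreeSet<Long> ackedOffsets, TreeSet<Long> emittedOffsets, long committedOffset) {
            long nextCommitOffset = committedOffset;
            for (long currOffset : ackedOffsets) {
                if (currOffset == nextCommitOffset + 1) {
                    nextCommitOffset = currOffset; // contiguous with the last commit
                } else if (currOffset > nextCommitOffset + 1) {
                    // (nextCommitOffset + 1) is missing. If the next emitted offset is the
                    // acked one, the gap was compacted away and can be bridged; otherwise
                    // a still-pending offset blocks the commit.
                    Long nextEmittedOffset = emittedOffsets.ceiling(nextCommitOffset + 1);
                    if (nextEmittedOffset != null && currOffset == nextEmittedOffset) {
                        nextCommitOffset = currOffset;
                    } else {
                        break;
                    }
                }
            }
            return nextCommitOffset;
        }
    }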


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9b95f3da
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9b95f3da
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9b95f3da

Branch: refs/heads/1.x-branch
Commit: 9b95f3da608760b334bb8a9a272e64e0fb9fb3ac
Parents: cb2d7e8
Author: Stig Rohde Døssing <s...@apache.org>
Authored: Sat Aug 12 16:56:45 2017 +0200
Committer: Stig Rohde Døssing <s...@apache.org>
Committed: Wed Oct 4 20:36:40 2017 +0200

----------------------------------------------------------------------
 .../apache/storm/kafka/spout/KafkaSpout.java    |  58 ++--
 .../KafkaSpoutRetryExponentialBackoff.java      |  37 +--
 .../kafka/spout/KafkaSpoutRetryService.java     |   9 +-
 .../kafka/spout/internal/OffsetManager.java     |   8 +-
 .../storm/kafka/spout/KafkaSpoutCommitTest.java |  23 +-
 .../kafka/spout/KafkaSpoutRebalanceTest.java    |  56 +++-
 .../KafkaSpoutRetryExponentialBackoffTest.java  | 287 +++++++++++++++++++
 .../kafka/spout/SingleTopicKafkaSpoutTest.java  |  62 +++-
 .../kafka/spout/internal/OffsetManagerTest.java |  84 ++++++
 9 files changed, 555 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/9b95f3da/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index fbd869c..4f9dacb 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -34,7 +34,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-
+import org.apache.commons.lang.Validate;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -147,10 +147,13 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     // =========== Consumer Rebalance Listener - On the same thread as the caller ===========
     private class KafkaSpoutConsumerRebalanceListener implements ConsumerRebalanceListener {
 
+        private Collection<TopicPartition> previousAssignment = new HashSet<>();
+
         @Override
         public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
             LOG.info("Partitions revoked. [consumer-group={}, consumer={}, topic-partitions={}]",
-                kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
+                    kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
+            previousAssignment = partitions;
             if (isAtLeastOnce() && initialized) {
                 initialized = false;
                 commitOffsetsForAckedTuples();
@@ -175,20 +178,14 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                 * Emitted messages for partitions that are no longer assigned to this spout can't
                 * be acked and should not be retried, hence remove them from emitted collection.
                 */
-                Set<TopicPartition> partitionsSet = new HashSet<>(partitions);
-                Iterator<KafkaSpoutMessageId> msgIdIterator = emitted.iterator();
-                while (msgIdIterator.hasNext()) {
-                    KafkaSpoutMessageId msgId = msgIdIterator.next();
-                    if (!partitionsSet.contains(msgId.getTopicPartition())) {
-                        msgIdIterator.remove();
-                    }
-                }
+                emitted.removeIf(msgId -> !partitions.contains(msgId.getTopicPartition()));
             }
 
-            for (TopicPartition tp : partitions) {
+            Set<TopicPartition> newPartitions = new HashSet<>(partitions);
+            newPartitions.removeAll(previousAssignment);
+            for (TopicPartition tp : newPartitions) {
                 final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
                 final long fetchOffset = doSeek(tp, committedOffset);
-                // Add offset managers for the new partitions.
                 // If this partition was previously assigned to this spout, leave the acked offsets as they were to resume where it left off
                 if (isAtLeastOnce() && !offsetManagers.containsKey(tp)) {
                     offsetManagers.put(tp, new OffsetManager(tp, fetchOffset));
@@ -202,18 +199,14 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
          * sets the cursor to the location dictated by the first poll strategy and returns the fetch offset
          */
         private long doSeek(TopicPartition tp, OffsetAndMetadata committedOffset) {
-            long fetchOffset;
             if (committedOffset != null) {             // offset was committed for this TopicPartition
                 if (firstPollOffsetStrategy.equals(EARLIEST)) {
                     kafkaConsumer.seekToBeginning(Collections.singleton(tp));
-                    fetchOffset = kafkaConsumer.position(tp);
                 } else if (firstPollOffsetStrategy.equals(LATEST)) {
                     kafkaConsumer.seekToEnd(Collections.singleton(tp));
-                    fetchOffset = kafkaConsumer.position(tp);
                 } else {
                     // By default polling starts at the last committed offset. +1 to point fetch to the first uncommitted offset.
-                    fetchOffset = committedOffset.offset() + 1;
-                    kafkaConsumer.seek(tp, fetchOffset);
+                    kafkaConsumer.seek(tp, committedOffset.offset() + 1);
                 }
             } else {    // no commits have ever been done, so start at the beginning or end depending on the strategy
                 if (firstPollOffsetStrategy.equals(EARLIEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_EARLIEST)) {
@@ -221,9 +214,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                 } else if (firstPollOffsetStrategy.equals(LATEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_LATEST)) {
                     kafkaConsumer.seekToEnd(Collections.singleton(tp));
                 }
-                fetchOffset = kafkaConsumer.position(tp);
             }
-            return fetchOffset;
+            return kafkaConsumer.position(tp);
         }
     }
 
@@ -231,7 +223,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     @Override
     public void nextTuple() {
         try {
-            if (initialized) {
+            if (initialized) {             
                 if (commit()) {
                     commitOffsetsForAckedTuples();
                 }
@@ -339,12 +331,15 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
      */
     private boolean emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
         final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
-        final KafkaSpoutMessageId msgId = retryService.getMessageId(record);
+        final KafkaSpoutMessageId msgId = retryService.getMessageId(tp, record.offset());
         if (offsetManagers.containsKey(tp) && offsetManagers.get(tp).contains(msgId)) {   // has been acked
             LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
         } else if (emitted.contains(msgId)) {   // has been emitted and it's pending ack or fail
             LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
         } else {
+            Validate.isTrue(kafkaConsumer.committed(tp) == null || kafkaConsumer.committed(tp).offset() < kafkaConsumer.position(tp),
+                "The spout is about to emit a message that has already been committed."
+                + " This should never occur, and indicates a bug in the spout");
             final List<Object> tuple = kafkaSpoutConfig.getTranslator().apply(record);
             if (isEmitTuple(tuple)) {
                 final boolean isScheduled = retryService.isScheduled(msgId);
@@ -408,6 +403,23 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             for (Map.Entry<TopicPartition, OffsetAndMetadata> tpOffset : nextCommitOffsets.entrySet()) {
                 //Update the OffsetManager for each committed partition, and update numUncommittedOffsets
                 final TopicPartition tp = tpOffset.getKey();
+                long position = kafkaConsumer.position(tp);
+                long committedOffset = tpOffset.getValue().offset();
+                if (position < committedOffset) {
+                    /*
+                     * The position is behind the committed offset. This can happen in some cases, e.g. if a message failed,
+                     * lots of (more than max.poll.records) later messages were acked, and the failed message then gets acked.
+                     * The consumer may only be part way through "catching up" to where it was when it went back to retry the failed tuple.
+                     * Skip the consumer forward to the committed offset and drop the current waiting to emit list,
+                     * since it'll likely contain committed offsets.
+                     */
+                    LOG.debug("Consumer fell behind committed offset. Catching up. Position was [{}], skipping to [{}]",
+                        position, committedOffset);
+                    kafkaConsumer.seek(tp, committedOffset);
+                    waitingToEmit = null;
+                }
+
+
                 final OffsetManager offsetManager = offsetManagers.get(tp);
                 long numCommittedOffsets = offsetManager.commit(tpOffset.getValue());
                 numUncommittedOffsets -= numCommittedOffsets;
@@ -437,6 +449,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                 LOG.debug("Received direct ack for message [{}], associated 
with null tuple", msgId);
             }
         } else {
+            Validate.isTrue(!retryService.isScheduled(msgId), "The message id 
" + msgId + " is queued for retry while being acked."
+                + " This should never occur barring errors in the RetryService 
implementation or the spout code.");
             offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId);
             emitted.remove(msgId);
         }
@@ -456,6 +470,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             LOG.debug("Received fail for tuple this spout is no longer 
tracking. Partitions may have been reassigned. Ignoring message [{}]", msgId);
             return;
         }
+        Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + 
msgId + " is queued for retry while being failed."
+            + " This should never occur barring errors in the RetryService 
implementation or the spout code.");
         msgId.incrementNumFails();
         if (!retryService.schedule(msgId)) {
             LOG.debug("Reached maximum number of retries. Message [{}] being 
marked as acked.", msgId);

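The central runtime change above is the catch-up guard in
commitOffsetsForAckedTuples. Reduced to its essentials, and using the real
kafka-clients Consumer API with an illustrative wrapper class, it amounts to:

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.common.TopicPartition;

    final class CatchUpGuardSketch {
        // Keeps the consumer position at or ahead of the offset being committed, so
        // records at or below the committed offset cannot be polled and re-emitted.
        // Returns true if the caller should also drop its waiting-to-emit buffer.
        static boolean catchUpToCommit(Consumer<?, ?> consumer, TopicPartition tp, long committedOffset) {
            if (consumer.position(tp) < committedOffset) {
                consumer.seek(tp, committedOffset);
                return true;
            }
            return false;
        }
    }
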
http://git-wip-us.apache.org/repos/asf/storm/blob/9b95f3da/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
index 6b53779..81ef986 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
@@ -32,7 +32,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.commons.lang.Validate;
 import org.apache.storm.utils.Time;
 
 /**
@@ -49,6 +49,7 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
     private final TimeInterval maxDelay;
     private final int maxRetries;
 
+    //This class assumes that there is at most one retry schedule per message id in this set at a time.
     private final Set<RetrySchedule> retrySchedules = new TreeSet<>(RETRY_ENTRY_TIME_STAMP_COMPARATOR);
     private final Set<KafkaSpoutMessageId> toRetryMsgs = new HashSet<>();      // Convenience data structure to speedup lookups
 
@@ -168,7 +169,7 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
         this.delayPeriod = delayPeriod;
         this.maxRetries = maxRetries;
         this.maxDelay = maxDelay;
-        LOG.debug("Instantiated {}", this);
+        LOG.debug("Instantiated {}", this.toStringImpl());
     }
 
     @Override
@@ -196,13 +197,14 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
     @Override
     public boolean isReady(KafkaSpoutMessageId msgId) {
         boolean retry = false;
-        if (toRetryMsgs.contains(msgId)) {
+        if (isScheduled(msgId)) {
             final long currentTimeNanos = Time.nanoTime();
             for (RetrySchedule retrySchedule : retrySchedules) {
                 if (retrySchedule.retry(currentTimeNanos)) {
                     if (retrySchedule.msgId.equals(msgId)) {
                         retry = true;
                         LOG.debug("Found entry to retry {}", retrySchedule);
+                        break; //Stop searching if the message is known to be ready for retry
                     }
                 } else {
                     LOG.debug("Entry to retry not found {}", retrySchedule);
@@ -221,14 +223,14 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
     @Override
     public boolean remove(KafkaSpoutMessageId msgId) {
         boolean removed = false;
-        if (toRetryMsgs.contains(msgId)) {
+        if (isScheduled(msgId)) {
+            toRetryMsgs.remove(msgId);
             for (Iterator<RetrySchedule> iterator = retrySchedules.iterator(); iterator.hasNext(); ) {
                 final RetrySchedule retrySchedule = iterator.next();
                 if (retrySchedule.msgId().equals(msgId)) {
                     iterator.remove();
-                    toRetryMsgs.remove(msgId);
                     removed = true;
-                    break;
+                    break; //There is at most one schedule per message id
                 }
             }
         }
@@ -261,15 +263,8 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
             LOG.debug("Not scheduling [{}] because reached maximum number of retries [{}].", msgId, maxRetries);
             return false;
         } else {
-            if (toRetryMsgs.contains(msgId)) {
-                for (Iterator<RetrySchedule> iterator = retrySchedules.iterator(); iterator.hasNext(); ) {
-                    final RetrySchedule retrySchedule = iterator.next();
-                    if (retrySchedule.msgId().equals(msgId)) {
-                        iterator.remove();
-                        toRetryMsgs.remove(msgId);
-                    }
-                }
-            }
+            //Remove existing schedule for the message id
+            remove(msgId);
+            final RetrySchedule retrySchedule = new RetrySchedule(msgId, nextTime(msgId));
             retrySchedules.add(retrySchedule);
             toRetryMsgs.add(msgId);
@@ -294,9 +289,9 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
     }
 
     @Override
-    public KafkaSpoutMessageId getMessageId(ConsumerRecord<?, ?> record) {
-        KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record);
-        if (toRetryMsgs.contains(msgId)) {
+    public KafkaSpoutMessageId getMessageId(TopicPartition tp, long offset) {
+        KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(tp, offset);
+        if (isScheduled(msgId)) {
             for (KafkaSpoutMessageId originalMsgId : toRetryMsgs) {
                 if (originalMsgId.equals(msgId)) {
                     return originalMsgId;
@@ -308,6 +303,7 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
 
     // if value is greater than Long.MAX_VALUE it truncates to Long.MAX_VALUE
     private long nextTime(KafkaSpoutMessageId msgId) {
+        Validate.isTrue(msgId.numFails() > 0, "nextTime assumes the message has failed at least once");
         final long currentTimeNanos = Time.nanoTime();
         final long nextTimeNanos = msgId.numFails() == 1                // numFails = 1, 2, 3, ...
                 ? currentTimeNanos + initialDelay.lengthNanos
@@ -317,6 +313,11 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
 
     @Override
     public String toString() {
+        return toStringImpl();
+    }
+    
+    private String toStringImpl() {
+        //This is here to avoid an overridable call in the constructor
         return "KafkaSpoutRetryExponentialBackoff{" +
                 "delay=" + initialDelay +
                 ", ratio=" + delayPeriod +

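For reference, the delay curve this class produces, as pinned down by the
expectations in testExponentialBackoff further down (8s, 16s, 32s for a 4s
delayPeriod): the first failure waits initialDelay, later failures wait
delayPeriod * 2^(numFails - 1), capped at maxDelay. A standalone sketch of that
assumed shape, not the class's verbatim code:

    import java.util.concurrent.TimeUnit;

    final class BackoffCurveSketch {
        // Assumed delay shape; see testExponentialBackoff for the values it must match.
        static long delayNanos(long initialDelayNanos, long delayPeriodNanos, long maxDelayNanos, int numFails) {
            if (numFails <= 1) {
                return initialDelayNanos;
            }
            long delay = delayPeriodNanos * (long) Math.pow(2, numFails - 1);
            return Math.min(delay, maxDelayNanos);
        }

        public static void main(String[] args) {
            long delayPeriod = TimeUnit.SECONDS.toNanos(4);
            for (int numFails = 2; numFails <= 4; numFails++) {
                // Prints 8, 16 and 32.
                System.out.println(TimeUnit.NANOSECONDS.toSeconds(delayNanos(0, delayPeriod, Long.MAX_VALUE, numFails)));
            }
        }
    }
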
http://git-wip-us.apache.org/repos/asf/storm/blob/9b95f3da/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
index 5147752..b70acd7 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
@@ -23,7 +23,7 @@ import org.apache.kafka.common.TopicPartition;
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.Map;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
 
 /**
  * Represents the logic that manages the retrial of failed tuples.
@@ -84,9 +84,10 @@ public interface KafkaSpoutRetryService extends Serializable {
     int readyMessageCount();
 
     /**
-     * Gets the {@link KafkaSpoutMessageId} for the given record.
-     * @param record The record to fetch the id for
+     * Gets the {@link KafkaSpoutMessageId} for the record on the given topic partition and offset.
+     * @param topicPartition The topic partition of the record
+     * @param offset The offset of the record
      * @return The id the record was scheduled for retry with, or a new {@link KafkaSpoutMessageId} if the record was not scheduled for retry.
      */
-    KafkaSpoutMessageId getMessageId(ConsumerRecord<?, ?> record);
+    KafkaSpoutMessageId getMessageId(TopicPartition topicPartition, long offset);
 }

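A hedged usage sketch of the new lookup (an illustrative helper, not part of
the patch): the property that matters is that getMessageId returns the
originally scheduled instance, fail count and all, when one exists, so emission
can be gated on the retry state:

    import org.apache.kafka.common.TopicPartition;
    import org.apache.storm.kafka.spout.KafkaSpoutMessageId;
    import org.apache.storm.kafka.spout.KafkaSpoutRetryService;

    final class RetryLookupSketch {
        // A record may be emitted if its id was never scheduled for retry (first
        // emit) or if its backoff has expired. The real KafkaSpout additionally
        // consults its acked/emitted bookkeeping before emitting.
        static boolean mayEmit(KafkaSpoutRetryService retryService, TopicPartition tp, long offset) {
            KafkaSpoutMessageId msgId = retryService.getMessageId(tp, offset);
            return !retryService.isScheduled(msgId) || retryService.isReady(msgId);
        }
    }
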
http://git-wip-us.apache.org/repos/asf/storm/blob/9b95f3da/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
index 5139072..b6d36d8 100755
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
@@ -92,7 +92,7 @@ public class OffsetManager {
                        first element after committedOffset in the ascending ordered emitted set.
                     */
                    LOG.debug("Processed non contiguous offset. (committedOffset+1) is no longer part of the topic. Committed: [{}], Processed: [{}]", committedOffset, currOffset);
-                    final Long nextEmittedOffset = emittedOffsets.ceiling(nextCommitOffset);
+                    final Long nextEmittedOffset = emittedOffsets.ceiling(nextCommitOffset + 1);
                    if (nextEmittedOffset != null && currOffset == nextEmittedOffset) {
                         found = true;
                         nextCommitMsg = currAckedMsg;
@@ -103,9 +103,9 @@ public class OffsetManager {
                     }
                 }
             } else {
-                //Received a redundant ack. Ignore and continue processing.
-                LOG.warn("topic-partition [{}] has unexpected offset [{}]. Current committed Offset [{}]",
-                    tp, currOffset, committedOffset);
+                throw new IllegalStateException("The offset [" + currOffset + "] is below the current committed "
+                    + "offset [" + committedOffset + "] for [" + tp + "]."
+                    + " This should not be possible, and likely indicates a bug in the spout's acking or emit logic.");
             }
         }
 

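The one-character ceiling fix above is easy to miss: ceiling(nextCommitOffset)
returns nextCommitOffset itself whenever that offset was emitted (it always
was, having just been acked), so the old gap check could never match the next
acked offset. ceiling(nextCommitOffset + 1) asks for the first offset emitted
strictly after the one just committed. A self-contained illustration with
hypothetical offsets:

    import java.util.TreeSet;

    final class CeilingFixSketch {
        public static void main(String[] args) {
            TreeSet<Long> emittedOffsets = new TreeSet<>();
            emittedOffsets.add(2L);
            // Offsets 3 and 4 were compacted away and never emitted.
            emittedOffsets.add(5L);

            long nextCommitOffset = 2;
            System.out.println(emittedOffsets.ceiling(nextCommitOffset));     // 2: finds the offset just committed
            System.out.println(emittedOffsets.ceiling(nextCommitOffset + 1)); // 5: finds the first offset past the void
        }
    }
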
http://git-wip-us.apache.org/repos/asf/storm/blob/9b95f3da/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java
index afc9b82..c9c684f 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java
@@ -42,6 +42,8 @@ import org.mockito.MockitoAnnotations;
 
 import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
 
+import org.mockito.stubbing.OngoingStubbing;
+
 public class KafkaSpoutCommitTest {
 
     private final long offsetCommitPeriodMs = 2_000;
@@ -94,7 +96,7 @@ public class KafkaSpoutCommitTest {
                 spout.ack(messageId);
             }
 
-            // Advance time and then trigger first call to kafka consumer commit; the commit will progress till offset 4
+            // Advance time and then trigger first call to kafka consumer commit; the commit must progress to offset 9
             Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs);
             Map<TopicPartition, List<ConsumerRecord<String, String>>> emptyConsumerRecords = Collections.emptyMap();
             when(consumerMock.poll(anyLong()))
@@ -105,25 +107,8 @@ public class KafkaSpoutCommitTest {
             inOrder.verify(consumerMock).commitSync(commitCapture.capture());
             inOrder.verify(consumerMock).poll(anyLong());
 
-            //verify that Offset 4 was last committed offset
-            //the offset void should be bridged in the next commit
-            Map<TopicPartition, OffsetAndMetadata> commits = commitCapture.getValue();
-            assertTrue(commits.containsKey(partition));
-            assertEquals(4, commits.get(partition).offset());
-
-            //Trigger second kafka consumer commit
-            reset(consumerMock);
-            when(consumerMock.poll(anyLong()))
-                    .thenReturn(new ConsumerRecords<String, String>(emptyConsumerRecords));
-            Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs);
-            spout.nextTuple();
-
-            inOrder = inOrder(consumerMock);
-            inOrder.verify(consumerMock).commitSync(commitCapture.capture());
-            inOrder.verify(consumerMock).poll(anyLong());
-
             //verify that Offset 9 was last committed offset
-            commits = commitCapture.getValue();
+            Map<TopicPartition, OffsetAndMetadata> commits = commitCapture.getValue();
             assertTrue(commits.containsKey(partition));
             assertEquals(9, commits.get(partition).offset());
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/9b95f3da/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
index ab57052..b16ba5d 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
@@ -35,7 +35,6 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -55,6 +54,11 @@ import org.mockito.MockitoAnnotations;
 
 import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
 import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
+import static org.mockito.Matchers.eq;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 
 public class KafkaSpoutRebalanceTest {
 
@@ -181,7 +185,7 @@ public class KafkaSpoutRebalanceTest {
         when(retryServiceMock.getMessageId(Mockito.any(ConsumerRecord.class)))
             .thenReturn(new KafkaSpoutMessageId(partitionThatWillBeRevoked, 0))
             .thenReturn(new KafkaSpoutMessageId(assignedPartition, 0));
-        
+
         //Emit a message on each partition and revoke the first partition
         List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(
             spout, partitionThatWillBeRevoked, assignedPartition, rebalanceListenerCapture);
@@ -197,4 +201,52 @@ public class KafkaSpoutRebalanceTest {
         verify(retryServiceMock, never()).schedule(emittedMessageIds.get(0));
         verify(retryServiceMock).schedule(emittedMessageIds.get(1));
     }
+
+    @Test
+    public void testReassignPartitionSeeksForOnlyNewPartitions() {
+        /*
+         * When partitions are reassigned, the spout should seek with the first poll offset strategy for new partitions.
+         * Previously assigned partitions should be left alone, since the spout keeps the emitted and acked state for those.
+         */
+
+        ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
+        Subscription subscriptionMock = mock(Subscription.class);
+        doNothing()
+            .when(subscriptionMock)
+            .subscribe(any(), rebalanceListenerCapture.capture(), any());
+        KafkaSpout<String, String> spout = new KafkaSpout<>(createKafkaSpoutConfigBuilder(subscriptionMock, -1)
+            .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
+            .build(), consumerFactory);
+        String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
+        TopicPartition assignedPartition = new TopicPartition(topic, 1);
+        TopicPartition newPartition = new TopicPartition(topic, 2);
+
+        //Setup spout with mock consumer so we can get at the rebalance listener
+        spout.open(conf, contextMock, collectorMock);
+        spout.activate();
+
+        //Assign partitions to the spout
+        ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
+        Set<TopicPartition> assignedPartitions = new HashSet<>();
+        assignedPartitions.add(assignedPartition);
+        consumerRebalanceListener.onPartitionsAssigned(assignedPartitions);
+        reset(consumerMock);
+
+        //Set up committed so it looks like some messages have been committed on each partition
+        long committedOffset = 500;
+        when(consumerMock.committed(assignedPartition)).thenReturn(new OffsetAndMetadata(committedOffset));
+        when(consumerMock.committed(newPartition)).thenReturn(new OffsetAndMetadata(committedOffset));
+
+        //Now rebalance and add a new partition
+        consumerRebalanceListener.onPartitionsRevoked(assignedPartitions);
+        Set<TopicPartition> newAssignedPartitions = new HashSet<>();
+        newAssignedPartitions.add(assignedPartition);
+        newAssignedPartitions.add(newPartition);
+        consumerRebalanceListener.onPartitionsAssigned(newAssignedPartitions);
+
+        //This partition was previously assigned, so the consumer position shouldn't change
+        verify(consumerMock, never()).seek(eq(assignedPartition), anyLong());
+        //This partition is new, and should start at the committed offset
+        verify(consumerMock).seek(newPartition, committedOffset + 1);
+    }
 }

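The behavior this test pins down comes from the previousAssignment bookkeeping
added to KafkaSpout's rebalance listener; the set arithmetic is simply the
following (a sketch with illustrative names):

    import java.util.Collection;
    import java.util.HashSet;
    import java.util.Set;
    import org.apache.kafka.common.TopicPartition;

    final class NewPartitionsSketch {
        // Only partitions absent from the previous assignment are seeked with the
        // first poll offset strategy after a rebalance; the rest keep their state.
        static Set<TopicPartition> newlyAssigned(Collection<TopicPartition> assigned, Collection<TopicPartition> previous) {
            Set<TopicPartition> newPartitions = new HashSet<>(assigned);
            newPartitions.removeAll(previous);
            return newPartitions;
        }
    }
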
http://git-wip-us.apache.org/repos/asf/storm/blob/9b95f3da/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoffTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoffTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoffTest.java
new file mode 100644
index 0000000..c543f8b
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoffTest.java
@@ -0,0 +1,287 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.sameInstance;
+import static org.junit.Assert.assertThat;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Test;
+
+public class KafkaSpoutRetryExponentialBackoffTest {
+    
+    private final TopicPartition testTopic = new TopicPartition("topic", 0);
+    private final TopicPartition testTopic2 = new TopicPartition("other-topic", 0);
+
+    private KafkaSpoutRetryExponentialBackoff createNoWaitRetryService() {
+        return new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.seconds(0), 1, TimeInterval.seconds(0));
+    }
+
+    private KafkaSpoutRetryExponentialBackoff createOneSecondWaitRetryService() {
+        return new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(1), TimeInterval.seconds(0), 1, TimeInterval.seconds(1));
+    }
+
+    @Test
+    public void testCanScheduleRetry() {
+        KafkaSpoutRetryExponentialBackoff retryService = createNoWaitRetryService();
+        long offset = 0;
+        KafkaSpoutMessageId msgId = retryService.getMessageId(testTopic, offset);
+        msgId.incrementNumFails();
+
+        boolean scheduled = retryService.schedule(msgId);
+
+        assertThat("The service must schedule the message for retry", scheduled, is(true));
+        KafkaSpoutMessageId retrievedMessageId = retryService.getMessageId(testTopic, offset);
+        assertThat("The service should return the original message id when asked for the same tp/offset twice", retrievedMessageId, sameInstance(msgId));
+        assertThat(retryService.isScheduled(msgId), is(true));
+        assertThat(retryService.isReady(msgId), is(true));
+        assertThat(retryService.readyMessageCount(), is(1));
+        assertThat(retryService.earliestRetriableOffsets(), is(Collections.singletonMap(testTopic, msgId.offset())));
+    }
+
+    @Test
+    public void testCanRescheduleRetry() {
+        try (SimulatedTime time = new SimulatedTime()) {
+
+            KafkaSpoutRetryExponentialBackoff retryService = createOneSecondWaitRetryService();
+            long offset = 0;
+            KafkaSpoutMessageId msgId = retryService.getMessageId(testTopic, offset);
+            msgId.incrementNumFails();
+
+            retryService.schedule(msgId);
+            Time.advanceTime(500);
+            boolean scheduled = retryService.schedule(msgId);
+
+            assertThat("The service must be able to reschedule an already scheduled id", scheduled, is(true));
+            Time.advanceTime(500);
+            assertThat("The message should not be ready for retry yet since it was rescheduled", retryService.isReady(msgId), is(false));
+            assertThat(retryService.isScheduled(msgId), is(true));
+            assertThat(retryService.earliestRetriableOffsets(), is(Collections.emptyMap()));
+            assertThat(retryService.readyMessageCount(), is(0));
+            Time.advanceTime(500);
+            assertThat("The message should be ready for retry once the full delay has passed", retryService.isReady(msgId), is(true));
+            assertThat(retryService.isScheduled(msgId), is(true));
+            assertThat(retryService.earliestRetriableOffsets(), is(Collections.singletonMap(testTopic, msgId.offset())));
+            assertThat(retryService.readyMessageCount(), is(1));
+        }
+    }
+    
+    @Test
+    public void testCannotContainMultipleSchedulesForId() {
+        try (SimulatedTime time = new SimulatedTime()) {
+
+            KafkaSpoutRetryExponentialBackoff retryService = createOneSecondWaitRetryService();
+            long offset = 0;
+            KafkaSpoutMessageId msgId = retryService.getMessageId(testTopic, offset);
+            msgId.incrementNumFails();
+
+            retryService.schedule(msgId);
+            Time.advanceTime(500);
+            boolean scheduled = retryService.schedule(msgId);
+
+            retryService.remove(msgId);
+            assertThat("The message should no longer be scheduled", retryService.isScheduled(msgId), is(false));
+            Time.advanceTime(500);
+            assertThat("The message should not be ready for retry because it isn't scheduled", retryService.isReady(msgId), is(false));
+        }
+    }
+
+    @Test
+    public void testCanRemoveRetry() {
+        KafkaSpoutRetryExponentialBackoff retryService = createNoWaitRetryService();
+        long offset = 0;
+        KafkaSpoutMessageId msgId = retryService.getMessageId(testTopic, offset);
+        msgId.incrementNumFails();
+
+        retryService.schedule(msgId);
+        boolean removed = retryService.remove(msgId);
+
+        assertThat(removed, is(true));
+        assertThat(retryService.isScheduled(msgId), is(false));
+        assertThat(retryService.isReady(msgId), is(false));
+        assertThat(retryService.earliestRetriableOffsets(), is(Collections.emptyMap()));
+        assertThat(retryService.readyMessageCount(), is(0));
+    }
+
+    @Test
+    public void testCanHandleMultipleTopics() {
+        try (SimulatedTime time = new SimulatedTime()) {
+            //Tests that isScheduled, isReady and earliestRetriableOffsets are mutually consistent when there are messages from multiple partitions scheduled
+            KafkaSpoutRetryExponentialBackoff retryService = createOneSecondWaitRetryService();
+            long offset = 0;
+
+            KafkaSpoutMessageId msgIdTp1 = retryService.getMessageId(testTopic, offset);
+            KafkaSpoutMessageId msgIdTp2 = retryService.getMessageId(testTopic2, offset);
+            msgIdTp1.incrementNumFails();
+            msgIdTp2.incrementNumFails();
+
+            boolean scheduledOne = retryService.schedule(msgIdTp1);
+            Time.advanceTime(500);
+            boolean scheduledTwo = retryService.schedule(msgIdTp2);
+
+            //The retry schedules for two messages should be unrelated
+            assertThat(scheduledOne, is(true));
+            assertThat(retryService.isScheduled(msgIdTp1), is(true));
+            assertThat(scheduledTwo, is(true));
+            assertThat(retryService.isScheduled(msgIdTp2), is(true));
+            assertThat(retryService.isReady(msgIdTp1), is(false));
+            assertThat(retryService.isReady(msgIdTp2), is(false));
+
+            Time.advanceTime(500);
+            assertThat(retryService.isReady(msgIdTp1), is(true));
+            assertThat(retryService.isReady(msgIdTp2), is(false));
+            assertThat(retryService.earliestRetriableOffsets(), is(Collections.singletonMap(testTopic, offset)));
+
+            Time.advanceTime(500);
+            assertThat(retryService.isReady(msgIdTp2), is(true));
+            Map<TopicPartition, Long> earliestOffsets = new HashMap<>();
+            earliestOffsets.put(testTopic, offset);
+            earliestOffsets.put(testTopic2, offset);
+            assertThat(retryService.earliestRetriableOffsets(), is(earliestOffsets));
+
+            //The service must be able to remove retry schedules for unnecessary partitions
+            retryService.retainAll(Collections.singleton(testTopic2));
+            assertThat(retryService.isScheduled(msgIdTp1), is(false));
+            assertThat(retryService.isScheduled(msgIdTp2), is(true));
+            assertThat(retryService.isReady(msgIdTp1), is(false));
+            assertThat(retryService.isReady(msgIdTp2), is(true));
+            assertThat(retryService.earliestRetriableOffsets(), is(Collections.singletonMap(testTopic2, offset)));
+        }
+    }
+
+    @Test
+    public void testCanHandleMultipleMessagesOnPartition() {
+        try (SimulatedTime time = new SimulatedTime()) {
+            //Tests that isScheduled, isReady and earliestRetriableOffsets are mutually consistent when there are multiple messages scheduled on a partition
+            KafkaSpoutRetryExponentialBackoff retryService = createOneSecondWaitRetryService();
+            long offset = 0;
+
+            KafkaSpoutMessageId msgIdEarliest = retryService.getMessageId(testTopic, offset);
+            KafkaSpoutMessageId msgIdLatest = retryService.getMessageId(testTopic, offset + 1);
+            msgIdEarliest.incrementNumFails();
+            msgIdLatest.incrementNumFails();
+
+            retryService.schedule(msgIdEarliest);
+            Time.advanceTime(500);
+            retryService.schedule(msgIdLatest);
+
+            assertThat(retryService.isScheduled(msgIdEarliest), is(true));
+            assertThat(retryService.isScheduled(msgIdLatest), is(true));
+
+            Time.advanceTime(500);
+            assertThat(retryService.isReady(msgIdEarliest), is(true));
+            assertThat(retryService.isReady(msgIdLatest), is(false));
+            assertThat(retryService.earliestRetriableOffsets(), is(Collections.singletonMap(testTopic, msgIdEarliest.offset())));
+
+            Time.advanceTime(500);
+            assertThat(retryService.isReady(msgIdEarliest), is(true));
+            assertThat(retryService.isReady(msgIdLatest), is(true));
+            assertThat(retryService.earliestRetriableOffsets(), is(Collections.singletonMap(testTopic, msgIdEarliest.offset())));
+
+            retryService.remove(msgIdEarliest);
+            assertThat(retryService.earliestRetriableOffsets(), is(Collections.singletonMap(testTopic, msgIdLatest.offset())));
+        }
+    }
+
+    @Test
+    public void testMaxRetries() {
+        try (SimulatedTime time = new SimulatedTime()) {
+            int maxRetries = 3;
+            KafkaSpoutRetryExponentialBackoff retryService = new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.seconds(0), maxRetries, TimeInterval.seconds(0));
+            long offset = 0;
+
+            KafkaSpoutMessageId msgId = retryService.getMessageId(testTopic, offset);
+            for (int i = 0; i < maxRetries; i++) {
+                msgId.incrementNumFails();
+            }
+
+            //Should be allowed to retry 3 times, in addition to original try
+            boolean scheduled = retryService.schedule(msgId);
+
+            assertThat(scheduled, is(true));
+            assertThat(retryService.isScheduled(msgId), is(true));
+
+            retryService.remove(msgId);
+            msgId.incrementNumFails();
+            boolean rescheduled = retryService.schedule(msgId);
+
+            assertThat("The message should not be allowed to retry once the 
limit is reached", rescheduled, is(false));
+            assertThat(retryService.isScheduled(msgId), is(false));
+        }
+    }
+
+    @Test
+    public void testMaxDelay() {
+        try (SimulatedTime time = new SimulatedTime()) {
+            int maxDelaySecs = 2;
+            KafkaSpoutRetryExponentialBackoff retryService = new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(500), TimeInterval.seconds(0), 1, TimeInterval.seconds(maxDelaySecs));
+            long offset = 0;
+
+            KafkaSpoutMessageId msgId = retryService.getMessageId(testTopic, offset);
+            msgId.incrementNumFails();
+
+            retryService.schedule(msgId);
+
+            Time.advanceTimeSecs(maxDelaySecs);
+            assertThat("The message should be ready for retry after the max 
delay", retryService.isReady(msgId), is(true));
+        }
+    }
+
+    private void validateBackoff(int expectedBackoffSeconds, KafkaSpoutMessageId msgId, KafkaSpoutRetryExponentialBackoff retryService) {
+        Time.advanceTimeSecs(expectedBackoffSeconds - 1);
+        assertThat("The message should not be ready for retry until the 
backoff has expired", retryService.isReady(msgId), is(false));
+        Time.advanceTimeSecs(1);
+        assertThat(retryService.isReady(msgId), is(true));
+    }
+
+    @Test
+    public void testExponentialBackoff() {
+        try (SimulatedTime time = new SimulatedTime()) {
+            KafkaSpoutRetryExponentialBackoff retryService = new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.seconds(4), Integer.MAX_VALUE, TimeInterval.seconds(Integer.MAX_VALUE));
+            long offset = 0;
+
+            KafkaSpoutMessageId msgId = retryService.getMessageId(testTopic, offset);
+            msgId.incrementNumFails();
+            msgId.incrementNumFails(); //First failure is the initial delay, so not interesting
+
+            //Expecting 4*2^(failCount-1)
+            List<Integer> expectedBackoffsSecs = Arrays.asList(new Integer[]{8, 16, 32});
+            
+            for (Integer expectedBackoffSecs : expectedBackoffsSecs) {
+                retryService.schedule(msgId);
+
+                Time.advanceTimeSecs(expectedBackoffSecs - 1);
+                assertThat("The message should not be ready for retry until 
backoff " + expectedBackoffSecs + " has expired", retryService.isReady(msgId), 
is(false));
+                Time.advanceTimeSecs(1);
+                assertThat("The message should be ready for retry once backoff 
" + expectedBackoffSecs + " has expired", retryService.isReady(msgId), 
is(true));
+
+                msgId.incrementNumFails();
+                retryService.remove(msgId);
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/9b95f3da/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
index cbbb391..8d7da7f 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
@@ -59,6 +59,11 @@ import org.mockito.Captor;
 import org.mockito.MockitoAnnotations;
 
 import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyString;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.hamcrest.Matchers;
 
 public class SingleTopicKafkaSpoutTest {
 
@@ -76,6 +81,7 @@ public class SingleTopicKafkaSpoutTest {
     private KafkaConsumer<String, String> consumerSpy;
     private KafkaConsumerFactory<String, String> consumerFactory;
     private KafkaSpout<String, String> spout;
+    private final int maxPollRecords = 10;
 
     @Before
     public void setUp() {
@@ -84,6 +90,7 @@ public class SingleTopicKafkaSpoutTest {
             .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
             .setRetry(new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),
                 maxRetries, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0)))
+            .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords)
             .build();
         this.consumerSpy = spy(new KafkaConsumerFactoryDefault<String, String>().createConsumer(spoutConfig));
         this.consumerFactory = new KafkaConsumerFactory<String, String>() {
@@ -115,6 +122,59 @@ public class SingleTopicKafkaSpoutTest {
     }
 
     @Test
+    public void testSeekToCommittedOffsetIfConsumerPositionIsBehindWhenCommitting() throws Exception {
+        try (SimulatedTime simulatedTime = new SimulatedTime()) {
+            int messageCount = maxPollRecords * 2;
+            prepareSpout(messageCount);
+
+            //Emit all messages and fail the first one while acking the rest
+            for (int i = 0; i < messageCount; i++) {
+                spout.nextTuple();
+            }
+            ArgumentCaptor<KafkaSpoutMessageId> messageIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+            verify(collector, times(messageCount)).emit(anyObject(), anyObject(), messageIdCaptor.capture());
+            List<KafkaSpoutMessageId> messageIds = messageIdCaptor.getAllValues();
+            for (int i = 1; i < messageIds.size(); i++) {
+                spout.ack(messageIds.get(i));
+            }
+            KafkaSpoutMessageId failedTuple = messageIds.get(0);
+            spout.fail(failedTuple);
+
+            //Advance the time and replay the failed tuple. 
+            reset(collector);
+            spout.nextTuple();
+            ArgumentCaptor<KafkaSpoutMessageId> failedIdReplayCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+            verify(collector).emit(anyObject(), anyObject(), failedIdReplayCaptor.capture());
+
+            assertThat("Expected replay of failed tuple", 
failedIdReplayCaptor.getValue(), is(failedTuple));
+
+            /* Ack the tuple, and commit.
+             * Since the tuple is more than max poll records behind the most recent emitted tuple, the consumer won't catch up in this poll.
+             */
+            Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + commitOffsetPeriodMs);
+            spout.ack(failedIdReplayCaptor.getValue());
+            spout.nextTuple();
+            verify(consumerSpy).commitSync(commitCapture.capture());
+            
+            Map<TopicPartition, OffsetAndMetadata> capturedCommit = commitCapture.getValue();
+            TopicPartition expectedTp = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0);
+            assertThat("Should have committed to the right topic", capturedCommit, Matchers.hasKey(expectedTp));
+            assertThat("Should have committed all the acked messages", capturedCommit.get(expectedTp).offset(), is((long)messageCount - 1));
+
+            /* Verify that the following acked (now committed) tuples are not emitted again
+             * Since the consumer position was somewhere in the middle of the acked tuples when the commit happened,
+             * this verifies that the spout keeps the consumer position ahead of the committed offset when committing
+             */
+            reset(collector);
+            //Just do a few polls to check that nothing more is emitted
+            for(int i = 0; i < 3; i++) {
+                spout.nextTuple();
+            }
+            verify(collector, never()).emit(anyString(), anyList(), anyObject());
+        }
+    }
+
+    @Test
     public void shouldContinueWithSlowDoubleAcks() throws Exception {
         try (SimulatedTime simulatedTime = new SimulatedTime()) {
             int messageCount = 20;
@@ -271,7 +331,7 @@ public class SingleTopicKafkaSpoutTest {
             verifyAllMessagesCommitted(messageCount);
         }
     }
-    
+
     @Test
     public void shouldReplayAllFailedTuplesWhenFailedOutOfOrder() throws Exception {
         //The spout must reemit retriable tuples, even if they fail out of order.

http://git-wip-us.apache.org/repos/asf/storm/blob/9b95f3da/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java
new file mode 100644
index 0000000..e8896c9
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.internal;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertThat;
+
+import java.util.NoSuchElementException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpoutMessageId;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class OffsetManagerTest {
+
+    @Rule
+    public ExpectedException expect = ExpectedException.none();
+
+    @Test
+    public void testSkipMissingOffsetsWhenFindingNextCommitOffsetWithGapInMiddleOfAcked() {
+        /* If topic compaction is enabled in Kafka, we sometimes need to commit past a gap of deleted offsets.
+         * Since the Kafka consumer should return offsets in order, we can assume that if a message is acked
+         * then any prior message will have been emitted at least once.
+         * If we see an acked message and some of the offsets preceding it were not emitted, they must have been compacted away and should be skipped.
+         */
+        
+        TopicPartition tp = new TopicPartition("test", 0);
+        OffsetManager manager = new OffsetManager(tp, 0);
+        
+        manager.addToEmitMsgs(0);
+        manager.addToEmitMsgs(1);
+        manager.addToEmitMsgs(2);
+        //3, 4 compacted away
+        manager.addToEmitMsgs(5);
+        manager.addToEmitMsgs(6);
+        manager.addToAckMsgs(new KafkaSpoutMessageId(tp, 0));
+        manager.addToAckMsgs(new KafkaSpoutMessageId(tp, 1));
+        manager.addToAckMsgs(new KafkaSpoutMessageId(tp, 2));
+        manager.addToAckMsgs(new KafkaSpoutMessageId(tp, 6));
+        
+        assertThat("The offset manager should not skip past offset 5 which is 
still pending", manager.findNextCommitOffset().offset(), is(2L));
+        
+        manager.addToAckMsgs(new KafkaSpoutMessageId(tp, 5));
+        
+        assertThat("The offset manager should skip past the gap in acked 
messages, since the messages were not emitted", 
+            manager.findNextCommitOffset().offset(), is(6L));
+    }
+    
+    @Test
+    public void testSkipMissingOffsetsWhenFindingNextCommitOffsetWithGapBeforeAcked() {
+        
+        TopicPartition tp = new TopicPartition("test", 0);
+        OffsetManager manager = new OffsetManager(tp, 0);
+        
+        //0-4 compacted away
+        manager.addToEmitMsgs(5);
+        manager.addToEmitMsgs(6);
+        manager.addToAckMsgs(new KafkaSpoutMessageId(tp, 6));
+        
+        assertThat("The offset manager should not skip past offset 5 which is 
still pending", manager.findNextCommitOffset(), is(nullValue()));
+        
+        manager.addToAckMsgs(new KafkaSpoutMessageId(tp, 5));
+        
+        assertThat("The offset manager should skip past the gap in acked 
messages, since the messages were not emitted", 
+            manager.findNextCommitOffset().offset(), is(6L));
+    }
+
+}
