Repository: kafka
Updated Branches:
  refs/heads/trunk 76c9a6dcb -> 6682abe4a


KAFKA-5797: Delay checking of partition existence in StoreChangelogReader

1. Remove the timeout-based `validatePartitionExists` from
StoreChangelogReader; instead, refresh the metadata only once, after all tasks
have been created and their topologies initialized (hence all stores have been
registered).
2. Add logic to refresh partition metadata at the end of initialization if
some restorers needing initialization cannot find their changelogs, so that
these stores can find their changelogs in the next run loop.

As a result, a single call to `initialize` may not be able to initialize every
restorer in `needsInitializing`; the remaining ones are retried in the next
run loop.

As an optimization, we no longer call `consumer#partitionsFor`, only
`consumer#listTopics`, which fetches the metadata for all topics. The only
blocking calls left are therefore `listTopics` and `endOffsets`; we always
catch timeout exceptions around these two calls and retry in the next run loop
after refreshing the metadata. This also reduces the number of request round
trips between the consumer and the brokers.

Author: Guozhang Wang <wangg...@gmail.com>

Reviewers: Damian Guy <damian....@gmail.com>, Matthias J. Sax 
<matth...@confluent.io>

Closes #3748 from guozhangwang/K5797-handle-metadata-available


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6682abe4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6682abe4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6682abe4

Branch: refs/heads/trunk
Commit: 6682abe4ae68fdbf0eb362e45f43ea14e2aba847
Parents: 76c9a6d
Author: Guozhang Wang <wangg...@gmail.com>
Authored: Wed Aug 30 08:43:22 2017 -0700
Committer: Guozhang Wang <wangg...@gmail.com>
Committed: Wed Aug 30 08:43:22 2017 -0700

----------------------------------------------------------------------
 .../processor/internals/AssignedTasks.java      |   2 +-
 .../processor/internals/ChangelogReader.java    |  12 +-
 .../internals/ProcessorStateManager.java        |   1 -
 .../processor/internals/StateRestorer.java      |  10 +-
 .../internals/StoreChangelogReader.java         | 154 ++++++++++---------
 .../processor/internals/StreamThread.java       |   2 -
 .../processor/internals/AbstractTaskTest.java   |   3 +-
 .../processor/internals/StandbyTaskTest.java    |   3 +-
 .../internals/StoreChangelogReaderTest.java     |  99 ++----------
 .../processor/internals/StreamTaskTest.java     |   3 +-
 .../StreamThreadStateStoreProviderTest.java     |   3 +-
 .../apache/kafka/test/MockChangelogReader.java  |   9 +-
 .../kafka/test/ProcessorTopologyTestDriver.java |   2 -
 13 files changed, 112 insertions(+), 191 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6682abe4/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index c0c9ccc..f09c48e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -124,7 +124,7 @@ class AssignedTasks {
                 it.remove();
             } catch (final LockException e) {
                 // made this trace as it will spam the logs in the poll loop.
-                log.trace("{} Could not create {} {} due to {}; will retry", 
logPrefix, taskTypeName, entry.getKey(), e.getMessage());
+                log.trace("{} Could not create {} {} due to {}; will retry in 
the next run loop", logPrefix, taskTypeName, entry.getKey(), e.getMessage());
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6682abe4/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
index d8ed35e..5ebc34c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
@@ -28,18 +28,10 @@ import java.util.Map;
  */
 public interface ChangelogReader {
     /**
-     * Validate that the partition exists on the cluster.
-     * @param topicPartition    partition to validate.
-     * @param storeName         name of the store the partition is for.
-     * @throws org.apache.kafka.streams.errors.StreamsException if partition 
doesn't exist
-     */
-    void validatePartitionExists(final TopicPartition topicPartition, final 
String storeName);
-
-    /**
      * Register a state store and it's partition for later restoration.
-     * @param restorationInfo
+     * @param restorer the state restorer to register
      */
-    void register(final StateRestorer restorationInfo);
+    void register(final StateRestorer restorer);
 
     /**
      * Restore all registered state stores by reading from their changelogs.

http://git-wip-us.apache.org/repos/asf/kafka/blob/6682abe4/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 978e24b..acd7674 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -157,7 +157,6 @@ public class ProcessorStateManager implements StateManager {
         }
 
         final TopicPartition storePartition = new TopicPartition(topic, 
getPartition(topic));
-        changelogReader.validatePartitionExists(storePartition, store.name());
 
         if (isStandby) {
             if (store.persistent()) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/6682abe4/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
index ae68fd6..579561f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
@@ -29,11 +29,13 @@ public class StateRestorer {
     private final Long checkpoint;
     private final long offsetLimit;
     private final boolean persistent;
-    private final TopicPartition partition;
     private final String storeName;
+    private final TopicPartition partition;
     private final CompositeRestoreListener compositeRestoreListener;
+
     private long restoredOffset;
     private long startingOffset;
+    private long endingOffset;
 
     StateRestorer(final TopicPartition partition,
                   final CompositeRestoreListener compositeRestoreListener,
@@ -57,7 +59,7 @@ public class StateRestorer {
         return checkpoint == null ? NO_CHECKPOINT : checkpoint;
     }
 
-    void restoreStarted(long startingOffset, long endingOffset) {
+    void restoreStarted() {
         compositeRestoreListener.onRestoreStart(partition, storeName, 
startingOffset, endingOffset);
     }
 
@@ -89,6 +91,10 @@ public class StateRestorer {
         this.startingOffset = Math.min(offsetLimit, startingOffset);
     }
 
+    void setEndingOffset(final long endingOffset) {
+        this.endingOffset = Math.min(offsetLimit, endingOffset);
+    }
+
     long startingOffset() {
         return startingOffset;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6682abe4/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index e2cb3a2..57dff64 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -22,9 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,6 +32,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -41,62 +40,27 @@ import java.util.Set;
 public class StoreChangelogReader implements ChangelogReader {
     private static final Logger log = 
LoggerFactory.getLogger(StoreChangelogReader.class);
 
-    private final Consumer<byte[], byte[]> consumer;
     private final String logPrefix;
-    private final Time time;
-    private final long partitionValidationTimeoutMs;
-    private final Map<String, List<PartitionInfo>> partitionInfo = new 
HashMap<>();
+    private final Consumer<byte[], byte[]> consumer;
     private final StateRestoreListener stateRestoreListener;
+    private final Map<TopicPartition, Long> endOffsets = new HashMap<>();
+    private final Map<String, List<PartitionInfo>> partitionInfo = new 
HashMap<>();
     private final Map<TopicPartition, StateRestorer> stateRestorers = new 
HashMap<>();
     private final Map<TopicPartition, StateRestorer> needsRestoring = new 
HashMap<>();
     private final Map<TopicPartition, StateRestorer> needsInitializing = new 
HashMap<>();
-    private final Map<TopicPartition, Long> endOffsets = new HashMap<>();
 
-    public StoreChangelogReader(final String threadId, final Consumer<byte[], 
byte[]> consumer, final Time time,
-                                final long partitionValidationTimeoutMs, final 
StateRestoreListener stateRestoreListener) {
-        this.time = time;
+    public StoreChangelogReader(final String threadId,
+                                final Consumer<byte[], byte[]> consumer,
+                                final StateRestoreListener 
stateRestoreListener) {
         this.consumer = consumer;
-        this.partitionValidationTimeoutMs = partitionValidationTimeoutMs;
 
         this.logPrefix = String.format("stream-thread [%s]", threadId);
         this.stateRestoreListener = stateRestoreListener;
     }
 
-    public StoreChangelogReader(final Consumer<byte[], byte[]> consumer, final 
Time time,
-                                long partitionValidationTimeoutMs, final 
StateRestoreListener stateRestoreListener) {
-        this("", consumer, time, partitionValidationTimeoutMs, 
stateRestoreListener);
-    }
-
-    @Override
-    public void validatePartitionExists(final TopicPartition topicPartition, 
final String storeName) {
-        final long start = time.milliseconds();
-        // fetch all info on all topics to avoid multiple remote calls
-        if (partitionInfo.isEmpty()) {
-            try {
-                partitionInfo.putAll(consumer.listTopics());
-            } catch (final TimeoutException e) {
-                log.warn("{} Could not list topics so will fall back to 
partition by partition fetching", logPrefix);
-            }
-        }
-
-        final long endTime = time.milliseconds() + 
partitionValidationTimeoutMs;
-        while (!hasPartition(topicPartition) && time.milliseconds() < endTime) 
{
-            try {
-                final List<PartitionInfo> partitions = 
consumer.partitionsFor(topicPartition.topic());
-                if (partitions != null) {
-                    partitionInfo.put(topicPartition.topic(), partitions);
-                }
-            } catch (final TimeoutException e) {
-                throw new StreamsException(String.format("Could not fetch 
partition info for topic: %s before expiration of the configured request 
timeout",
-                                                         
topicPartition.topic()));
-            }
-        }
-
-        if (!hasPartition(topicPartition)) {
-            throw new StreamsException(String.format("Store %s's change log 
(%s) does not contain partition %s",
-                                                     storeName, 
topicPartition.topic(), topicPartition.partition()));
-        }
-        log.debug("{} Took {}ms to validate that partition {} exists", 
logPrefix, time.milliseconds() - start, topicPartition);
+    public StoreChangelogReader(final Consumer<byte[], byte[]> consumer,
+                                final StateRestoreListener 
stateRestoreListener) {
+        this("", consumer, stateRestoreListener);
     }
 
     @Override
@@ -106,7 +70,6 @@ public class StoreChangelogReader implements ChangelogReader 
{
         needsInitializing.put(restorer.partition(), restorer);
     }
 
-
     public Collection<TopicPartition> restore() {
         if (!needsInitializing.isEmpty()) {
             initialize();
@@ -131,45 +94,79 @@ public class StoreChangelogReader implements 
ChangelogReader {
     }
 
     private void initialize() {
-        final Map<TopicPartition, StateRestorer> newTasksNeedingRestoration = 
new HashMap<>();
-
         if (!consumer.subscription().isEmpty()) {
-            throw new IllegalStateException(String.format("Restore consumer 
should have not subscribed to any partitions (%s) beforehand", 
consumer.subscription()));
+            throw new IllegalStateException("Restore consumer should not be 
subscribed to any topics (" + consumer.subscription() + ")");
         }
-        endOffsets.putAll(consumer.endOffsets(needsInitializing.keySet()));
-
-        // remove any partitions where we already have all of the data
-        for (final Map.Entry<TopicPartition, Long> entry : 
endOffsets.entrySet()) {
-            TopicPartition topicPartition = entry.getKey();
-            Long offset = entry.getValue();
-            final StateRestorer restorer = 
needsInitializing.get(topicPartition);
-            // might be null as has was initialized in a previous invocation.
-            if (restorer != null) {
-                if (restorer.checkpoint() >= offset) {
+
+        // first refresh the changelog partition information from brokers, 
since initialize is only called when
+        // the needsInitializing map is not empty, meaning we do not know the 
metadata for some of them yet
+        refreshChangelogInfo();
+
+        Map<TopicPartition, StateRestorer> initializable = new HashMap<>();
+        for (Map.Entry<TopicPartition, StateRestorer> entry : 
needsInitializing.entrySet()) {
+            final TopicPartition topicPartition = entry.getKey();
+            if (hasPartition(topicPartition)) {
+                initializable.put(entry.getKey(), entry.getValue());
+            }
+        }
+
+        // try to fetch end offsets for the initializable restorers and remove 
any partitions
+        // where we already have all of the data
+        try {
+            endOffsets.putAll(consumer.endOffsets(initializable.keySet()));
+        } catch (final TimeoutException e) {
+            // if timeout exception gets thrown we just give up this time and 
retry in the next run loop
+            log.debug("{} Could not fetch end offset for {}; will fall back to 
partition by partition fetching", logPrefix, initializable);
+            return;
+        }
+
+        final Iterator<TopicPartition> iter = 
initializable.keySet().iterator();
+        while (iter.hasNext()) {
+            final TopicPartition topicPartition = iter.next();
+            final Long endOffset = endOffsets.get(topicPartition);
+
+            // offset should not be null; but since the consumer API does not 
guarantee it
+            // we add this check just in case
+            if (endOffset != null) {
+                final StateRestorer restorer = 
needsInitializing.get(topicPartition);
+                if (restorer.checkpoint() >= endOffset) {
                     restorer.setRestoredOffset(restorer.checkpoint());
-                } else if (restorer.offsetLimit() == 0 || 
endOffsets.get(topicPartition) == 0) {
+                    iter.remove();
+                } else if (restorer.offsetLimit() == 0 || endOffset == 0) {
                     restorer.setRestoredOffset(0);
+                    iter.remove();
                 } else {
-                    newTasksNeedingRestoration.put(topicPartition, restorer);
-                    final Long endOffset = endOffsets.get(topicPartition);
-                    restorer.restoreStarted(restorer.startingOffset(), 
endOffset);
+                    restorer.setEndingOffset(endOffset);
                 }
+                needsInitializing.remove(topicPartition);
+            } else {
+                log.info("{} End offset cannot be found form the returned 
metadata; removing this partition from the current run loop", logPrefix);
+                iter.remove();
             }
         }
 
-        log.debug("{} Starting restoring state stores from changelog topics 
{}", logPrefix, newTasksNeedingRestoration.keySet());
+        // set up restorer for those initializable
+        if (!initializable.isEmpty()) {
+            startRestoration(initializable);
+        }
+    }
+
+    private void startRestoration(final Map<TopicPartition, StateRestorer> 
initialized) {
+        log.debug("{} Start restoring state stores from changelog topics {}", 
logPrefix, initialized.keySet());
 
         final Set<TopicPartition> assignment = new 
HashSet<>(consumer.assignment());
-        assignment.addAll(newTasksNeedingRestoration.keySet());
+        assignment.addAll(initialized.keySet());
         consumer.assign(assignment);
+
         final List<StateRestorer> needsPositionUpdate = new ArrayList<>();
-        for (final StateRestorer restorer : 
newTasksNeedingRestoration.values()) {
+        for (final StateRestorer restorer : initialized.values()) {
             if (restorer.checkpoint() != StateRestorer.NO_CHECKPOINT) {
                 consumer.seek(restorer.partition(), restorer.checkpoint());
                 logRestoreOffsets(restorer.partition(),
-                                  restorer.checkpoint(),
-                                  endOffsets.get(restorer.partition()));
+                        restorer.checkpoint(),
+                        endOffsets.get(restorer.partition()));
                 
restorer.setStartingOffset(consumer.position(restorer.partition()));
+                restorer.restoreStarted();
             } else {
                 
consumer.seekToBeginning(Collections.singletonList(restorer.partition()));
                 needsPositionUpdate.add(restorer);
@@ -178,14 +175,14 @@ public class StoreChangelogReader implements 
ChangelogReader {
 
         for (final StateRestorer restorer : needsPositionUpdate) {
             final long position = consumer.position(restorer.partition());
-            restorer.setStartingOffset(position);
             logRestoreOffsets(restorer.partition(),
-                              position,
-                              endOffsets.get(restorer.partition()));
+                    position,
+                    endOffsets.get(restorer.partition()));
+            restorer.setStartingOffset(position);
+            restorer.restoreStarted();
         }
 
-        needsRestoring.putAll(newTasksNeedingRestoration);
-        needsInitializing.clear();
+        needsRestoring.putAll(initialized);
     }
 
     private void logRestoreOffsets(final TopicPartition partition, final long 
startingOffset, final Long endOffset) {
@@ -203,6 +200,14 @@ public class StoreChangelogReader implements 
ChangelogReader {
         return completed;
     }
 
+    private void refreshChangelogInfo() {
+        try {
+            partitionInfo.putAll(consumer.listTopics());
+        } catch (final TimeoutException e) {
+            log.debug("{} Could not fetch topic metadata within the timeout, 
will retry in the next run loop", logPrefix);
+        }
+    }
+
     @Override
     public Map<TopicPartition, Long> restoredOffsets() {
         final Map<TopicPartition, Long> restoredOffsets = new HashMap<>();
@@ -294,6 +299,5 @@ public class StoreChangelogReader implements 
ChangelogReader {
         }
 
         return false;
-
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6682abe4/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index a848172..d978f3d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -638,8 +638,6 @@ public class StreamThread extends Thread implements 
ThreadDataProvider {
         final Consumer<byte[], byte[]> restoreConsumer = 
clientSupplier.getRestoreConsumer(consumerConfigs);
         final StoreChangelogReader changelogReader = new 
StoreChangelogReader(threadClientId,
                                                                               
restoreConsumer,
-                                                                              
time,
-                                                                              
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
                                                                               
stateRestoreListener);
 
         Producer<byte[], byte[]> threadProducer = null;

http://git-wip-us.apache.org/repos/asf/kafka/blob/6682abe4/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
index d6709b8..43fe24f 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
@@ -25,7 +25,6 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.AuthorizationException;
 import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
@@ -122,7 +121,7 @@ public class AbstractTaskTest {
                                                       Collections.<String, 
String>emptyMap(),
                                                       
Collections.<StateStore>emptyList()),
                                 consumer,
-                                new StoreChangelogReader(consumer, 
Time.SYSTEM, 5000, new MockStateRestoreListener()),
+                                new StoreChangelogReader(consumer, new 
MockStateRestoreListener()),
                                 false,
                                 stateDirectory,
                                 config) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/6682abe4/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index f22e773..40a66da 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -29,7 +29,6 @@ import 
org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.ProcessorStateException;
@@ -133,7 +132,7 @@ public class StandbyTaskTest {
 
     private final MockConsumer<byte[], byte[]> consumer = new 
MockConsumer<>(OffsetResetStrategy.EARLIEST);
     private final MockRestoreConsumer restoreStateConsumer = new 
MockRestoreConsumer();
-    private final StoreChangelogReader changelogReader = new 
StoreChangelogReader(restoreStateConsumer, Time.SYSTEM, 5000, 
stateRestoreListener);
+    private final StoreChangelogReader changelogReader = new 
StoreChangelogReader(restoreStateConsumer, stateRestoreListener);
 
     private final byte[] recordValue = intSerializer.serialize(null, 10);
     private final byte[] recordKey = intSerializer.serialize(null, 1);

http://git-wip-us.apache.org/repos/asf/kafka/blob/6682abe4/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index 8ac9a62..a480ec1 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -22,11 +22,8 @@ import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.test.MockRestoreCallback;
 import org.apache.kafka.test.MockStateRestoreListener;
@@ -38,6 +35,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_BATCH;
 import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_END;
@@ -53,94 +51,32 @@ public class StoreChangelogReaderTest {
     private final CompositeRestoreListener restoreListener = new 
CompositeRestoreListener(callback);
     private final MockConsumer<byte[], byte[]> consumer = new 
MockConsumer<>(OffsetResetStrategy.EARLIEST);
     private final StateRestoreListener stateRestoreListener = new 
MockStateRestoreListener();
-    private final StoreChangelogReader changelogReader = new 
StoreChangelogReader(consumer, new MockTime(), 0, stateRestoreListener);
+    private final StoreChangelogReader changelogReader = new 
StoreChangelogReader(consumer, stateRestoreListener);
     private final TopicPartition topicPartition = new TopicPartition("topic", 
0);
-    private final PartitionInfo partitionInfo = new 
PartitionInfo(topicPartition.topic(), 0, null, null, null);
 
     @Before
     public void setUp() {
         restoreListener.setGlobalRestoreListener(stateRestoreListener);
     }
 
-    @SuppressWarnings("unchecked")
     @Test
-    public void shouldThrowStreamsExceptionWhenTimeoutExceptionThrown() throws 
Exception {
-        final MockConsumer<byte[], byte[]> consumer = new 
MockConsumer(OffsetResetStrategy.EARLIEST) {
+    public void shouldRequestTopicsAndHandleTimeoutException() throws 
Exception {
+        final AtomicBoolean functionCalled = new AtomicBoolean(false);
+        final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], 
byte[]>(OffsetResetStrategy.EARLIEST) {
             @Override
             public Map<String, List<PartitionInfo>> listTopics() {
+                functionCalled.set(true);
                 throw new TimeoutException("KABOOM!");
             }
         };
-        final StoreChangelogReader changelogReader = new 
StoreChangelogReader(consumer, new MockTime(), 0, stateRestoreListener);
-        try {
-            changelogReader.validatePartitionExists(topicPartition, "store");
-            fail("Should have thrown streams exception");
-        } catch (final StreamsException e) {
-            // pass
-        }
-    }
-
-    @Test(expected = StreamsException.class)
-    public void 
shouldThrowStreamsExceptionIfPartitionDoesntExistAfterMaxWait() throws 
Exception {
-        changelogReader.validatePartitionExists(topicPartition, "store");
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void 
shouldFallbackToPartitionsForIfPartitionNotInAllPartitionsList() throws 
Exception {
-        final MockConsumer<byte[], byte[]> consumer = new 
MockConsumer(OffsetResetStrategy.EARLIEST) {
-            @Override
-            public List<PartitionInfo> partitionsFor(final String topic) {
-                return Collections.singletonList(partitionInfo);
-            }
-        };
-
-        final StoreChangelogReader changelogReader = new 
StoreChangelogReader(consumer, new
-            MockTime(), 10, stateRestoreListener);
-        changelogReader.validatePartitionExists(topicPartition, "store");
-    }
 
-    @SuppressWarnings("unchecked")
-    @Test
-    public void 
shouldThrowStreamsExceptionIfTimeoutOccursDuringPartitionsFor() throws 
Exception {
-        final MockConsumer<byte[], byte[]> consumer = new 
MockConsumer(OffsetResetStrategy.EARLIEST) {
-            @Override
-            public List<PartitionInfo> partitionsFor(final String topic) {
-                throw new TimeoutException("KABOOM!");
-            }
-        };
-        final StoreChangelogReader changelogReader = new 
StoreChangelogReader(consumer, new
-            MockTime(), 5, stateRestoreListener);
-        try {
-            changelogReader.validatePartitionExists(topicPartition, "store");
-            fail("Should have thrown streams exception");
-        } catch (final StreamsException e) {
-            // pass
-        }
-    }
-
-    @Test
-    public void shouldPassIfTopicPartitionExists() throws Exception {
-        consumer.updatePartitions(topicPartition.topic(), 
Collections.singletonList(partitionInfo));
-        changelogReader.validatePartitionExists(topicPartition, "store");
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void shouldRequestPartitionInfoIfItDoesntExist() throws Exception {
-        final MockConsumer<byte[], byte[]> consumer = new 
MockConsumer(OffsetResetStrategy.EARLIEST) {
-            @Override
-            public Map<String, List<PartitionInfo>> listTopics() {
-                return Collections.emptyMap();
-            }
-        };
-
-        consumer.updatePartitions(topicPartition.topic(), 
Collections.singletonList(partitionInfo));
-        final StoreChangelogReader changelogReader = new 
StoreChangelogReader(consumer, Time.SYSTEM, 5000, stateRestoreListener);
-        changelogReader.validatePartitionExists(topicPartition, "store");
+        final StoreChangelogReader changelogReader = new 
StoreChangelogReader(consumer, stateRestoreListener);
+        changelogReader.register(new StateRestorer(topicPartition, 
restoreListener, null, Long.MAX_VALUE, true,
+                "storeName"));
+        changelogReader.restore();
+        assertTrue(functionCalled.get());
     }
 
-
     @Test
     public void shouldThrowExceptionIfConsumerHasCurrentSubscription() throws 
Exception {
         consumer.subscribe(Collections.singleton("sometopic"));
@@ -158,7 +94,6 @@ public class StoreChangelogReaderTest {
         setupConsumer(messages, topicPartition);
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true,
                                                    "storeName"));
-
         changelogReader.restore();
         assertThat(callback.restored.size(), equalTo(messages));
     }
@@ -191,7 +126,6 @@ public class StoreChangelogReaderTest {
         final StateRestorer restorer = new StateRestorer(topicPartition, restoreListener, null, 3, true,
                                                          "storeName");
         changelogReader.register(restorer);
-
         changelogReader.restore();
         assertThat(callback.restored.size(), equalTo(3));
         assertThat(restorer.restoredOffset(), equalTo(3L));
@@ -350,23 +284,22 @@ public class StoreChangelogReaderTest {
 
         setupConsumer(1, topicPartition);
         consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 10L));
-        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, false,
-                                                   "storeName"));
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, false, "storeName"));
 
         assertTrue(changelogReader.restore().isEmpty());
 
+        addRecords(9, topicPartition, 1);
+
         final TopicPartition postInitialization = new TopicPartition("other", 0);
+        setupConsumer(3, postInitialization);
         
consumer.updateBeginningOffsets(Collections.singletonMap(postInitialization, 
0L));
         consumer.updateEndOffsets(Collections.singletonMap(postInitialization, 3L));
 
         changelogReader.register(new StateRestorer(postInitialization, restoreListener2, null, Long.MAX_VALUE, false, "otherStore"));
 
-        addRecords(9, topicPartition, 1);
-
         final Collection<TopicPartition> expected = Utils.mkSet(topicPartition, postInitialization);
-
         consumer.assign(expected);
-        addRecords(3, postInitialization, 0);
+
         assertThat(changelogReader.restore(), equalTo(expected));
         assertThat(callback.restored.size(), equalTo(10));
         assertThat(callbackTwo.restored.size(), equalTo(3));

http://git-wip-us.apache.org/repos/asf/kafka/blob/6682abe4/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index a9d3cac..4246b17 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -34,7 +34,6 @@ import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
@@ -115,7 +114,7 @@ public class StreamTaskTest {
     private final MockProducer<byte[], byte[]> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer);
     private final MockConsumer<byte[], byte[]> restoreStateConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
     private final StateRestoreListener stateRestoreListener = new MockStateRestoreListener();
-    private final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreStateConsumer, Time.SYSTEM, 5000, stateRestoreListener);
+    private final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreStateConsumer, stateRestoreListener);
     private final byte[] recordValue = intSerializer.serialize(null, 10);
     private final byte[] recordKey = intSerializer.serialize(null, 1);
     private final String applicationId = "applicationId";

http://git-wip-us.apache.org/repos/asf/kafka/blob/6682abe4/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index ea89494..fa22d7e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
@@ -188,7 +187,7 @@ public class StreamThreadStateStoreProviderTest {
             Collections.singletonList(new TopicPartition(topicName, taskId.partition)),
             topology,
             clientSupplier.consumer,
-            new StoreChangelogReader(clientSupplier.restoreConsumer, Time.SYSTEM, 5000, new MockStateRestoreListener()),
+            new StoreChangelogReader(clientSupplier.restoreConsumer, new MockStateRestoreListener()),
             streamsConfig,
             new MockStreamsMetrics(new Metrics()),
             stateDirectory,

http://git-wip-us.apache.org/repos/asf/kafka/blob/6682abe4/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java b/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java
index 86c0eb5..54fd858 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java
@@ -30,13 +30,8 @@ public class MockChangelogReader implements ChangelogReader {
     private final Set<TopicPartition> registered = new HashSet<>();
 
     @Override
-    public void validatePartitionExists(final TopicPartition topicPartition, final String storeName) {
-
-    }
-
-    @Override
-    public void register(final StateRestorer restorationInfo) {
-        registered.add(restorationInfo.partition());
+    public void register(final StateRestorer restorer) {
+        registered.add(restorer.partition());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/6682abe4/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index f0e245c..47124ed 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -222,8 +222,6 @@ public class ProcessorTopologyTestDriver {
                                   consumer,
                                   new StoreChangelogReader(
                                       
createRestoreConsumer(topology.storeToChangelogTopic()),
-                                      Time.SYSTEM,
-                                      5000,
                                       stateRestoreListener),
                                   config,
                                   streamsMetrics, stateDirectory,

Reply via email to