Repository: kafka Updated Branches: refs/heads/trunk 76c9a6dcb -> 6682abe4a
KAFKA-5797: Delay checking of partition existence in StoreChangelogReader 1. Remove timeout-based validatePartitionExists from StoreChangelogReader; instead only try to refresh metadata once after all tasks have been created and their topology initialized (hence all stores have been registered). 2. Add the logic to refresh partition metadata at the end of initialization if some restorers needing initialization cannot find their changelogs, hoping that in the next run loop these stores can find their changelogs. As a result, after `initialize` is called we may not be able to start initializing all the `needsInitializing` ones. As an optimization, we would not call `consumer#partitionsFor` any more, but only `consumer#listTopics` fetching all the topic metadata; so the only blocking calls left are `listTopics` and `endOffsets`, and we always capture timeout exceptions around these two calls, and delay to retry in the next run loop after refreshing the metadata. By doing this we can also reduce the number of request round trips between consumer and brokers. Author: Guozhang Wang <wangg...@gmail.com> Reviewers: Damian Guy <damian....@gmail.com>, Matthias J. Sax <matth...@confluent.io> Closes #3748 from guozhangwang/K5797-handle-metadata-available Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6682abe4 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6682abe4 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6682abe4 Branch: refs/heads/trunk Commit: 6682abe4ae68fdbf0eb362e45f43ea14e2aba847 Parents: 76c9a6d Author: Guozhang Wang <wangg...@gmail.com> Authored: Wed Aug 30 08:43:22 2017 -0700 Committer: Guozhang Wang <wangg...@gmail.com> Committed: Wed Aug 30 08:43:22 2017 -0700 ---------------------------------------------------------------------- .../processor/internals/AssignedTasks.java | 2 +- .../processor/internals/ChangelogReader.java | 12 +- .../internals/ProcessorStateManager.java | 1 - .../processor/internals/StateRestorer.java | 10 +- .../internals/StoreChangelogReader.java | 154 ++++++++++--------- .../processor/internals/StreamThread.java | 2 - .../processor/internals/AbstractTaskTest.java | 3 +- .../processor/internals/StandbyTaskTest.java | 3 +- .../internals/StoreChangelogReaderTest.java | 99 ++---------- .../processor/internals/StreamTaskTest.java | 3 +- .../StreamThreadStateStoreProviderTest.java | 3 +- .../apache/kafka/test/MockChangelogReader.java | 9 +- .../kafka/test/ProcessorTopologyTestDriver.java | 2 - 13 files changed, 112 insertions(+), 191 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/6682abe4/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java index c0c9ccc..f09c48e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java @@ -124,7 +124,7 @@ class AssignedTasks { it.remove(); } catch (final LockException e) { // made this trace as it will spam the logs in the poll loop. - log.trace("{} Could not create {} {} due to {}; will retry", logPrefix, taskTypeName, entry.getKey(), e.getMessage()); + log.trace("{} Could not create {} {} due to {}; will retry in the next run loop", logPrefix, taskTypeName, entry.getKey(), e.getMessage()); } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/6682abe4/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java index d8ed35e..5ebc34c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java @@ -28,18 +28,10 @@ import java.util.Map; */ public interface ChangelogReader { /** - * Validate that the partition exists on the cluster. - * @param topicPartition partition to validate. - * @param storeName name of the store the partition is for. - * @throws org.apache.kafka.streams.errors.StreamsException if partition doesn't exist - */ - void validatePartitionExists(final TopicPartition topicPartition, final String storeName); - - /** * Register a state store and it's partition for later restoration. - * @param restorationInfo + * @param restorer the state restorer to register */ - void register(final StateRestorer restorationInfo); + void register(final StateRestorer restorer); /** * Restore all registered state stores by reading from their changelogs. http://git-wip-us.apache.org/repos/asf/kafka/blob/6682abe4/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index 978e24b..acd7674 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -157,7 +157,6 @@ public class ProcessorStateManager implements StateManager { } final TopicPartition storePartition = new TopicPartition(topic, getPartition(topic)); - changelogReader.validatePartitionExists(storePartition, store.name()); if (isStandby) { if (store.persistent()) { http://git-wip-us.apache.org/repos/asf/kafka/blob/6682abe4/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java index ae68fd6..579561f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java @@ -29,11 +29,13 @@ public class StateRestorer { private final Long checkpoint; private final long offsetLimit; private final boolean persistent; - private final TopicPartition partition; private final String storeName; + private final TopicPartition partition; private final CompositeRestoreListener compositeRestoreListener; + private long restoredOffset; private long startingOffset; + private long endingOffset; StateRestorer(final TopicPartition partition, final CompositeRestoreListener compositeRestoreListener, @@ -57,7 +59,7 @@ public class StateRestorer { return checkpoint == null ? NO_CHECKPOINT : checkpoint; } - void restoreStarted(long startingOffset, long endingOffset) { + void restoreStarted() { compositeRestoreListener.onRestoreStart(partition, storeName, startingOffset, endingOffset); } @@ -89,6 +91,10 @@ public class StateRestorer { this.startingOffset = Math.min(offsetLimit, startingOffset); } + void setEndingOffset(final long endingOffset) { + this.endingOffset = Math.min(offsetLimit, endingOffset); + } + long startingOffset() { return startingOffset; } http://git-wip-us.apache.org/repos/asf/kafka/blob/6682abe4/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java index e2cb3a2..57dff64 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java @@ -22,9 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TimeoutException; -import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.StateRestoreListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,6 +32,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -41,62 +40,27 @@ import java.util.Set; public class StoreChangelogReader implements ChangelogReader { private static final Logger log = LoggerFactory.getLogger(StoreChangelogReader.class); - private final Consumer<byte[], byte[]> consumer; private final String logPrefix; - private final Time time; - private final long partitionValidationTimeoutMs; - private final Map<String, List<PartitionInfo>> partitionInfo = new HashMap<>(); + private final Consumer<byte[], byte[]> consumer; private final StateRestoreListener stateRestoreListener; + private final Map<TopicPartition, Long> endOffsets = new HashMap<>(); + private final Map<String, List<PartitionInfo>> partitionInfo = new HashMap<>(); private final Map<TopicPartition, StateRestorer> stateRestorers = new HashMap<>(); private final Map<TopicPartition, StateRestorer> needsRestoring = new HashMap<>(); private final Map<TopicPartition, StateRestorer> needsInitializing = new HashMap<>(); - private final Map<TopicPartition, Long> endOffsets = new HashMap<>(); - public StoreChangelogReader(final String threadId, final Consumer<byte[], byte[]> consumer, final Time time, - final long partitionValidationTimeoutMs, final StateRestoreListener stateRestoreListener) { - this.time = time; + public StoreChangelogReader(final String threadId, + final Consumer<byte[], byte[]> consumer, + final StateRestoreListener stateRestoreListener) { this.consumer = consumer; - this.partitionValidationTimeoutMs = partitionValidationTimeoutMs; this.logPrefix = String.format("stream-thread [%s]", threadId); this.stateRestoreListener = stateRestoreListener; } - public StoreChangelogReader(final Consumer<byte[], byte[]> consumer, final Time time, - long partitionValidationTimeoutMs, final StateRestoreListener stateRestoreListener) { - this("", consumer, time, partitionValidationTimeoutMs, stateRestoreListener); - } - - @Override - public void validatePartitionExists(final TopicPartition topicPartition, final String storeName) { - final long start = time.milliseconds(); - // fetch all info on all topics to avoid multiple remote calls - if (partitionInfo.isEmpty()) { - try { - partitionInfo.putAll(consumer.listTopics()); - } catch (final TimeoutException e) { - log.warn("{} Could not list topics so will fall back to partition by partition fetching", logPrefix); - } - } - - final long endTime = time.milliseconds() + partitionValidationTimeoutMs; - while (!hasPartition(topicPartition) && time.milliseconds() < endTime) { - try { - final List<PartitionInfo> partitions = consumer.partitionsFor(topicPartition.topic()); - if (partitions != null) { - partitionInfo.put(topicPartition.topic(), partitions); - } - } catch (final TimeoutException e) { - throw new StreamsException(String.format("Could not fetch partition info for topic: %s before expiration of the configured request timeout", - topicPartition.topic())); - } - } - - if (!hasPartition(topicPartition)) { - throw new StreamsException(String.format("Store %s's change log (%s) does not contain partition %s", - storeName, topicPartition.topic(), topicPartition.partition())); - } - log.debug("{} Took {}ms to validate that partition {} exists", logPrefix, time.milliseconds() - start, topicPartition); + public StoreChangelogReader(final Consumer<byte[], byte[]> consumer, + final StateRestoreListener stateRestoreListener) { + this("", consumer, stateRestoreListener); } @Override @@ -106,7 +70,6 @@ public class StoreChangelogReader implements ChangelogReader { needsInitializing.put(restorer.partition(), restorer); } - public Collection<TopicPartition> restore() { if (!needsInitializing.isEmpty()) { initialize(); @@ -131,45 +94,79 @@ public class StoreChangelogReader implements ChangelogReader { } private void initialize() { - final Map<TopicPartition, StateRestorer> newTasksNeedingRestoration = new HashMap<>(); - if (!consumer.subscription().isEmpty()) { - throw new IllegalStateException(String.format("Restore consumer should have not subscribed to any partitions (%s) beforehand", consumer.subscription())); + throw new IllegalStateException("Restore consumer should not be subscribed to any topics (" + consumer.subscription() + ")"); } - endOffsets.putAll(consumer.endOffsets(needsInitializing.keySet())); - - // remove any partitions where we already have all of the data - for (final Map.Entry<TopicPartition, Long> entry : endOffsets.entrySet()) { - TopicPartition topicPartition = entry.getKey(); - Long offset = entry.getValue(); - final StateRestorer restorer = needsInitializing.get(topicPartition); - // might be null as has was initialized in a previous invocation. - if (restorer != null) { - if (restorer.checkpoint() >= offset) { + + // first refresh the changelog partition information from brokers, since initialize is only called when + // the needsInitializing map is not empty, meaning we do not know the metadata for some of them yet + refreshChangelogInfo(); + + Map<TopicPartition, StateRestorer> initializable = new HashMap<>(); + for (Map.Entry<TopicPartition, StateRestorer> entry : needsInitializing.entrySet()) { + final TopicPartition topicPartition = entry.getKey(); + if (hasPartition(topicPartition)) { + initializable.put(entry.getKey(), entry.getValue()); + } + } + + // try to fetch end offsets for the initializable restorers and remove any partitions + // where we already have all of the data + try { + endOffsets.putAll(consumer.endOffsets(initializable.keySet())); + } catch (final TimeoutException e) { + // if timeout exception gets thrown we just give up this time and retry in the next run loop + log.debug("{} Could not fetch end offset for {}; will fall back to partition by partition fetching", logPrefix, initializable); + return; + } + + final Iterator<TopicPartition> iter = initializable.keySet().iterator(); + while (iter.hasNext()) { + final TopicPartition topicPartition = iter.next(); + final Long endOffset = endOffsets.get(topicPartition); + + // offset should not be null; but since the consumer API does not guarantee it + // we add this check just in case + if (endOffset != null) { + final StateRestorer restorer = needsInitializing.get(topicPartition); + if (restorer.checkpoint() >= endOffset) { restorer.setRestoredOffset(restorer.checkpoint()); - } else if (restorer.offsetLimit() == 0 || endOffsets.get(topicPartition) == 0) { + iter.remove(); + } else if (restorer.offsetLimit() == 0 || endOffset == 0) { restorer.setRestoredOffset(0); + iter.remove(); } else { - newTasksNeedingRestoration.put(topicPartition, restorer); - final Long endOffset = endOffsets.get(topicPartition); - restorer.restoreStarted(restorer.startingOffset(), endOffset); + restorer.setEndingOffset(endOffset); } + needsInitializing.remove(topicPartition); + } else { + log.info("{} End offset cannot be found form the returned metadata; removing this partition from the current run loop", logPrefix); + iter.remove(); } } - log.debug("{} Starting restoring state stores from changelog topics {}", logPrefix, newTasksNeedingRestoration.keySet()); + // set up restorer for those initializable + if (!initializable.isEmpty()) { + startRestoration(initializable); + } + } + + private void startRestoration(final Map<TopicPartition, StateRestorer> initialized) { + log.debug("{} Start restoring state stores from changelog topics {}", logPrefix, initialized.keySet()); final Set<TopicPartition> assignment = new HashSet<>(consumer.assignment()); - assignment.addAll(newTasksNeedingRestoration.keySet()); + assignment.addAll(initialized.keySet()); consumer.assign(assignment); + final List<StateRestorer> needsPositionUpdate = new ArrayList<>(); - for (final StateRestorer restorer : newTasksNeedingRestoration.values()) { + for (final StateRestorer restorer : initialized.values()) { if (restorer.checkpoint() != StateRestorer.NO_CHECKPOINT) { consumer.seek(restorer.partition(), restorer.checkpoint()); logRestoreOffsets(restorer.partition(), - restorer.checkpoint(), - endOffsets.get(restorer.partition())); + restorer.checkpoint(), + endOffsets.get(restorer.partition())); restorer.setStartingOffset(consumer.position(restorer.partition())); + restorer.restoreStarted(); } else { consumer.seekToBeginning(Collections.singletonList(restorer.partition())); needsPositionUpdate.add(restorer); @@ -178,14 +175,14 @@ public class StoreChangelogReader implements ChangelogReader { for (final StateRestorer restorer : needsPositionUpdate) { final long position = consumer.position(restorer.partition()); - restorer.setStartingOffset(position); logRestoreOffsets(restorer.partition(), - position, - endOffsets.get(restorer.partition())); + position, + endOffsets.get(restorer.partition())); + restorer.setStartingOffset(position); + restorer.restoreStarted(); } - needsRestoring.putAll(newTasksNeedingRestoration); - needsInitializing.clear(); + needsRestoring.putAll(initialized); } private void logRestoreOffsets(final TopicPartition partition, final long startingOffset, final Long endOffset) { @@ -203,6 +200,14 @@ public class StoreChangelogReader implements ChangelogReader { return completed; } + private void refreshChangelogInfo() { + try { + partitionInfo.putAll(consumer.listTopics()); + } catch (final TimeoutException e) { + log.debug("{} Could not fetch topic metadata within the timeout, will retry in the next run loop", logPrefix); + } + } + @Override public Map<TopicPartition, Long> restoredOffsets() { final Map<TopicPartition, Long> restoredOffsets = new HashMap<>(); @@ -294,6 +299,5 @@ public class StoreChangelogReader implements ChangelogReader { } return false; - } } http://git-wip-us.apache.org/repos/asf/kafka/blob/6682abe4/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index a848172..d978f3d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -638,8 +638,6 @@ public class StreamThread extends Thread implements ThreadDataProvider { final Consumer<byte[], byte[]> restoreConsumer = clientSupplier.getRestoreConsumer(consumerConfigs); final StoreChangelogReader changelogReader = new StoreChangelogReader(threadClientId, restoreConsumer, - time, - config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), stateRestoreListener); Producer<byte[], byte[]> threadProducer = null; http://git-wip-us.apache.org/repos/asf/kafka/blob/6682abe4/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java index d6709b8..43fe24f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java @@ -25,7 +25,6 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.AuthorizationException; import org.apache.kafka.common.errors.WakeupException; -import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.LockException; import org.apache.kafka.streams.errors.ProcessorStateException; @@ -122,7 +121,7 @@ public class AbstractTaskTest { Collections.<String, String>emptyMap(), Collections.<StateStore>emptyList()), consumer, - new StoreChangelogReader(consumer, Time.SYSTEM, 5000, new MockStateRestoreListener()), + new StoreChangelogReader(consumer, new MockStateRestoreListener()), false, stateDirectory, config) { http://git-wip-us.apache.org/repos/asf/kafka/blob/6682abe4/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index f22e773..40a66da 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -29,7 +29,6 @@ import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.MockTime; -import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.ProcessorStateException; @@ -133,7 +132,7 @@ public class StandbyTaskTest { private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); private final MockRestoreConsumer restoreStateConsumer = new MockRestoreConsumer(); - private final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreStateConsumer, Time.SYSTEM, 5000, stateRestoreListener); + private final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreStateConsumer, stateRestoreListener); private final byte[] recordValue = intSerializer.serialize(null, 10); private final byte[] recordKey = intSerializer.serialize(null, 1); http://git-wip-us.apache.org/repos/asf/kafka/blob/6682abe4/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java index 8ac9a62..a480ec1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java @@ -22,11 +22,8 @@ import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TimeoutException; -import org.apache.kafka.common.utils.MockTime; -import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.StateRestoreListener; import org.apache.kafka.test.MockRestoreCallback; import org.apache.kafka.test.MockStateRestoreListener; @@ -38,6 +35,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_BATCH; import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_END; @@ -53,94 +51,32 @@ public class StoreChangelogReaderTest { private final CompositeRestoreListener restoreListener = new CompositeRestoreListener(callback); private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); private final StateRestoreListener stateRestoreListener = new MockStateRestoreListener(); - private final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, new MockTime(), 0, stateRestoreListener); + private final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, stateRestoreListener); private final TopicPartition topicPartition = new TopicPartition("topic", 0); - private final PartitionInfo partitionInfo = new PartitionInfo(topicPartition.topic(), 0, null, null, null); @Before public void setUp() { restoreListener.setGlobalRestoreListener(stateRestoreListener); } - @SuppressWarnings("unchecked") @Test - public void shouldThrowStreamsExceptionWhenTimeoutExceptionThrown() throws Exception { - final MockConsumer<byte[], byte[]> consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { + public void shouldRequestTopicsAndHandleTimeoutException() throws Exception { + final AtomicBoolean functionCalled = new AtomicBoolean(false); + final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) { @Override public Map<String, List<PartitionInfo>> listTopics() { + functionCalled.set(true); throw new TimeoutException("KABOOM!"); } }; - final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, new MockTime(), 0, stateRestoreListener); - try { - changelogReader.validatePartitionExists(topicPartition, "store"); - fail("Should have thrown streams exception"); - } catch (final StreamsException e) { - // pass - } - } - - @Test(expected = StreamsException.class) - public void shouldThrowStreamsExceptionIfPartitionDoesntExistAfterMaxWait() throws Exception { - changelogReader.validatePartitionExists(topicPartition, "store"); - } - - @SuppressWarnings("unchecked") - @Test - public void shouldFallbackToPartitionsForIfPartitionNotInAllPartitionsList() throws Exception { - final MockConsumer<byte[], byte[]> consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { - @Override - public List<PartitionInfo> partitionsFor(final String topic) { - return Collections.singletonList(partitionInfo); - } - }; - - final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, new - MockTime(), 10, stateRestoreListener); - changelogReader.validatePartitionExists(topicPartition, "store"); - } - @SuppressWarnings("unchecked") - @Test - public void shouldThrowStreamsExceptionIfTimeoutOccursDuringPartitionsFor() throws Exception { - final MockConsumer<byte[], byte[]> consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { - @Override - public List<PartitionInfo> partitionsFor(final String topic) { - throw new TimeoutException("KABOOM!"); - } - }; - final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, new - MockTime(), 5, stateRestoreListener); - try { - changelogReader.validatePartitionExists(topicPartition, "store"); - fail("Should have thrown streams exception"); - } catch (final StreamsException e) { - // pass - } - } - - @Test - public void shouldPassIfTopicPartitionExists() throws Exception { - consumer.updatePartitions(topicPartition.topic(), Collections.singletonList(partitionInfo)); - changelogReader.validatePartitionExists(topicPartition, "store"); - } - - @SuppressWarnings("unchecked") - @Test - public void shouldRequestPartitionInfoIfItDoesntExist() throws Exception { - final MockConsumer<byte[], byte[]> consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { - @Override - public Map<String, List<PartitionInfo>> listTopics() { - return Collections.emptyMap(); - } - }; - - consumer.updatePartitions(topicPartition.topic(), Collections.singletonList(partitionInfo)); - final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, Time.SYSTEM, 5000, stateRestoreListener); - changelogReader.validatePartitionExists(topicPartition, "store"); + final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, stateRestoreListener); + changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, + "storeName")); + changelogReader.restore(); + assertTrue(functionCalled.get()); } - @Test public void shouldThrowExceptionIfConsumerHasCurrentSubscription() throws Exception { consumer.subscribe(Collections.singleton("sometopic")); @@ -158,7 +94,6 @@ public class StoreChangelogReaderTest { setupConsumer(messages, topicPartition); changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName")); - changelogReader.restore(); assertThat(callback.restored.size(), equalTo(messages)); } @@ -191,7 +126,6 @@ public class StoreChangelogReaderTest { final StateRestorer restorer = new StateRestorer(topicPartition, restoreListener, null, 3, true, "storeName"); changelogReader.register(restorer); - changelogReader.restore(); assertThat(callback.restored.size(), equalTo(3)); assertThat(restorer.restoredOffset(), equalTo(3L)); @@ -350,23 +284,22 @@ public class StoreChangelogReaderTest { setupConsumer(1, topicPartition); consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 10L)); - changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, false, - "storeName")); + changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, false, "storeName")); assertTrue(changelogReader.restore().isEmpty()); + addRecords(9, topicPartition, 1); + final TopicPartition postInitialization = new TopicPartition("other", 0); + setupConsumer(3, postInitialization); consumer.updateBeginningOffsets(Collections.singletonMap(postInitialization, 0L)); consumer.updateEndOffsets(Collections.singletonMap(postInitialization, 3L)); changelogReader.register(new StateRestorer(postInitialization, restoreListener2, null, Long.MAX_VALUE, false, "otherStore")); - addRecords(9, topicPartition, 1); - final Collection<TopicPartition> expected = Utils.mkSet(topicPartition, postInitialization); - consumer.assign(expected); - addRecords(3, postInitialization, 0); + assertThat(changelogReader.restore(), equalTo(expected)); assertThat(callback.restored.size(), equalTo(10)); assertThat(callbackTwo.restored.size(), equalTo(3)); http://git-wip-us.apache.org/repos/asf/kafka/blob/6682abe4/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index a9d3cac..4246b17 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -34,7 +34,6 @@ import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.MockTime; -import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsMetrics; @@ -115,7 +114,7 @@ public class StreamTaskTest { private final MockProducer<byte[], byte[]> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer); private final MockConsumer<byte[], byte[]> restoreStateConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); private final StateRestoreListener stateRestoreListener = new MockStateRestoreListener(); - private final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreStateConsumer, Time.SYSTEM, 5000, stateRestoreListener); + private final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreStateConsumer, stateRestoreListener); private final byte[] recordValue = intSerializer.serialize(null, 10); private final byte[] recordKey = intSerializer.serialize(null, 1); private final String applicationId = "applicationId"; http://git-wip-us.apache.org/repos/asf/kafka/blob/6682abe4/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java index ea89494..fa22d7e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java @@ -21,7 +21,6 @@ import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.MockTime; -import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.InvalidStateStoreException; @@ -188,7 +187,7 @@ public class StreamThreadStateStoreProviderTest { Collections.singletonList(new TopicPartition(topicName, taskId.partition)), topology, clientSupplier.consumer, - new StoreChangelogReader(clientSupplier.restoreConsumer, Time.SYSTEM, 5000, new MockStateRestoreListener()), + new StoreChangelogReader(clientSupplier.restoreConsumer, new MockStateRestoreListener()), streamsConfig, new MockStreamsMetrics(new Metrics()), stateDirectory, http://git-wip-us.apache.org/repos/asf/kafka/blob/6682abe4/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java b/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java index 86c0eb5..54fd858 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java +++ b/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java @@ -30,13 +30,8 @@ public class MockChangelogReader implements ChangelogReader { private final Set<TopicPartition> registered = new HashSet<>(); @Override - public void validatePartitionExists(final TopicPartition topicPartition, final String storeName) { - - } - - @Override - public void register(final StateRestorer restorationInfo) { - registered.add(restorationInfo.partition()); + public void register(final StateRestorer restorer) { + registered.add(restorer.partition()); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/6682abe4/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java index f0e245c..47124ed 100644 --- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java @@ -222,8 +222,6 @@ public class ProcessorTopologyTestDriver { consumer, new StoreChangelogReader( createRestoreConsumer(topology.storeToChangelogTopic()), - Time.SYSTEM, - 5000, stateRestoreListener), config, streamsMetrics, stateDirectory,