KAFKA-6121: Restore and global consumer should not use auto.offset.reset - set auto.offset.reset to "none" for restore and global consumer - handle InvalidOffsetException for restore and global consumer - add corresponding tests - some minor cleanup
Author: Matthias J. Sax <matth...@confluent.io> Reviewers: Damian Guy <damian....@gmail.com, Bill Bejeck <b...@confluent.io>, GuozhangWang <wangg...@gmail.com> Closes #4215 from mjsax/kafka-6121-restore-global-consumer-handle-reset Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/04395175 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/04395175 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/04395175 Branch: refs/heads/trunk Commit: 043951753b6fb6c8bae6d25d7a6a97e74b614cac Parents: 2bf2348 Author: Matthias J. Sax <matth...@confluent.io> Authored: Mon Dec 11 14:20:10 2017 +0000 Committer: Damian Guy <damian....@gmail.com> Committed: Mon Dec 11 14:20:10 2017 +0000 ---------------------------------------------------------------------- .../kafka/clients/consumer/MockConsumer.java | 8 + .../org/apache/kafka/streams/StreamsConfig.java | 1 + .../internals/AbstractProcessorContext.java | 5 + .../internals/AbstractStateManager.java | 114 ++++++++++ .../processor/internals/AbstractTask.java | 6 +- .../processor/internals/GlobalStateManager.java | 5 +- .../internals/GlobalStateManagerImpl.java | 90 +++++--- .../internals/GlobalStateUpdateTask.java | 4 +- .../processor/internals/GlobalStreamThread.java | 38 ++-- .../internals/InternalProcessorContext.java | 7 +- .../internals/ProcessorStateManager.java | 70 +++--- .../processor/internals/StandbyTask.java | 6 +- .../processor/internals/StateManager.java | 6 +- .../internals/StoreChangelogReader.java | 21 +- .../streams/processor/internals/StreamTask.java | 6 - .../processor/internals/StreamThread.java | 33 ++- .../internals/InnerMeteredKeyValueStore.java | 1 - .../internals/MeteredKeyValueBytesStore.java | 2 - .../apache/kafka/streams/StreamsConfigTest.java | 6 - .../processor/internals/AbstractTaskTest.java | 147 +++++++++++-- .../internals/GlobalStateManagerImplTest.java | 216 +++++++++++++------ .../internals/GlobalStreamThreadTest.java | 
111 ++++++++-- .../processor/internals/StateConsumerTest.java | 2 +- .../processor/internals/StateManagerStub.java | 8 +- .../internals/StoreChangelogReaderTest.java | 27 +++ .../processor/internals/StreamTaskTest.java | 43 ++-- .../processor/internals/StreamThreadTest.java | 101 ++++++++- .../kafka/test/GlobalStateManagerStub.java | 17 +- .../apache/kafka/test/NoOpReadOnlyStore.java | 17 +- .../kafka/test/ProcessorTopologyTestDriver.java | 11 +- 30 files changed, 862 insertions(+), 267 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index 9b0c058..10aedbb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -108,6 +108,14 @@ public class MockConsumer<K, V> implements Consumer<K, V> { } ensureNotClosed(); this.subscriptions.subscribeFromPattern(topicsToSubscribe); + final Set<TopicPartition> assignedPartitions = new HashSet<>(); + for (final String topic : topicsToSubscribe) { + for (final PartitionInfo info : this.partitions.get(topic)) { + assignedPartitions.add(new TopicPartition(topic, info.partition())); + } + + } + subscriptions.assignFromSubscribed(assignedPartitions); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 49b8a3c..d78fc0d 100644 
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -763,6 +763,7 @@ public class StreamsConfig extends AbstractConfig { consumerProps.remove(ConsumerConfig.GROUP_ID_CONFIG); // add client id with stream client id prefix consumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-restore-consumer"); + consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none"); return consumerProps; } http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java index aa58226..87408c6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java @@ -197,4 +197,9 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte public void initialized() { initialized = true; } + + @Override + public void uninitialize() { + initialized = false; + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java new file mode 100644 index 0000000..777e46b --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java @@ -0,0 +1,114 @@ +/* 
+ * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.state.internals.OffsetCheckpoint; +import org.slf4j.Logger; + +import java.io.File; +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Set; + +abstract class AbstractStateManager implements StateManager { + static final String CHECKPOINT_FILE_NAME = ".checkpoint"; + + final File baseDir; + final Map<TopicPartition, Long> checkpointableOffsets = new HashMap<>(); + + OffsetCheckpoint checkpoint; + + final Map<String, StateStore> stores = new LinkedHashMap<>(); + final Map<String, StateStore> globalStores = new LinkedHashMap<>(); + + AbstractStateManager(final File baseDir) { + this.baseDir = baseDir; + this.checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME)); + + } + + public void 
reinitializeStateStoresForPartitions(final Logger log, + final Map<String, StateStore> stateStores, + final Map<String, String> storeToChangelogTopic, + final Collection<TopicPartition> partitions, + final InternalProcessorContext processorContext) { + final Map<String, String> changelogTopicToStore = inverseOneToOneMap(storeToChangelogTopic); + final Set<String> storeToBeReinitialized = new HashSet<>(); + + for (final TopicPartition topicPartition : partitions) { + checkpointableOffsets.remove(topicPartition); + storeToBeReinitialized.add(changelogTopicToStore.get(topicPartition.topic())); + } + try { + checkpoint.write(checkpointableOffsets); + } catch (final IOException fatalException) { + log.error("Failed to update checkpoint file for global stores.", fatalException); + throw new StreamsException("Failed to reinitialize global store.", fatalException); + } + + final Iterator<Map.Entry<String, StateStore>> it = stateStores.entrySet().iterator(); + while (it.hasNext()) { + final StateStore stateStore = it.next().getValue(); + final String storeName = stateStore.name(); + if (storeToBeReinitialized.contains(storeName)) { + try { + stateStore.close(); + } catch (final RuntimeException ignoreAndSwallow) { /* ignore */ } + processorContext.uninitialize(); + it.remove(); + + // TODO remove this eventually + // -> (only after we are sure, we don't need it for backward compatibility reasons anymore; maybe 2.0 release?) 
+ // this is an ugly "hack" that is required because RocksDBStore does not follow the pattern to put the + // store directory as <taskDir>/<storeName> but nests it with an intermediate <taskDir>/rocksdb/<storeName> + try { + Utils.delete(new File(baseDir + File.separator + "rocksdb" + File.separator + storeName)); + } catch (final IOException fatalException) { + log.error("Failed to reinitialize store {}.", storeName, fatalException); + throw new StreamsException(String.format("Failed to reinitialize store %s.", storeName), fatalException); + } + + try { + Utils.delete(new File(baseDir + File.separator + storeName)); + } catch (final IOException fatalException) { + log.error("Failed to reinitialize store {}.", storeName, fatalException); + throw new StreamsException(String.format("Failed to reinitialize store %s.", storeName), fatalException); + } + + stateStore.init(processorContext, stateStore); + } + } + } + + private Map<String, String> inverseOneToOneMap(final Map<String, String> origin) { + final Map<String, String> reversedMap = new HashMap<>(); + for (final Map.Entry<String, String> entry : origin.entrySet()) { + reversedMap.put(entry.getValue(), entry.getKey()); + } + return reversedMap; + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index d11af3a..2b8af6d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -52,7 +52,7 @@ public abstract class AbstractTask implements Task { final Logger log; final LogContext logContext; boolean taskInitialized; - private final 
StateDirectory stateDirectory; + final StateDirectory stateDirectory; InternalProcessorContext processorContext; @@ -220,10 +220,14 @@ public abstract class AbstractTask implements Task { for (final StateStore store : topology.stateStores()) { log.trace("Initializing store {}", store.name()); + processorContext.uninitialize(); store.init(processorContext, store); } } + void reinitializeStateStoresForPartitions(final Collection<TopicPartition> partitions) { + stateMgr.reinitializeStateStoresForPartitions(partitions, processorContext); + } /** * @throws ProcessorStateException if there is an error while closing the state manager http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManager.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManager.java index c9b8ca8..479fd1f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManager.java @@ -21,9 +21,12 @@ import org.apache.kafka.streams.errors.StreamsException; import java.util.Set; public interface GlobalStateManager extends StateManager { + + void setGlobalProcessorContext(final InternalProcessorContext processorContext); + /** * @throws IllegalStateException If store gets registered after initialized is already finished * @throws StreamsException if the store's change log does not contain the partition */ - Set<String> initialize(InternalProcessorContext processorContext); + Set<String> initialize(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java 
---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java index bbae9aa..2d4ee8f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.InvalidOffsetException; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TimeoutException; @@ -33,38 +34,30 @@ import org.apache.kafka.streams.processor.BatchingStateRestoreCallback; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateRestoreListener; import org.apache.kafka.streams.processor.StateStore; -import org.apache.kafka.streams.state.internals.OffsetCheckpoint; import org.slf4j.Logger; import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; -import static org.apache.kafka.streams.processor.internals.ProcessorStateManager.CHECKPOINT_FILE_NAME; - /** * This class is responsible for the initialization, restoration, closing, flushing etc * of Global State Stores. There is only ever 1 instance of this class per Application Instance. 
*/ -public class GlobalStateManagerImpl implements GlobalStateManager { +public class GlobalStateManagerImpl extends AbstractStateManager implements GlobalStateManager { private final Logger log; - private final ProcessorTopology topology; private final Consumer<byte[], byte[]> globalConsumer; private final StateDirectory stateDirectory; - private final Map<String, StateStore> stores = new LinkedHashMap<>(); - private final File baseDir; - private final OffsetCheckpoint checkpoint; private final Set<String> globalStoreNames = new HashSet<>(); - private final Map<TopicPartition, Long> checkpointableOffsets = new HashMap<>(); private final StateRestoreListener stateRestoreListener; + private InternalProcessorContext processorContext; private final int retries; private final long retryBackoffMs; @@ -74,19 +67,24 @@ public class GlobalStateManagerImpl implements GlobalStateManager { final StateDirectory stateDirectory, final StateRestoreListener stateRestoreListener, final StreamsConfig config) { + super(stateDirectory.globalStateDir()); + this.log = logContext.logger(GlobalStateManagerImpl.class); this.topology = topology; this.globalConsumer = globalConsumer; this.stateDirectory = stateDirectory; - this.baseDir = stateDirectory.globalStateDir(); - this.checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME)); this.stateRestoreListener = stateRestoreListener; this.retries = config.getInt(StreamsConfig.RETRIES_CONFIG); this.retryBackoffMs = config.getLong(StreamsConfig.RETRY_BACKOFF_MS_CONFIG); } @Override - public Set<String> initialize(final InternalProcessorContext processorContext) { + public void setGlobalProcessorContext(final InternalProcessorContext processorContext) { + this.processorContext = processorContext; + } + + @Override + public Set<String> initialize() { try { if (!stateDirectory.lockGlobalState()) { throw new LockException(String.format("Failed to lock the global state directory: %s", baseDir)); @@ -103,7 +101,7 @@ public 
class GlobalStateManagerImpl implements GlobalStateManager { } catch (IOException e1) { log.error("Failed to unlock the global state directory", e); } - throw new StreamsException("Failed to read checkpoints for global state stores", e); + throw new StreamsException("Failed to read checkpoints for global state globalStores", e); } final List<StateStore> stateStores = topology.globalStateStores(); @@ -115,8 +113,22 @@ public class GlobalStateManagerImpl implements GlobalStateManager { } @Override + public void reinitializeStateStoresForPartitions(final Collection<TopicPartition> partitions, + final InternalProcessorContext processorContext) { + super.reinitializeStateStoresForPartitions( + log, + globalStores, + topology.storeToChangelogTopic(), + partitions, + processorContext); + + globalConsumer.assign(partitions); + globalConsumer.seekToBeginning(partitions); + } + + @Override public StateStore getGlobalStore(final String name) { - return stores.get(name); + return globalStores.get(name); } @Override @@ -131,7 +143,7 @@ public class GlobalStateManagerImpl implements GlobalStateManager { public void register(final StateStore store, final StateRestoreCallback stateRestoreCallback) { - if (stores.containsKey(store.name())) { + if (globalStores.containsKey(store.name())) { throw new IllegalArgumentException(String.format("Global Store %s has already been registered", store.name())); } @@ -173,7 +185,7 @@ public class GlobalStateManagerImpl implements GlobalStateManager { } try { restoreState(stateRestoreCallback, topicPartitions, highWatermarks, store.name()); - stores.put(store.name(), store); + globalStores.put(store.name(), store); } finally { globalConsumer.unsubscribe(); } @@ -249,17 +261,27 @@ public class GlobalStateManagerImpl implements GlobalStateManager { long restoreCount = 0L; while (offset < highWatermark) { - final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(100); - final List<KeyValue<byte[], byte[]>> restoreRecords = new 
ArrayList<>(); - for (ConsumerRecord<byte[], byte[]> record : records) { - if (record.key() != null) { - restoreRecords.add(KeyValue.pair(record.key(), record.value())); + try { + final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(100); + final List<KeyValue<byte[], byte[]>> restoreRecords = new ArrayList<>(); + for (ConsumerRecord<byte[], byte[]> record : records) { + if (record.key() != null) { + restoreRecords.add(KeyValue.pair(record.key(), record.value())); + } + offset = globalConsumer.position(topicPartition); } - offset = globalConsumer.position(topicPartition); + stateRestoreAdapter.restoreAll(restoreRecords); + stateRestoreListener.onBatchRestored(topicPartition, storeName, offset, restoreRecords.size()); + restoreCount += restoreRecords.size(); + } catch (final InvalidOffsetException recoverableException) { + log.warn("Restoring GlobalStore {} failed due to: {}. Deleting global store to recreate from scratch.", + storeName, + recoverableException.getMessage()); + reinitializeStateStoresForPartitions(recoverableException.partitions(), processorContext); + + stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark); + restoreCount = 0L; } - stateRestoreAdapter.restoreAll(restoreRecords); - stateRestoreListener.onBatchRestored(topicPartition, storeName, offset, restoreRecords.size()); - restoreCount += restoreRecords.size(); } stateRestoreListener.onRestoreEnd(topicPartition, storeName, restoreCount); checkpointableOffsets.put(topicPartition, offset); @@ -268,8 +290,8 @@ public class GlobalStateManagerImpl implements GlobalStateManager { @Override public void flush() { - log.debug("Flushing all global stores registered in the state manager"); - for (StateStore store : this.stores.values()) { + log.debug("Flushing all global globalStores registered in the state manager"); + for (StateStore store : this.globalStores.values()) { try { log.trace("Flushing global store={}", store.name()); store.flush(); @@ -283,11 
+305,11 @@ public class GlobalStateManagerImpl implements GlobalStateManager { @Override public void close(final Map<TopicPartition, Long> offsets) throws IOException { try { - if (stores.isEmpty()) { + if (globalStores.isEmpty()) { return; } final StringBuilder closeFailed = new StringBuilder(); - for (final Map.Entry<String, StateStore> entry : stores.entrySet()) { + for (final Map.Entry<String, StateStore> entry : globalStores.entrySet()) { log.debug("Closing global storage engine {}", entry.getKey()); try { entry.getValue().close(); @@ -300,9 +322,9 @@ public class GlobalStateManagerImpl implements GlobalStateManager { .append("\n"); } } - stores.clear(); + globalStores.clear(); if (closeFailed.length() > 0) { - throw new ProcessorStateException("Exceptions caught during close of 1 or more global state stores\n" + closeFailed); + throw new ProcessorStateException("Exceptions caught during close of 1 or more global state globalStores\n" + closeFailed); } checkpoint(offsets); } finally { @@ -317,7 +339,7 @@ public class GlobalStateManagerImpl implements GlobalStateManager { try { checkpoint.write(checkpointableOffsets); } catch (IOException e) { - log.warn("Failed to write offsets checkpoint for global stores", e); + log.warn("Failed to write offsets checkpoint for global globalStores", e); } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java index 849af57..c18f3c7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java @@ -56,8 +56,9 @@ 
public class GlobalStateUpdateTask implements GlobalStateMaintainer { * @throws IllegalStateException If store gets registered after initialized is already finished * @throws StreamsException if the store's change log does not contain the partition */ + @Override public Map<TopicPartition, Long> initialize() { - final Set<String> storeNames = stateMgr.initialize(processorContext); + final Set<String> storeNames = stateMgr.initialize(); final Map<String, String> storeNameToTopic = topology.storeToChangelogTopic(); for (final String storeName : storeNames) { final String sourceTopic = storeNameToTopic.get(storeName); @@ -69,7 +70,6 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer { return stateMgr.checkpointed(); } - @SuppressWarnings("unchecked") @Override public void update(final ConsumerRecord<byte[], byte[]> record) { http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java index 9d202d1..1cc5c85 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.InvalidOffsetException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.LogContext; @@ -234,14 +235,20 @@ public class GlobalStreamThread 
extends Thread { } void pollAndUpdate() { - final ConsumerRecords<byte[], byte[]> received = globalConsumer.poll(pollMs); - for (ConsumerRecord<byte[], byte[]> record : received) { - stateMaintainer.update(record); - } - final long now = time.milliseconds(); - if (flushInterval >= 0 && now >= lastFlush + flushInterval) { - stateMaintainer.flushState(); - lastFlush = now; + try { + final ConsumerRecords<byte[], byte[]> received = globalConsumer.poll(pollMs); + for (ConsumerRecord<byte[], byte[]> record : received) { + stateMaintainer.update(record); + } + final long now = time.milliseconds(); + if (flushInterval >= 0 && now >= lastFlush + flushInterval) { + stateMaintainer.flushState(); + lastFlush = now; + } + } catch (final InvalidOffsetException recoverableException) { + log.error("Updating global state failed. You can restart KafkaStreams to recover from this error.", recoverableException); + throw new StreamsException("Updating global state failed. " + + "You can restart KafkaStreams to recover from this error.", recoverableException); } } @@ -308,15 +315,19 @@ public class GlobalStreamThread extends Thread { stateDirectory, stateRestoreListener, config); + + final GlobalProcessorContextImpl globalProcessorContext = new GlobalProcessorContextImpl( + config, + stateMgr, + streamsMetrics, + cache); + stateMgr.setGlobalProcessorContext(globalProcessorContext); + final StateConsumer stateConsumer = new StateConsumer(this.logContext, globalConsumer, new GlobalStateUpdateTask(topology, - new GlobalProcessorContextImpl( - config, - stateMgr, - streamsMetrics, - cache), + globalProcessorContext, stateMgr, config.defaultDeserializationExceptionHandler(), logContext), @@ -324,6 +335,7 @@ public class GlobalStreamThread extends Thread { config.getLong(StreamsConfig.POLL_MS_CONFIG), config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG)); stateConsumer.initialize(); + return stateConsumer; } catch (final LockException fatalException) { final String errorMsg = "Could not 
lock global state directory. This could happen if multiple KafkaStreams " + http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java index 57bb3ac..25df826 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java @@ -50,7 +50,12 @@ public interface InternalProcessorContext extends ProcessorContext { ThreadCache getCache(); /** - * Mark this contex as being initialized + * Mark this context as being initialized */ void initialized(); + + /** + * Mark this context as being uninitialized + */ + void uninitialize(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index 3a2803e..1ee0e14 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -33,27 +33,20 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -public class ProcessorStateManager implements StateManager { - +public class 
ProcessorStateManager extends AbstractStateManager { private static final String STATE_CHANGELOG_TOPIC_SUFFIX = "-changelog"; - static final String CHECKPOINT_FILE_NAME = ".checkpoint"; private final Logger log; - private final File baseDir; private final TaskId taskId; private final String logPrefix; private final boolean isStandby; private final ChangelogReader changelogReader; - private final Map<String, StateStore> stores; - private final Map<String, StateStore> globalStores; private final Map<TopicPartition, Long> offsetLimits; private final Map<TopicPartition, Long> restoredOffsets; - private final Map<TopicPartition, Long> checkpointedOffsets; private final Map<String, StateRestoreCallback> restoreCallbacks; // used for standby tasks, keyed by state topic name private final Map<String, String> storeToChangelogTopic; private final List<TopicPartition> changelogPartitions = new ArrayList<>(); @@ -61,7 +54,6 @@ public class ProcessorStateManager implements StateManager { // TODO: this map does not work with customized grouper where multiple partitions // of the same topic can be assigned to the same topic. 
private final Map<String, TopicPartition> partitionForTopic; - private OffsetCheckpoint checkpoint; /** * @throws ProcessorStateException if the task directory does not exist and could not be created @@ -75,28 +67,25 @@ public class ProcessorStateManager implements StateManager { final ChangelogReader changelogReader, final boolean eosEnabled, final LogContext logContext) throws IOException { + super(stateDirectory.directoryForTask(taskId)); + + this.log = logContext.logger(ProcessorStateManager.class); this.taskId = taskId; this.changelogReader = changelogReader; logPrefix = String.format("task [%s] ", taskId); - this.log = logContext.logger(getClass()); partitionForTopic = new HashMap<>(); for (final TopicPartition source : sources) { partitionForTopic.put(source.topic(), source); } - stores = new LinkedHashMap<>(); - globalStores = new HashMap<>(); offsetLimits = new HashMap<>(); restoredOffsets = new HashMap<>(); this.isStandby = isStandby; restoreCallbacks = isStandby ? new HashMap<String, StateRestoreCallback>() : null; this.storeToChangelogTopic = storeToChangelogTopic; - baseDir = stateDirectory.directoryForTask(taskId); - // load the checkpoint information - checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME)); - checkpointedOffsets = new HashMap<>(checkpoint.read()); + checkpointableOffsets.putAll(checkpoint.read()); if (eosEnabled) { // delete the checkpoint file after finish loading its stored offsets @@ -120,42 +109,55 @@ public class ProcessorStateManager implements StateManager { @Override public void register(final StateStore store, final StateRestoreCallback stateRestoreCallback) { - log.debug("Registering state store {} to its state manager", store.name()); + final String storeName = store.name(); + log.debug("Registering state store {} to its state manager", storeName); - if (store.name().equals(CHECKPOINT_FILE_NAME)) { + if (CHECKPOINT_FILE_NAME.equals(storeName)) { throw new 
IllegalArgumentException(String.format("%sIllegal store name: %s", logPrefix, CHECKPOINT_FILE_NAME)); } - if (stores.containsKey(store.name())) { - throw new IllegalArgumentException(String.format("%sStore %s has already been registered.", logPrefix, store.name())); + if (stores.containsKey(storeName)) { + throw new IllegalArgumentException(String.format("%sStore %s has already been registered.", logPrefix, storeName)); } // check that the underlying change log topic exist or not - final String topic = storeToChangelogTopic.get(store.name()); + final String topic = storeToChangelogTopic.get(storeName); if (topic == null) { - stores.put(store.name(), store); + stores.put(storeName, store); return; } final TopicPartition storePartition = new TopicPartition(topic, getPartition(topic)); if (isStandby) { - log.trace("Preparing standby replica of state store {} with changelog topic {}", store.name(), topic); + log.trace("Preparing standby replica of persistent state store {} with changelog topic {}", storeName, topic); restoreCallbacks.put(topic, stateRestoreCallback); + } else { - log.trace("Restoring state store {} from changelog topic {}", store.name(), topic); + log.trace("Restoring state store {} from changelog topic {}", storeName, topic); final StateRestorer restorer = new StateRestorer(storePartition, new CompositeRestoreListener(stateRestoreCallback), - checkpointedOffsets.get(storePartition), + checkpointableOffsets.get(storePartition), offsetLimit(storePartition), store.persistent(), - store.name()); + storeName); changelogReader.register(restorer); } changelogPartitions.add(storePartition); - stores.put(store.name(), store); + stores.put(storeName, store); + } + + @Override + public void reinitializeStateStoresForPartitions(final Collection<TopicPartition> partitions, + final InternalProcessorContext processorContext) { + super.reinitializeStateStoresForPartitions( + log, + stores, + storeToChangelogTopic, + partitions, + processorContext); } @Override @@ 
-167,8 +169,8 @@ public class ProcessorStateManager implements StateManager { final int partition = getPartition(topicName); final TopicPartition storePartition = new TopicPartition(topicName, partition); - if (checkpointedOffsets.containsKey(storePartition)) { - partitionsAndOffsets.put(storePartition, checkpointedOffsets.get(storePartition)); + if (checkpointableOffsets.containsKey(storePartition)) { + partitionsAndOffsets.put(storePartition, checkpointableOffsets.get(storePartition)); } else { partitionsAndOffsets.put(storePartition, -1L); } @@ -281,7 +283,7 @@ public class ProcessorStateManager implements StateManager { if (ackedOffsets != null) { checkpoint(ackedOffsets); } - + stores.clear(); } if (firstException != null) { @@ -293,7 +295,7 @@ public class ProcessorStateManager implements StateManager { @Override public void checkpoint(final Map<TopicPartition, Long> ackedOffsets) { log.trace("Writing checkpoint: {}", ackedOffsets); - checkpointedOffsets.putAll(changelogReader.restoredOffsets()); + checkpointableOffsets.putAll(changelogReader.restoredOffsets()); for (final StateStore store : stores.values()) { final String storeName = store.name(); // only checkpoint the offset to the offsets file if @@ -303,9 +305,9 @@ public class ProcessorStateManager implements StateManager { final TopicPartition topicPartition = new TopicPartition(changelogTopic, getPartition(storeName)); if (ackedOffsets.containsKey(topicPartition)) { // store the last offset + 1 (the log position after restoration) - checkpointedOffsets.put(topicPartition, ackedOffsets.get(topicPartition) + 1); + checkpointableOffsets.put(topicPartition, ackedOffsets.get(topicPartition) + 1); } else if (restoredOffsets.containsKey(topicPartition)) { - checkpointedOffsets.put(topicPartition, restoredOffsets.get(topicPartition)); + checkpointableOffsets.put(topicPartition, restoredOffsets.get(topicPartition)); } } } @@ -314,7 +316,7 @@ public class ProcessorStateManager implements StateManager { if 
(checkpoint == null) { checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME)); } - checkpoint.write(checkpointedOffsets); + checkpoint.write(checkpointableOffsets); } catch (final IOException e) { log.warn("Failed to write checkpoint file to {}:", new File(baseDir, CHECKPOINT_FILE_NAME), e); } http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index 73fbf63..837f607 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -131,8 +131,10 @@ public class StandbyTask extends AbstractTask { log.debug("Closing"); boolean committedSuccessfully = false; try { - commit(); - committedSuccessfully = true; + if (clean) { + commit(); + committedSuccessfully = true; + } } finally { closeStateManager(committedSuccessfully); } http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java index 2a8d9a3..f6efde6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java @@ -23,6 +23,7 @@ import org.apache.kafka.streams.processor.StateStore; import java.io.File; import java.io.IOException; +import java.util.Collection; import 
java.util.Map; interface StateManager extends Checkpointable { @@ -37,7 +38,10 @@ interface StateManager extends Checkpointable { void flush(); - void close(Map<TopicPartition, Long> offsets) throws IOException; + void reinitializeStateStoresForPartitions(final Collection<TopicPartition> partitions, + final InternalProcessorContext processorContext); + + void close(final Map<TopicPartition, Long> offsets) throws IOException; StateStore getGlobalStore(final String name); http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java index 83c783d..178d2bb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.InvalidOffsetException; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TimeoutException; @@ -78,12 +79,24 @@ public class StoreChangelogReader implements ChangelogReader { return completed(); } - final Set<TopicPartition> partitions = new HashSet<>(needsRestoring.keySet()); - final ConsumerRecords<byte[], byte[]> allRecords = restoreConsumer.poll(10); - for (final TopicPartition partition : partitions) { - restorePartition(allRecords, partition, active.restoringTaskFor(partition)); + final 
Set<TopicPartition> restoringPartitions = new HashSet<>(needsRestoring.keySet()); + try { + final ConsumerRecords<byte[], byte[]> allRecords = restoreConsumer.poll(10); + for (final TopicPartition partition : restoringPartitions) { + restorePartition(allRecords, partition, active.restoringTaskFor(partition)); + } + } catch (final InvalidOffsetException recoverableException) { + log.warn("Restoring StreamTasks failed. Deleting StreamTasks stores to recreate from scratch.", recoverableException); + final Set<TopicPartition> partitions = recoverableException.partitions(); + for (final TopicPartition partition : partitions) { + final StreamTask task = active.restoringTaskFor(partition); + log.info("Reinitializing StreamTask {}", task); + task.reinitializeStateStoresForPartitions(recoverableException.partitions()); + } + restoreConsumer.seekToBeginning(partitions); } + if (needsRestoring.isEmpty()) { restoreConsumer.unsubscribe(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 55456d0..f2fa448 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -33,7 +33,6 @@ import org.apache.kafka.streams.errors.DeserializationExceptionHandler; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TaskMigratedException; import org.apache.kafka.streams.processor.Cancellable; -import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.Punctuator; import 
org.apache.kafka.streams.processor.TaskId; @@ -641,11 +640,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator } // visible for testing only - ProcessorContext processorContext() { - return processorContext; - } - - // visible for testing only RecordCollector recordCollector() { return recordCollector; } http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index a9786f9..696081d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -1036,22 +1036,33 @@ public class StreamThread extends Thread { processStandbyRecords = false; } - final ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(0); + try { + final ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(0); - if (!records.isEmpty()) { - for (final TopicPartition partition : records.partitions()) { - final StandbyTask task = taskManager.standbyTask(partition); + if (!records.isEmpty()) { + for (final TopicPartition partition : records.partitions()) { + final StandbyTask task = taskManager.standbyTask(partition); - if (task == null) { - throw new StreamsException(logPrefix + "Missing standby task for partition " + partition); - } + if (task == null) { + throw new StreamsException(logPrefix + "Missing standby task for partition " + partition); + } - final List<ConsumerRecord<byte[], byte[]>> remaining = task.update(partition, records.records(partition)); - if (remaining != null) { - restoreConsumer.pause(singleton(partition)); - standbyRecords.put(partition, remaining); + final 
List<ConsumerRecord<byte[], byte[]>> remaining = task.update(partition, records.records(partition)); + if (remaining != null) { + restoreConsumer.pause(singleton(partition)); + standbyRecords.put(partition, remaining); + } } } + } catch (final InvalidOffsetException recoverableException) { + log.warn("Updating StandbyTasks failed. Deleting StandbyTasks stores to recreate from scratch.", recoverableException); + final Set<TopicPartition> partitions = recoverableException.partitions(); + for (final TopicPartition partition : partitions) { + final StandbyTask task = taskManager.standbyTask(partition); + log.info("Reinitializing StandbyTask {}", task); + task.reinitializeStateStoresForPartitions(recoverableException.partitions()); + } + restoreConsumer.seekToBeginning(partitions); } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java index 5ff8a26..a34851a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java @@ -159,7 +159,6 @@ class InnerMeteredKeyValueStore<K, IK, V, IV> extends WrappedStateStore.Abstract } else { inner.init(InnerMeteredKeyValueStore.this.context, InnerMeteredKeyValueStore.this.root); } - } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStore.java 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStore.java index a6ff8d5..35647b7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStore.java @@ -102,8 +102,6 @@ public class MeteredKeyValueBytesStore<K, V> extends WrappedStateStore.AbstractS keySerde == null ? (Serde<K>) context.keySerde() : keySerde, valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde); innerMetered.init(context, root); - - } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index a06f7e8..12db711 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -190,11 +190,9 @@ public class StreamsConfigTest { @Test public void shouldSupportPrefixedRestoreConsumerConfigs() { - props.put(consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest"); props.put(consumerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1); final StreamsConfig streamsConfig = new StreamsConfig(props); final Map<String, Object> consumerConfigs = streamsConfig.getRestoreConsumerConfigs("clientId"); - assertEquals("earliest", consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)); assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG)); } @@ -245,11 +243,9 @@ public class StreamsConfigTest { @Test public void shouldBeSupportNonPrefixedRestoreConsumerConfigs() { - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1); 
final StreamsConfig streamsConfig = new StreamsConfig(props); final Map<String, Object> consumerConfigs = streamsConfig.getRestoreConsumerConfigs("groupId"); - assertEquals("earliest", consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)); assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG)); } @@ -305,11 +301,9 @@ public class StreamsConfigTest { @Test public void shouldOverrideStreamsDefaultConsumerConifgsOnRestoreConsumer() { - props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest"); props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "10"); final StreamsConfig streamsConfig = new StreamsConfig(props); final Map<String, Object> consumerConfigs = streamsConfig.getRestoreConsumerConfigs("clientId"); - assertEquals("latest", consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)); assertEquals("10", consumerConfigs.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)); } http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java index e75b54f..776110c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java @@ -25,52 +25,69 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.AuthorizationException; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; import 
org.apache.kafka.streams.errors.LockException; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.test.MockProcessorContext; +import org.apache.kafka.test.MockRestoreCallback; import org.apache.kafka.test.MockStateRestoreListener; import org.apache.kafka.test.TestUtils; import org.easymock.EasyMock; import org.junit.Before; import org.junit.Test; +import java.io.File; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; -import java.util.List; +import java.util.HashMap; +import java.util.Map; import java.util.Properties; +import static org.easymock.EasyMock.expect; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; public class AbstractTaskTest { private final TaskId id = new TaskId(0, 0); private StateDirectory stateDirectory = EasyMock.createMock(StateDirectory.class); + private final TopicPartition storeTopicPartition1 = new TopicPartition("t1", 0); + private final TopicPartition storeTopicPartition2 = new TopicPartition("t2", 0); + private final TopicPartition storeTopicPartition3 = new TopicPartition("t3", 0); + private final TopicPartition storeTopicPartition4 = new TopicPartition("t4", 0); + private final Collection<TopicPartition> storeTopicPartitions + = Utils.mkSet(storeTopicPartition1, storeTopicPartition2, storeTopicPartition3, storeTopicPartition4); @Before public void before() { - EasyMock.expect(stateDirectory.directoryForTask(id)).andReturn(TestUtils.tempDirectory()); + expect(stateDirectory.directoryForTask(id)).andReturn(TestUtils.tempDirectory()); } @Test(expected = ProcessorStateException.class) public void shouldThrowProcessorStateExceptionOnInitializeOffsetsWhenAuthorizationException() { final Consumer consumer = mockConsumer(new AuthorizationException("blah")); - 
final AbstractTask task = createTask(consumer, Collections.<StateStore>emptyList()); + final AbstractTask task = createTask(consumer, Collections.<StateStore, String>emptyMap()); task.updateOffsetLimits(); } @Test(expected = ProcessorStateException.class) public void shouldThrowProcessorStateExceptionOnInitializeOffsetsWhenKafkaException() { final Consumer consumer = mockConsumer(new KafkaException("blah")); - final AbstractTask task = createTask(consumer, Collections.<StateStore>emptyList()); + final AbstractTask task = createTask(consumer, Collections.<StateStore, String>emptyMap()); task.updateOffsetLimits(); } @Test(expected = WakeupException.class) public void shouldThrowWakeupExceptionOnInitializeOffsetsWhenWakeupException() { final Consumer consumer = mockConsumer(new WakeupException()); - final AbstractTask task = createTask(consumer, Collections.<StateStore>emptyList()); + final AbstractTask task = createTask(consumer, Collections.<StateStore, String>emptyMap()); task.updateOffsetLimits(); } @@ -78,10 +95,10 @@ public class AbstractTaskTest { public void shouldThrowLockExceptionIfFailedToLockStateDirectoryWhenTopologyHasStores() throws IOException { final Consumer consumer = EasyMock.createNiceMock(Consumer.class); final StateStore store = EasyMock.createNiceMock(StateStore.class); - EasyMock.expect(stateDirectory.lock(id)).andReturn(false); + expect(stateDirectory.lock(id)).andReturn(false); EasyMock.replay(stateDirectory); - final AbstractTask task = createTask(consumer, Collections.singletonList(store)); + final AbstractTask task = createTask(consumer, Collections.singletonMap(store, "dummy")); try { task.initializeStateStores(); @@ -93,11 +110,11 @@ public class AbstractTaskTest { } @Test - public void shouldNotAttemptToLockIfNoStores() throws IOException { + public void shouldNotAttemptToLockIfNoStores() { final Consumer consumer = EasyMock.createNiceMock(Consumer.class); EasyMock.replay(stateDirectory); - final AbstractTask task = 
createTask(consumer, Collections.<StateStore>emptyList()); + final AbstractTask task = createTask(consumer, Collections.<StateStore, String>emptyMap()); task.initializeStateStores(); @@ -105,20 +122,122 @@ public class AbstractTaskTest { EasyMock.verify(stateDirectory); } + @Test + public void shouldDeleteAndRecreateStoreDirectoryOnReinitialize() throws IOException { + final StreamsConfig streamsConfig = new StreamsConfig(new Properties() { + { + put(StreamsConfig.APPLICATION_ID_CONFIG, "app-id"); + put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()); + } + }); + final Consumer consumer = EasyMock.createNiceMock(Consumer.class); + + final StateStore store1 = EasyMock.createNiceMock(StateStore.class); + final StateStore store2 = EasyMock.createNiceMock(StateStore.class); + final StateStore store3 = EasyMock.createNiceMock(StateStore.class); + final StateStore store4 = EasyMock.createNiceMock(StateStore.class); + final String storeName1 = "storeName1"; + final String storeName2 = "storeName2"; + final String storeName3 = "storeName3"; + final String storeName4 = "storeName4"; + + expect(store1.name()).andReturn(storeName1).anyTimes(); + EasyMock.replay(store1); + expect(store2.name()).andReturn(storeName2).anyTimes(); + EasyMock.replay(store2); + expect(store3.name()).andReturn(storeName3).anyTimes(); + EasyMock.replay(store3); + expect(store4.name()).andReturn(storeName4).anyTimes(); + EasyMock.replay(store4); + + final StateDirectory stateDirectory = new StateDirectory(streamsConfig, new MockTime()); + final AbstractTask task = createTask( + consumer, + new HashMap<StateStore, String>() { + { + put(store1, storeTopicPartition1.topic()); + put(store2, storeTopicPartition2.topic()); + put(store3, storeTopicPartition3.topic()); + put(store4, storeTopicPartition4.topic()); + } + }, + stateDirectory); + + final String taskDir = 
stateDirectory.directoryForTask(task.id).getAbsolutePath(); + final File storeDirectory1 = new File(taskDir + + File.separator + "rocksdb" + + File.separator + storeName1); + final File storeDirectory2 = new File(taskDir + + File.separator + "rocksdb" + + File.separator + storeName2); + final File storeDirectory3 = new File(taskDir + + File.separator + storeName3); + final File storeDirectory4 = new File(taskDir + + File.separator + storeName4); + final File testFile1 = new File(storeDirectory1.getAbsolutePath() + File.separator + "testFile"); + final File testFile2 = new File(storeDirectory2.getAbsolutePath() + File.separator + "testFile"); + final File testFile3 = new File(storeDirectory3.getAbsolutePath() + File.separator + "testFile"); + final File testFile4 = new File(storeDirectory4.getAbsolutePath() + File.separator + "testFile"); + + storeDirectory1.mkdirs(); + storeDirectory2.mkdirs(); + storeDirectory3.mkdirs(); + storeDirectory4.mkdirs(); + + testFile1.createNewFile(); + assertTrue(testFile1.exists()); + testFile2.createNewFile(); + assertTrue(testFile2.exists()); + testFile3.createNewFile(); + assertTrue(testFile3.exists()); + testFile4.createNewFile(); + assertTrue(testFile4.exists()); + + task.processorContext = new MockProcessorContext(stateDirectory.directoryForTask(task.id), streamsConfig); + + task.stateMgr.register(store1, new MockRestoreCallback()); + task.stateMgr.register(store2, new MockRestoreCallback()); + task.stateMgr.register(store3, new MockRestoreCallback()); + task.stateMgr.register(store4, new MockRestoreCallback()); + + // only reinitialize store1 and store3 -- store2 and store4 should be untouched + task.reinitializeStateStoresForPartitions(Utils.mkSet(storeTopicPartition1, storeTopicPartition3)); + + assertFalse(testFile1.exists()); + assertTrue(testFile2.exists()); + assertFalse(testFile3.exists()); + assertTrue(testFile4.exists()); + } + + private AbstractTask createTask(final Consumer consumer, + final Map<StateStore, String> 
stateStoresToChangelogTopics) { + return createTask(consumer, stateStoresToChangelogTopics, stateDirectory); + } + @SuppressWarnings("unchecked") - private AbstractTask createTask(final Consumer consumer, final List<StateStore> stateStores) { + private AbstractTask createTask(final Consumer consumer, + final Map<StateStore, String> stateStoresToChangelogTopics, + final StateDirectory stateDirectory) { final Properties properties = new Properties(); properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "app"); properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummyhost:9092"); final StreamsConfig config = new StreamsConfig(properties); + + final Map<String, String> storeNamesToChangelogTopics = new HashMap<>(stateStoresToChangelogTopics.size()); + for (final Map.Entry<StateStore, String> e : stateStoresToChangelogTopics.entrySet()) { + storeNamesToChangelogTopics.put(e.getKey().name(), e.getValue()); + } + return new AbstractTask(id, - Collections.singletonList(new TopicPartition("t", 0)), - ProcessorTopology.withLocalStores(stateStores, Collections.<String, String>emptyMap()), - (Consumer<byte[], byte[]>) consumer, - new StoreChangelogReader((Consumer<byte[], byte[]>) consumer, new MockStateRestoreListener(), new LogContext("stream-task-test ")), + storeTopicPartitions, + ProcessorTopology.withLocalStores(new ArrayList<>(stateStoresToChangelogTopics.keySet()), storeNamesToChangelogTopics), + consumer, + new StoreChangelogReader(consumer, new MockStateRestoreListener(), new LogContext("stream-task-test ")), false, stateDirectory, config) { + @Override public void resume() {}