KAFKA-6121: Restore and global consumer should not use auto.offset.reset

- set auto.offset.reset to "none" for restore and global consumer
- handle InvalidOffsetException for restore and global consumer
- add corresponding tests
- some minor cleanup

Author: Matthias J. Sax <matth...@confluent.io>

Reviewers: Damian Guy <damian....@gmail.com>, Bill Bejeck <b...@confluent.io>, 
Guozhang Wang <wangg...@gmail.com>

Closes #4215 from mjsax/kafka-6121-restore-global-consumer-handle-reset


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/04395175
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/04395175
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/04395175

Branch: refs/heads/trunk
Commit: 043951753b6fb6c8bae6d25d7a6a97e74b614cac
Parents: 2bf2348
Author: Matthias J. Sax <matth...@confluent.io>
Authored: Mon Dec 11 14:20:10 2017 +0000
Committer: Damian Guy <damian....@gmail.com>
Committed: Mon Dec 11 14:20:10 2017 +0000

----------------------------------------------------------------------
 .../kafka/clients/consumer/MockConsumer.java    |   8 +
 .../org/apache/kafka/streams/StreamsConfig.java |   1 +
 .../internals/AbstractProcessorContext.java     |   5 +
 .../internals/AbstractStateManager.java         | 114 ++++++++++
 .../processor/internals/AbstractTask.java       |   6 +-
 .../processor/internals/GlobalStateManager.java |   5 +-
 .../internals/GlobalStateManagerImpl.java       |  90 +++++---
 .../internals/GlobalStateUpdateTask.java        |   4 +-
 .../processor/internals/GlobalStreamThread.java |  38 ++--
 .../internals/InternalProcessorContext.java     |   7 +-
 .../internals/ProcessorStateManager.java        |  70 +++---
 .../processor/internals/StandbyTask.java        |   6 +-
 .../processor/internals/StateManager.java       |   6 +-
 .../internals/StoreChangelogReader.java         |  21 +-
 .../streams/processor/internals/StreamTask.java |   6 -
 .../processor/internals/StreamThread.java       |  33 ++-
 .../internals/InnerMeteredKeyValueStore.java    |   1 -
 .../internals/MeteredKeyValueBytesStore.java    |   2 -
 .../apache/kafka/streams/StreamsConfigTest.java |   6 -
 .../processor/internals/AbstractTaskTest.java   | 147 +++++++++++--
 .../internals/GlobalStateManagerImplTest.java   | 216 +++++++++++++------
 .../internals/GlobalStreamThreadTest.java       | 111 ++++++++--
 .../processor/internals/StateConsumerTest.java  |   2 +-
 .../processor/internals/StateManagerStub.java   |   8 +-
 .../internals/StoreChangelogReaderTest.java     |  27 +++
 .../processor/internals/StreamTaskTest.java     |  43 ++--
 .../processor/internals/StreamThreadTest.java   | 101 ++++++++-
 .../kafka/test/GlobalStateManagerStub.java      |  17 +-
 .../apache/kafka/test/NoOpReadOnlyStore.java    |  17 +-
 .../kafka/test/ProcessorTopologyTestDriver.java |  11 +-
 30 files changed, 862 insertions(+), 267 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 9b0c058..10aedbb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -108,6 +108,14 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
         }
         ensureNotClosed();
         this.subscriptions.subscribeFromPattern(topicsToSubscribe);
+        final Set<TopicPartition> assignedPartitions = new HashSet<>();
+        for (final String topic : topicsToSubscribe) {
+            for (final PartitionInfo info : this.partitions.get(topic)) {
+                assignedPartitions.add(new TopicPartition(topic, 
info.partition()));
+            }
+
+        }
+        subscriptions.assignFromSubscribed(assignedPartitions);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 49b8a3c..d78fc0d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -763,6 +763,7 @@ public class StreamsConfig extends AbstractConfig {
         consumerProps.remove(ConsumerConfig.GROUP_ID_CONFIG);
         // add client id with stream client id prefix
         consumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + 
"-restore-consumer");
+        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
 
         return consumerProps;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index aa58226..87408c6 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -197,4 +197,9 @@ public abstract class AbstractProcessorContext implements 
InternalProcessorConte
     public void initialized() {
         initialized = true;
     }
+
+    @Override
+    public void uninitialize() {
+        initialized = false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
new file mode 100644
index 0000000..777e46b
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
+import org.slf4j.Logger;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+
+abstract class AbstractStateManager implements StateManager {
+    static final String CHECKPOINT_FILE_NAME = ".checkpoint";
+
+    final File baseDir;
+    final Map<TopicPartition, Long> checkpointableOffsets = new HashMap<>();
+
+    OffsetCheckpoint checkpoint;
+
+    final Map<String, StateStore> stores = new LinkedHashMap<>();
+    final Map<String, StateStore> globalStores = new LinkedHashMap<>();
+
+    AbstractStateManager(final File baseDir) {
+        this.baseDir = baseDir;
+        this.checkpoint = new OffsetCheckpoint(new File(baseDir, 
CHECKPOINT_FILE_NAME));
+
+    }
+
+    public void reinitializeStateStoresForPartitions(final Logger log,
+                                                     final Map<String, 
StateStore> stateStores,
+                                                     final Map<String, String> 
storeToChangelogTopic,
+                                                     final 
Collection<TopicPartition> partitions,
+                                                     final 
InternalProcessorContext processorContext) {
+        final Map<String, String> changelogTopicToStore = 
inverseOneToOneMap(storeToChangelogTopic);
+        final Set<String> storeToBeReinitialized = new HashSet<>();
+
+        for (final TopicPartition topicPartition : partitions) {
+            checkpointableOffsets.remove(topicPartition);
+            
storeToBeReinitialized.add(changelogTopicToStore.get(topicPartition.topic()));
+        }
+        try {
+            checkpoint.write(checkpointableOffsets);
+        } catch (final IOException fatalException) {
+            log.error("Failed to update checkpoint file for global stores.", 
fatalException);
+            throw new StreamsException("Failed to reinitialize global store.", 
fatalException);
+        }
+
+        final Iterator<Map.Entry<String, StateStore>> it = 
stateStores.entrySet().iterator();
+        while (it.hasNext()) {
+            final StateStore stateStore = it.next().getValue();
+            final String storeName = stateStore.name();
+            if (storeToBeReinitialized.contains(storeName)) {
+                try {
+                    stateStore.close();
+                } catch (final RuntimeException ignoreAndSwallow) { /* ignore 
*/ }
+                processorContext.uninitialize();
+                it.remove();
+
+                // TODO remove this eventually
+                // -> (only after we are sure, we don't need it for backward 
compatibility reasons anymore; maybe 2.0 release?)
+                // this is an ugly "hack" that is required because 
RocksDBStore does not follow the pattern to put the
+                // store directory as <taskDir>/<storeName> but nests it with 
an intermediate <taskDir>/rocksdb/<storeName>
+                try {
+                    Utils.delete(new File(baseDir + File.separator + "rocksdb" 
+ File.separator + storeName));
+                } catch (final IOException fatalException) {
+                    log.error("Failed to reinitialize store {}.", storeName, 
fatalException);
+                    throw new StreamsException(String.format("Failed to 
reinitialize store %s.", storeName), fatalException);
+                }
+
+                try {
+                    Utils.delete(new File(baseDir + File.separator + 
storeName));
+                } catch (final IOException fatalException) {
+                    log.error("Failed to reinitialize store {}.", storeName, 
fatalException);
+                    throw new StreamsException(String.format("Failed to 
reinitialize store %s.", storeName), fatalException);
+                }
+
+                stateStore.init(processorContext, stateStore);
+            }
+        }
+    }
+
+    private Map<String, String> inverseOneToOneMap(final Map<String, String> 
origin) {
+        final Map<String, String> reversedMap = new HashMap<>();
+        for (final Map.Entry<String, String> entry : origin.entrySet()) {
+            reversedMap.put(entry.getValue(), entry.getKey());
+        }
+        return reversedMap;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index d11af3a..2b8af6d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -52,7 +52,7 @@ public abstract class AbstractTask implements Task {
     final Logger log;
     final LogContext logContext;
     boolean taskInitialized;
-    private final StateDirectory stateDirectory;
+    final StateDirectory stateDirectory;
 
     InternalProcessorContext processorContext;
 
@@ -220,10 +220,14 @@ public abstract class AbstractTask implements Task {
 
         for (final StateStore store : topology.stateStores()) {
             log.trace("Initializing store {}", store.name());
+            processorContext.uninitialize();
             store.init(processorContext, store);
         }
     }
 
+    void reinitializeStateStoresForPartitions(final Collection<TopicPartition> 
partitions) {
+        stateMgr.reinitializeStateStoresForPartitions(partitions, 
processorContext);
+    }
 
     /**
      * @throws ProcessorStateException if there is an error while closing the 
state manager

http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManager.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManager.java
index c9b8ca8..479fd1f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManager.java
@@ -21,9 +21,12 @@ import org.apache.kafka.streams.errors.StreamsException;
 import java.util.Set;
 
 public interface GlobalStateManager extends StateManager {
+
+    void setGlobalProcessorContext(final InternalProcessorContext 
processorContext);
+
     /**
      * @throws IllegalStateException If store gets registered after 
initialized is already finished
      * @throws StreamsException if the store's change log does not contain the 
partition
      */
-    Set<String> initialize(InternalProcessorContext processorContext);
+    Set<String> initialize();
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index bbae9aa..2d4ee8f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
@@ -33,38 +34,30 @@ import 
org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.slf4j.Logger;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import static 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.CHECKPOINT_FILE_NAME;
-
 /**
  * This class is responsible for the initialization, restoration, closing, 
flushing etc
  * of Global State Stores. There is only ever 1 instance of this class per 
Application Instance.
  */
-public class GlobalStateManagerImpl implements GlobalStateManager {
+public class GlobalStateManagerImpl extends AbstractStateManager implements 
GlobalStateManager {
     private final Logger log;
-
     private final ProcessorTopology topology;
     private final Consumer<byte[], byte[]> globalConsumer;
     private final StateDirectory stateDirectory;
-    private final Map<String, StateStore> stores = new LinkedHashMap<>();
-    private final File baseDir;
-    private final OffsetCheckpoint checkpoint;
     private final Set<String> globalStoreNames = new HashSet<>();
-    private final Map<TopicPartition, Long> checkpointableOffsets = new 
HashMap<>();
     private final StateRestoreListener stateRestoreListener;
+    private InternalProcessorContext processorContext;
     private final int retries;
     private final long retryBackoffMs;
 
@@ -74,19 +67,24 @@ public class GlobalStateManagerImpl implements 
GlobalStateManager {
                                   final StateDirectory stateDirectory,
                                   final StateRestoreListener 
stateRestoreListener,
                                   final StreamsConfig config) {
+        super(stateDirectory.globalStateDir());
+
         this.log = logContext.logger(GlobalStateManagerImpl.class);
         this.topology = topology;
         this.globalConsumer = globalConsumer;
         this.stateDirectory = stateDirectory;
-        this.baseDir = stateDirectory.globalStateDir();
-        this.checkpoint = new OffsetCheckpoint(new File(this.baseDir, 
CHECKPOINT_FILE_NAME));
         this.stateRestoreListener = stateRestoreListener;
         this.retries = config.getInt(StreamsConfig.RETRIES_CONFIG);
         this.retryBackoffMs = 
config.getLong(StreamsConfig.RETRY_BACKOFF_MS_CONFIG);
     }
 
     @Override
-    public Set<String> initialize(final InternalProcessorContext 
processorContext) {
+    public void setGlobalProcessorContext(final InternalProcessorContext 
processorContext) {
+        this.processorContext = processorContext;
+    }
+
+    @Override
+    public Set<String> initialize() {
         try {
             if (!stateDirectory.lockGlobalState()) {
                 throw new LockException(String.format("Failed to lock the 
global state directory: %s", baseDir));
@@ -103,7 +101,7 @@ public class GlobalStateManagerImpl implements 
GlobalStateManager {
             } catch (IOException e1) {
                 log.error("Failed to unlock the global state directory", e);
             }
-            throw new StreamsException("Failed to read checkpoints for global 
state stores", e);
+            throw new StreamsException("Failed to read checkpoints for global 
state globalStores", e);
         }
 
         final List<StateStore> stateStores = topology.globalStateStores();
@@ -115,8 +113,22 @@ public class GlobalStateManagerImpl implements 
GlobalStateManager {
     }
 
     @Override
+    public void reinitializeStateStoresForPartitions(final 
Collection<TopicPartition> partitions,
+                                                     final 
InternalProcessorContext processorContext) {
+        super.reinitializeStateStoresForPartitions(
+            log,
+            globalStores,
+            topology.storeToChangelogTopic(),
+            partitions,
+            processorContext);
+
+        globalConsumer.assign(partitions);
+        globalConsumer.seekToBeginning(partitions);
+    }
+
+    @Override
     public StateStore getGlobalStore(final String name) {
-        return stores.get(name);
+        return globalStores.get(name);
     }
 
     @Override
@@ -131,7 +143,7 @@ public class GlobalStateManagerImpl implements 
GlobalStateManager {
     public void register(final StateStore store,
                          final StateRestoreCallback stateRestoreCallback) {
 
-        if (stores.containsKey(store.name())) {
+        if (globalStores.containsKey(store.name())) {
             throw new IllegalArgumentException(String.format("Global Store %s 
has already been registered", store.name()));
         }
 
@@ -173,7 +185,7 @@ public class GlobalStateManagerImpl implements 
GlobalStateManager {
         }
         try {
             restoreState(stateRestoreCallback, topicPartitions, 
highWatermarks, store.name());
-            stores.put(store.name(), store);
+            globalStores.put(store.name(), store);
         } finally {
             globalConsumer.unsubscribe();
         }
@@ -249,17 +261,27 @@ public class GlobalStateManagerImpl implements 
GlobalStateManager {
             long restoreCount = 0L;
 
             while (offset < highWatermark) {
-                final ConsumerRecords<byte[], byte[]> records = 
globalConsumer.poll(100);
-                final List<KeyValue<byte[], byte[]>> restoreRecords = new 
ArrayList<>();
-                for (ConsumerRecord<byte[], byte[]> record : records) {
-                    if (record.key() != null) {
-                        restoreRecords.add(KeyValue.pair(record.key(), 
record.value()));
+                try {
+                    final ConsumerRecords<byte[], byte[]> records = 
globalConsumer.poll(100);
+                    final List<KeyValue<byte[], byte[]>> restoreRecords = new 
ArrayList<>();
+                    for (ConsumerRecord<byte[], byte[]> record : records) {
+                        if (record.key() != null) {
+                            restoreRecords.add(KeyValue.pair(record.key(), 
record.value()));
+                        }
+                        offset = globalConsumer.position(topicPartition);
                     }
-                    offset = globalConsumer.position(topicPartition);
+                    stateRestoreAdapter.restoreAll(restoreRecords);
+                    stateRestoreListener.onBatchRestored(topicPartition, 
storeName, offset, restoreRecords.size());
+                    restoreCount += restoreRecords.size();
+                } catch (final InvalidOffsetException recoverableException) {
+                    log.warn("Restoring GlobalStore {} failed due to: {}. 
Deleting global store to recreate from scratch.",
+                        storeName,
+                        recoverableException.getMessage());
+                    
reinitializeStateStoresForPartitions(recoverableException.partitions(), 
processorContext);
+
+                    stateRestoreListener.onRestoreStart(topicPartition, 
storeName, offset, highWatermark);
+                    restoreCount = 0L;
                 }
-                stateRestoreAdapter.restoreAll(restoreRecords);
-                stateRestoreListener.onBatchRestored(topicPartition, 
storeName, offset, restoreRecords.size());
-                restoreCount += restoreRecords.size();
             }
             stateRestoreListener.onRestoreEnd(topicPartition, storeName, 
restoreCount);
             checkpointableOffsets.put(topicPartition, offset);
@@ -268,8 +290,8 @@ public class GlobalStateManagerImpl implements 
GlobalStateManager {
 
     @Override
     public void flush() {
-        log.debug("Flushing all global stores registered in the state 
manager");
-        for (StateStore store : this.stores.values()) {
+        log.debug("Flushing all global globalStores registered in the state 
manager");
+        for (StateStore store : this.globalStores.values()) {
             try {
                 log.trace("Flushing global store={}", store.name());
                 store.flush();
@@ -283,11 +305,11 @@ public class GlobalStateManagerImpl implements 
GlobalStateManager {
     @Override
     public void close(final Map<TopicPartition, Long> offsets) throws 
IOException {
         try {
-            if (stores.isEmpty()) {
+            if (globalStores.isEmpty()) {
                 return;
             }
             final StringBuilder closeFailed = new StringBuilder();
-            for (final Map.Entry<String, StateStore> entry : 
stores.entrySet()) {
+            for (final Map.Entry<String, StateStore> entry : 
globalStores.entrySet()) {
                 log.debug("Closing global storage engine {}", entry.getKey());
                 try {
                     entry.getValue().close();
@@ -300,9 +322,9 @@ public class GlobalStateManagerImpl implements 
GlobalStateManager {
                             .append("\n");
                 }
             }
-            stores.clear();
+            globalStores.clear();
             if (closeFailed.length() > 0) {
-                throw new ProcessorStateException("Exceptions caught during 
close of 1 or more global state stores\n" + closeFailed);
+                throw new ProcessorStateException("Exceptions caught during 
close of 1 or more global state globalStores\n" + closeFailed);
             }
             checkpoint(offsets);
         } finally {
@@ -317,7 +339,7 @@ public class GlobalStateManagerImpl implements 
GlobalStateManager {
             try {
                 checkpoint.write(checkpointableOffsets);
             } catch (IOException e) {
-                log.warn("Failed to write offsets checkpoint for global 
stores", e);
+                log.warn("Failed to write offsets checkpoint for global 
globalStores", e);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
index 849af57..c18f3c7 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
@@ -56,8 +56,9 @@ public class GlobalStateUpdateTask implements 
GlobalStateMaintainer {
      * @throws IllegalStateException If store gets registered after 
initialized is already finished
      * @throws StreamsException if the store's change log does not contain the 
partition
      */
+    @Override
     public Map<TopicPartition, Long> initialize() {
-        final Set<String> storeNames = stateMgr.initialize(processorContext);
+        final Set<String> storeNames = stateMgr.initialize();
         final Map<String, String> storeNameToTopic = 
topology.storeToChangelogTopic();
         for (final String storeName : storeNames) {
             final String sourceTopic = storeNameToTopic.get(storeName);
@@ -69,7 +70,6 @@ public class GlobalStateUpdateTask implements 
GlobalStateMaintainer {
         return stateMgr.checkpointed();
     }
 
-
     @SuppressWarnings("unchecked")
     @Override
     public void update(final ConsumerRecord<byte[], byte[]> record) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
index 9d202d1..1cc5c85 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.LogContext;
@@ -234,14 +235,20 @@ public class GlobalStreamThread extends Thread {
         }
 
         void pollAndUpdate() {
-            final ConsumerRecords<byte[], byte[]> received = 
globalConsumer.poll(pollMs);
-            for (ConsumerRecord<byte[], byte[]> record : received) {
-                stateMaintainer.update(record);
-            }
-            final long now = time.milliseconds();
-            if (flushInterval >= 0 && now >= lastFlush + flushInterval) {
-                stateMaintainer.flushState();
-                lastFlush = now;
+            try {
+                final ConsumerRecords<byte[], byte[]> received = 
globalConsumer.poll(pollMs);
+                for (ConsumerRecord<byte[], byte[]> record : received) {
+                    stateMaintainer.update(record);
+                }
+                final long now = time.milliseconds();
+                if (flushInterval >= 0 && now >= lastFlush + flushInterval) {
+                    stateMaintainer.flushState();
+                    lastFlush = now;
+                }
+            } catch (final InvalidOffsetException recoverableException) {
+                log.error("Updating global state failed. You can restart 
KafkaStreams to recover from this error.", recoverableException);
+                throw new StreamsException("Updating global state failed. " +
+                    "You can restart KafkaStreams to recover from this 
error.", recoverableException);
             }
         }
 
@@ -308,15 +315,19 @@ public class GlobalStreamThread extends Thread {
                                                                            
stateDirectory,
                                                                            
stateRestoreListener,
                                                                            
config);
+
+            final GlobalProcessorContextImpl globalProcessorContext = new 
GlobalProcessorContextImpl(
+                config,
+                stateMgr,
+                streamsMetrics,
+                cache);
+            stateMgr.setGlobalProcessorContext(globalProcessorContext);
+
             final StateConsumer stateConsumer
                     = new StateConsumer(this.logContext,
                                         globalConsumer,
                                         new GlobalStateUpdateTask(topology,
-                                                                  new 
GlobalProcessorContextImpl(
-                                                                          
config,
-                                                                          
stateMgr,
-                                                                          
streamsMetrics,
-                                                                          
cache),
+                                                                  
globalProcessorContext,
                                                                   stateMgr,
                                                                   
config.defaultDeserializationExceptionHandler(),
                                                                   logContext),
@@ -324,6 +335,7 @@ public class GlobalStreamThread extends Thread {
                                         
config.getLong(StreamsConfig.POLL_MS_CONFIG),
                                         
config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG));
             stateConsumer.initialize();
+
             return stateConsumer;
         } catch (final LockException fatalException) {
             final String errorMsg = "Could not lock global state directory. 
This could happen if multiple KafkaStreams " +

http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
index 57bb3ac..25df826 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
@@ -50,7 +50,12 @@ public interface InternalProcessorContext extends 
ProcessorContext {
     ThreadCache getCache();
 
     /**
-     * Mark this contex as being initialized
+     * Mark this context as being initialized
      */
     void initialized();
+
+    /**
+     * Mark this context as being uninitialized
+     */
+    void uninitialize();
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 3a2803e..1ee0e14 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -33,27 +33,20 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
 
-public class ProcessorStateManager implements StateManager {
-
+public class ProcessorStateManager extends AbstractStateManager {
     private static final String STATE_CHANGELOG_TOPIC_SUFFIX = "-changelog";
-    static final String CHECKPOINT_FILE_NAME = ".checkpoint";
 
     private final Logger log;
-    private final File baseDir;
     private final TaskId taskId;
     private final String logPrefix;
     private final boolean isStandby;
     private final ChangelogReader changelogReader;
-    private final Map<String, StateStore> stores;
-    private final Map<String, StateStore> globalStores;
     private final Map<TopicPartition, Long> offsetLimits;
     private final Map<TopicPartition, Long> restoredOffsets;
-    private final Map<TopicPartition, Long> checkpointedOffsets;
     private final Map<String, StateRestoreCallback> restoreCallbacks; // used 
for standby tasks, keyed by state topic name
     private final Map<String, String> storeToChangelogTopic;
     private final List<TopicPartition> changelogPartitions = new ArrayList<>();
@@ -61,7 +54,6 @@ public class ProcessorStateManager implements StateManager {
     // TODO: this map does not work with customized grouper where multiple 
partitions
     // of the same topic can be assigned to the same topic.
     private final Map<String, TopicPartition> partitionForTopic;
-    private OffsetCheckpoint checkpoint;
 
     /**
      * @throws ProcessorStateException if the task directory does not exist 
and could not be created
@@ -75,28 +67,25 @@ public class ProcessorStateManager implements StateManager {
                                  final ChangelogReader changelogReader,
                                  final boolean eosEnabled,
                                  final LogContext logContext) throws 
IOException {
+        super(stateDirectory.directoryForTask(taskId));
+
+        this.log = logContext.logger(ProcessorStateManager.class);
         this.taskId = taskId;
         this.changelogReader = changelogReader;
         logPrefix = String.format("task [%s] ", taskId);
-        this.log = logContext.logger(getClass());
 
         partitionForTopic = new HashMap<>();
         for (final TopicPartition source : sources) {
             partitionForTopic.put(source.topic(), source);
         }
-        stores = new LinkedHashMap<>();
-        globalStores = new HashMap<>();
         offsetLimits = new HashMap<>();
         restoredOffsets = new HashMap<>();
         this.isStandby = isStandby;
         restoreCallbacks = isStandby ? new HashMap<String, 
StateRestoreCallback>() : null;
         this.storeToChangelogTopic = storeToChangelogTopic;
 
-        baseDir = stateDirectory.directoryForTask(taskId);
-
         // load the checkpoint information
-        checkpoint = new OffsetCheckpoint(new File(baseDir, 
CHECKPOINT_FILE_NAME));
-        checkpointedOffsets = new HashMap<>(checkpoint.read());
+        checkpointableOffsets.putAll(checkpoint.read());
 
         if (eosEnabled) {
             // delete the checkpoint file after finish loading its stored 
offsets
@@ -120,42 +109,55 @@ public class ProcessorStateManager implements 
StateManager {
     @Override
     public void register(final StateStore store,
                          final StateRestoreCallback stateRestoreCallback) {
-        log.debug("Registering state store {} to its state manager", 
store.name());
+        final String storeName = store.name();
+        log.debug("Registering state store {} to its state manager", 
storeName);
 
-        if (store.name().equals(CHECKPOINT_FILE_NAME)) {
+        if (CHECKPOINT_FILE_NAME.equals(storeName)) {
             throw new IllegalArgumentException(String.format("%sIllegal store 
name: %s", logPrefix, CHECKPOINT_FILE_NAME));
         }
 
-        if (stores.containsKey(store.name())) {
-            throw new IllegalArgumentException(String.format("%sStore %s has 
already been registered.", logPrefix, store.name()));
+        if (stores.containsKey(storeName)) {
+            throw new IllegalArgumentException(String.format("%sStore %s has 
already been registered.", logPrefix, storeName));
         }
 
         // check that the underlying change log topic exist or not
-        final String topic = storeToChangelogTopic.get(store.name());
+        final String topic = storeToChangelogTopic.get(storeName);
         if (topic == null) {
-            stores.put(store.name(), store);
+            stores.put(storeName, store);
             return;
         }
 
         final TopicPartition storePartition = new TopicPartition(topic, 
getPartition(topic));
 
         if (isStandby) {
-            log.trace("Preparing standby replica of  state store {} with 
changelog topic {}", store.name(), topic);
+            log.trace("Preparing standby replica of persistent state store {} 
with changelog topic {}", storeName, topic);
             restoreCallbacks.put(topic, stateRestoreCallback);
+
         } else {
-            log.trace("Restoring state store {} from changelog topic {}", 
store.name(), topic);
+            log.trace("Restoring state store {} from changelog topic {}", 
storeName, topic);
             final StateRestorer restorer = new StateRestorer(storePartition,
                                                              new 
CompositeRestoreListener(stateRestoreCallback),
-                                                             
checkpointedOffsets.get(storePartition),
+                                                             
checkpointableOffsets.get(storePartition),
                                                              
offsetLimit(storePartition),
                                                              
store.persistent(),
-                                                             store.name());
+                storeName);
 
             changelogReader.register(restorer);
         }
         changelogPartitions.add(storePartition);
 
-        stores.put(store.name(), store);
+        stores.put(storeName, store);
+    }
+
+    @Override
+    public void reinitializeStateStoresForPartitions(final 
Collection<TopicPartition> partitions,
+                                                     final 
InternalProcessorContext processorContext) {
+        super.reinitializeStateStoresForPartitions(
+            log,
+            stores,
+            storeToChangelogTopic,
+            partitions,
+            processorContext);
     }
 
     @Override
@@ -167,8 +169,8 @@ public class ProcessorStateManager implements StateManager {
             final int partition = getPartition(topicName);
             final TopicPartition storePartition = new 
TopicPartition(topicName, partition);
 
-            if (checkpointedOffsets.containsKey(storePartition)) {
-                partitionsAndOffsets.put(storePartition, 
checkpointedOffsets.get(storePartition));
+            if (checkpointableOffsets.containsKey(storePartition)) {
+                partitionsAndOffsets.put(storePartition, 
checkpointableOffsets.get(storePartition));
             } else {
                 partitionsAndOffsets.put(storePartition, -1L);
             }
@@ -281,7 +283,7 @@ public class ProcessorStateManager implements StateManager {
             if (ackedOffsets != null) {
                 checkpoint(ackedOffsets);
             }
-
+            stores.clear();
         }
 
         if (firstException != null) {
@@ -293,7 +295,7 @@ public class ProcessorStateManager implements StateManager {
     @Override
     public void checkpoint(final Map<TopicPartition, Long> ackedOffsets) {
         log.trace("Writing checkpoint: {}", ackedOffsets);
-        checkpointedOffsets.putAll(changelogReader.restoredOffsets());
+        checkpointableOffsets.putAll(changelogReader.restoredOffsets());
         for (final StateStore store : stores.values()) {
             final String storeName = store.name();
             // only checkpoint the offset to the offsets file if
@@ -303,9 +305,9 @@ public class ProcessorStateManager implements StateManager {
                 final TopicPartition topicPartition = new 
TopicPartition(changelogTopic, getPartition(storeName));
                 if (ackedOffsets.containsKey(topicPartition)) {
                     // store the last offset + 1 (the log position after 
restoration)
-                    checkpointedOffsets.put(topicPartition, 
ackedOffsets.get(topicPartition) + 1);
+                    checkpointableOffsets.put(topicPartition, 
ackedOffsets.get(topicPartition) + 1);
                 } else if (restoredOffsets.containsKey(topicPartition)) {
-                    checkpointedOffsets.put(topicPartition, 
restoredOffsets.get(topicPartition));
+                    checkpointableOffsets.put(topicPartition, 
restoredOffsets.get(topicPartition));
                 }
             }
         }
@@ -314,7 +316,7 @@ public class ProcessorStateManager implements StateManager {
             if (checkpoint == null) {
                 checkpoint = new OffsetCheckpoint(new File(baseDir, 
CHECKPOINT_FILE_NAME));
             }
-            checkpoint.write(checkpointedOffsets);
+            checkpoint.write(checkpointableOffsets);
         } catch (final IOException e) {
             log.warn("Failed to write checkpoint file to {}:", new 
File(baseDir, CHECKPOINT_FILE_NAME), e);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 73fbf63..837f607 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -131,8 +131,10 @@ public class StandbyTask extends AbstractTask {
         log.debug("Closing");
         boolean committedSuccessfully = false;
         try {
-            commit();
-            committedSuccessfully = true;
+            if (clean) {
+                commit();
+                committedSuccessfully = true;
+            }
         } finally {
             closeStateManager(committedSuccessfully);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java
index 2a8d9a3..f6efde6 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java
@@ -23,6 +23,7 @@ import org.apache.kafka.streams.processor.StateStore;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Map;
 
 interface StateManager extends Checkpointable {
@@ -37,7 +38,10 @@ interface StateManager extends Checkpointable {
 
     void flush();
 
-    void close(Map<TopicPartition, Long> offsets) throws IOException;
+    void reinitializeStateStoresForPartitions(final Collection<TopicPartition> 
partitions,
+                                              final InternalProcessorContext 
processorContext);
+
+    void close(final Map<TopicPartition, Long> offsets) throws IOException;
 
     StateStore getGlobalStore(final String name);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 83c783d..178d2bb 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
@@ -78,12 +79,24 @@ public class StoreChangelogReader implements 
ChangelogReader {
             return completed();
         }
 
-        final Set<TopicPartition> partitions = new 
HashSet<>(needsRestoring.keySet());
-        final ConsumerRecords<byte[], byte[]> allRecords = 
restoreConsumer.poll(10);
-        for (final TopicPartition partition : partitions) {
-            restorePartition(allRecords, partition, 
active.restoringTaskFor(partition));
+        final Set<TopicPartition> restoringPartitions = new 
HashSet<>(needsRestoring.keySet());
+        try {
+            final ConsumerRecords<byte[], byte[]> allRecords = 
restoreConsumer.poll(10);
+            for (final TopicPartition partition : restoringPartitions) {
+                restorePartition(allRecords, partition, 
active.restoringTaskFor(partition));
+            }
+        } catch (final InvalidOffsetException recoverableException) {
+            log.warn("Restoring StreamTasks failed. Deleting StreamTasks 
stores to recreate from scratch.", recoverableException);
+            final Set<TopicPartition> partitions = 
recoverableException.partitions();
+            for (final TopicPartition partition : partitions) {
+                final StreamTask task = active.restoringTaskFor(partition);
+                log.info("Reinitializing StreamTask {}", task);
+                
task.reinitializeStateStoresForPartitions(recoverableException.partitions());
+            }
+            restoreConsumer.seekToBeginning(partitions);
         }
 
+
         if (needsRestoring.isEmpty()) {
             restoreConsumer.unsubscribe();
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 55456d0..f2fa448 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -33,7 +33,6 @@ import 
org.apache.kafka.streams.errors.DeserializationExceptionHandler;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskMigratedException;
 import org.apache.kafka.streams.processor.Cancellable;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.TaskId;
@@ -641,11 +640,6 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator
     }
 
     // visible for testing only
-    ProcessorContext processorContext() {
-        return processorContext;
-    }
-
-    // visible for testing only
     RecordCollector recordCollector() {
         return recordCollector;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index a9786f9..696081d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -1036,22 +1036,33 @@ public class StreamThread extends Thread {
                 processStandbyRecords = false;
             }
 
-            final ConsumerRecords<byte[], byte[]> records = 
restoreConsumer.poll(0);
+            try {
+                final ConsumerRecords<byte[], byte[]> records = 
restoreConsumer.poll(0);
 
-            if (!records.isEmpty()) {
-                for (final TopicPartition partition : records.partitions()) {
-                    final StandbyTask task = 
taskManager.standbyTask(partition);
+                if (!records.isEmpty()) {
+                    for (final TopicPartition partition : 
records.partitions()) {
+                        final StandbyTask task = 
taskManager.standbyTask(partition);
 
-                    if (task == null) {
-                        throw new StreamsException(logPrefix + "Missing 
standby task for partition " + partition);
-                    }
+                        if (task == null) {
+                            throw new StreamsException(logPrefix + "Missing 
standby task for partition " + partition);
+                        }
 
-                    final List<ConsumerRecord<byte[], byte[]>> remaining = 
task.update(partition, records.records(partition));
-                    if (remaining != null) {
-                        restoreConsumer.pause(singleton(partition));
-                        standbyRecords.put(partition, remaining);
+                        final List<ConsumerRecord<byte[], byte[]>> remaining = 
task.update(partition, records.records(partition));
+                        if (remaining != null) {
+                            restoreConsumer.pause(singleton(partition));
+                            standbyRecords.put(partition, remaining);
+                        }
                     }
                 }
+            } catch (final InvalidOffsetException recoverableException) {
+                log.warn("Updating StandbyTasks failed. Deleting StandbyTasks 
stores to recreate from scratch.", recoverableException);
+                final Set<TopicPartition> partitions = 
recoverableException.partitions();
+                for (final TopicPartition partition : partitions) {
+                    final StandbyTask task = 
taskManager.standbyTask(partition);
+                    log.info("Reinitializing StandbyTask {}", task);
+                    
task.reinitializeStateStoresForPartitions(recoverableException.partitions());
+                }
+                restoreConsumer.seekToBeginning(partitions);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java
index 5ff8a26..a34851a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java
@@ -159,7 +159,6 @@ class InnerMeteredKeyValueStore<K, IK, V, IV> extends 
WrappedStateStore.Abstract
         } else {
             inner.init(InnerMeteredKeyValueStore.this.context, 
InnerMeteredKeyValueStore.this.root);
         }
-
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStore.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStore.java
index a6ff8d5..35647b7 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStore.java
@@ -102,8 +102,6 @@ public class MeteredKeyValueBytesStore<K, V> extends 
WrappedStateStore.AbstractS
                                         keySerde == null ? (Serde<K>) 
context.keySerde() : keySerde,
                                         valueSerde == null ? (Serde<V>) 
context.valueSerde() : valueSerde);
         innerMetered.init(context, root);
-
-
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java 
b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index a06f7e8..12db711 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -190,11 +190,9 @@ public class StreamsConfigTest {
 
     @Test
     public void shouldSupportPrefixedRestoreConsumerConfigs() {
-        props.put(consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), 
"earliest");
         props.put(consumerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 
1);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         final Map<String, Object> consumerConfigs = 
streamsConfig.getRestoreConsumerConfigs("clientId");
-        assertEquals("earliest", 
consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
         assertEquals(1, 
consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG));
     }
 
@@ -245,11 +243,9 @@ public class StreamsConfigTest {
 
     @Test
     public void shouldBeSupportNonPrefixedRestoreConsumerConfigs() {
-        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         final Map<String, Object> consumerConfigs = 
streamsConfig.getRestoreConsumerConfigs("groupId");
-        assertEquals("earliest", 
consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
         assertEquals(1, 
consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG));
     }
 
@@ -305,11 +301,9 @@ public class StreamsConfigTest {
 
     @Test
     public void shouldOverrideStreamsDefaultConsumerConifgsOnRestoreConsumer() 
{
-        
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG),
 "latest");
         
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 
"10");
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         final Map<String, Object> consumerConfigs = 
streamsConfig.getRestoreConsumerConfigs("clientId");
-        assertEquals("latest", 
consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
         assertEquals("10", 
consumerConfigs.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
index e75b54f..776110c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
@@ -25,52 +25,69 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.AuthorizationException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.MockRestoreCallback;
 import org.apache.kafka.test.MockStateRestoreListener;
 import org.apache.kafka.test.TestUtils;
 import org.easymock.EasyMock;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
-import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class AbstractTaskTest {
 
     private final TaskId id = new TaskId(0, 0);
     private StateDirectory stateDirectory  = 
EasyMock.createMock(StateDirectory.class);
+    private final TopicPartition storeTopicPartition1 = new 
TopicPartition("t1", 0);
+    private final TopicPartition storeTopicPartition2 = new 
TopicPartition("t2", 0);
+    private final TopicPartition storeTopicPartition3 = new 
TopicPartition("t3", 0);
+    private final TopicPartition storeTopicPartition4 = new 
TopicPartition("t4", 0);
+    private final Collection<TopicPartition> storeTopicPartitions
+        = Utils.mkSet(storeTopicPartition1, storeTopicPartition2, 
storeTopicPartition3, storeTopicPartition4);
 
     @Before
     public void before() {
-        
EasyMock.expect(stateDirectory.directoryForTask(id)).andReturn(TestUtils.tempDirectory());
+        
expect(stateDirectory.directoryForTask(id)).andReturn(TestUtils.tempDirectory());
     }
 
     @Test(expected = ProcessorStateException.class)
     public void 
shouldThrowProcessorStateExceptionOnInitializeOffsetsWhenAuthorizationException()
 {
         final Consumer consumer = mockConsumer(new 
AuthorizationException("blah"));
-        final AbstractTask task = createTask(consumer, 
Collections.<StateStore>emptyList());
+        final AbstractTask task = createTask(consumer, 
Collections.<StateStore, String>emptyMap());
         task.updateOffsetLimits();
     }
 
     @Test(expected = ProcessorStateException.class)
     public void 
shouldThrowProcessorStateExceptionOnInitializeOffsetsWhenKafkaException() {
         final Consumer consumer = mockConsumer(new KafkaException("blah"));
-        final AbstractTask task = createTask(consumer, 
Collections.<StateStore>emptyList());
+        final AbstractTask task = createTask(consumer, 
Collections.<StateStore, String>emptyMap());
         task.updateOffsetLimits();
     }
 
     @Test(expected = WakeupException.class)
     public void 
shouldThrowWakeupExceptionOnInitializeOffsetsWhenWakeupException() {
         final Consumer consumer = mockConsumer(new WakeupException());
-        final AbstractTask task = createTask(consumer, 
Collections.<StateStore>emptyList());
+        final AbstractTask task = createTask(consumer, 
Collections.<StateStore, String>emptyMap());
         task.updateOffsetLimits();
     }
 
@@ -78,10 +95,10 @@ public class AbstractTaskTest {
     public void 
shouldThrowLockExceptionIfFailedToLockStateDirectoryWhenTopologyHasStores() 
throws IOException {
         final Consumer consumer = EasyMock.createNiceMock(Consumer.class);
         final StateStore store = EasyMock.createNiceMock(StateStore.class);
-        EasyMock.expect(stateDirectory.lock(id)).andReturn(false);
+        expect(stateDirectory.lock(id)).andReturn(false);
         EasyMock.replay(stateDirectory);
 
-        final AbstractTask task = createTask(consumer, 
Collections.singletonList(store));
+        final AbstractTask task = createTask(consumer, 
Collections.singletonMap(store, "dummy"));
 
         try {
             task.initializeStateStores();
@@ -93,11 +110,11 @@ public class AbstractTaskTest {
     }
 
     @Test
-    public void shouldNotAttemptToLockIfNoStores() throws IOException {
+    public void shouldNotAttemptToLockIfNoStores() {
         final Consumer consumer = EasyMock.createNiceMock(Consumer.class);
         EasyMock.replay(stateDirectory);
 
-        final AbstractTask task = createTask(consumer, 
Collections.<StateStore>emptyList());
+        final AbstractTask task = createTask(consumer, 
Collections.<StateStore, String>emptyMap());
 
         task.initializeStateStores();
 
@@ -105,20 +122,122 @@ public class AbstractTaskTest {
         EasyMock.verify(stateDirectory);
     }
 
+    @Test
+    public void shouldDeleteAndRecreateStoreDirectoryOnReinitialize() throws IOException {
+        final StreamsConfig streamsConfig = new StreamsConfig(new Properties() {
+            {
+                put(StreamsConfig.APPLICATION_ID_CONFIG, "app-id");
+                put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+                put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+            }
+        });
+        final Consumer consumer = EasyMock.createNiceMock(Consumer.class);
+
+        final StateStore store1 = EasyMock.createNiceMock(StateStore.class);
+        final StateStore store2 = EasyMock.createNiceMock(StateStore.class);
+        final StateStore store3 = EasyMock.createNiceMock(StateStore.class);
+        final StateStore store4 = EasyMock.createNiceMock(StateStore.class);
+        final String storeName1 = "storeName1";
+        final String storeName2 = "storeName2";
+        final String storeName3 = "storeName3";
+        final String storeName4 = "storeName4";
+
+        expect(store1.name()).andReturn(storeName1).anyTimes();
+        EasyMock.replay(store1);
+        expect(store2.name()).andReturn(storeName2).anyTimes();
+        EasyMock.replay(store2);
+        expect(store3.name()).andReturn(storeName3).anyTimes();
+        EasyMock.replay(store3);
+        expect(store4.name()).andReturn(storeName4).anyTimes();
+        EasyMock.replay(store4);
+
+        final StateDirectory stateDirectory = new StateDirectory(streamsConfig, new MockTime());
+        final AbstractTask task = createTask(
+            consumer,
+            new HashMap<StateStore, String>() {
+                {
+                    put(store1, storeTopicPartition1.topic());
+                    put(store2, storeTopicPartition2.topic());
+                    put(store3, storeTopicPartition3.topic());
+                    put(store4, storeTopicPartition4.topic());
+                }
+            },
+            stateDirectory);
+
+        final String taskDir = stateDirectory.directoryForTask(task.id).getAbsolutePath();
+        final File storeDirectory1 = new File(taskDir
+            + File.separator + "rocksdb"
+            + File.separator + storeName1);
+        final File storeDirectory2 = new File(taskDir
+            + File.separator + "rocksdb"
+            + File.separator + storeName2);
+        final File storeDirectory3 = new File(taskDir
+            + File.separator + storeName3);
+        final File storeDirectory4 = new File(taskDir
+            + File.separator + storeName4);
+        final File testFile1 = new File(storeDirectory1.getAbsolutePath() + File.separator + "testFile");
+        final File testFile2 = new File(storeDirectory2.getAbsolutePath() + File.separator + "testFile");
+        final File testFile3 = new File(storeDirectory3.getAbsolutePath() + File.separator + "testFile");
+        final File testFile4 = new File(storeDirectory4.getAbsolutePath() + File.separator + "testFile");
+
+        storeDirectory1.mkdirs();
+        storeDirectory2.mkdirs();
+        storeDirectory3.mkdirs();
+        storeDirectory4.mkdirs();
+
+        testFile1.createNewFile();
+        assertTrue(testFile1.exists());
+        testFile2.createNewFile();
+        assertTrue(testFile2.exists());
+        testFile3.createNewFile();
+        assertTrue(testFile3.exists());
+        testFile4.createNewFile();
+        assertTrue(testFile4.exists());
+
+        task.processorContext = new MockProcessorContext(stateDirectory.directoryForTask(task.id), streamsConfig);
+
+        task.stateMgr.register(store1, new MockRestoreCallback());
+        task.stateMgr.register(store2, new MockRestoreCallback());
+        task.stateMgr.register(store3, new MockRestoreCallback());
+        task.stateMgr.register(store4, new MockRestoreCallback());
+
+        // only reinitialize store1 and store3 -- store2 and store4 should be untouched
+        task.reinitializeStateStoresForPartitions(Utils.mkSet(storeTopicPartition1, storeTopicPartition3));
+
+        assertFalse(testFile1.exists());
+        assertTrue(testFile2.exists());
+        assertFalse(testFile3.exists());
+        assertTrue(testFile4.exists());
+    }
+
+    private AbstractTask createTask(final Consumer consumer,
+                                    final Map<StateStore, String> stateStoresToChangelogTopics) {
+        return createTask(consumer, stateStoresToChangelogTopics, stateDirectory);
+    }
+
     @SuppressWarnings("unchecked")
-    private AbstractTask createTask(final Consumer consumer, final List<StateStore> stateStores) {
+    private AbstractTask createTask(final Consumer consumer,
+                                    final Map<StateStore, String> stateStoresToChangelogTopics,
+                                    final StateDirectory stateDirectory) {
         final Properties properties = new Properties();
         properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "app");
         properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummyhost:9092");
         final StreamsConfig config = new StreamsConfig(properties);
+
+        final Map<String, String> storeNamesToChangelogTopics = new HashMap<>(stateStoresToChangelogTopics.size());
+        for (final Map.Entry<StateStore, String> e : stateStoresToChangelogTopics.entrySet()) {
+            storeNamesToChangelogTopics.put(e.getKey().name(), e.getValue());
+        }
+
         return new AbstractTask(id,
-                                Collections.singletonList(new TopicPartition("t", 0)),
-                                ProcessorTopology.withLocalStores(stateStores, Collections.<String, String>emptyMap()),
-                                (Consumer<byte[], byte[]>) consumer,
-                                new StoreChangelogReader((Consumer<byte[], byte[]>) consumer, new MockStateRestoreListener(), new LogContext("stream-task-test ")),
+                                storeTopicPartitions,
+                                ProcessorTopology.withLocalStores(new ArrayList<>(stateStoresToChangelogTopics.keySet()), storeNamesToChangelogTopics),
+                                consumer,
+                                new StoreChangelogReader(consumer, new MockStateRestoreListener(), new LogContext("stream-task-test ")),
                                 false,
                                 stateDirectory,
                                 config) {
+
             @Override
             public void resume() {}
 

Reply via email to