Repository: kafka
Updated Branches:
  refs/heads/trunk fc1cfe475 -> 1f1e794ad


KAFKA-4317: Regularly checkpoint StateStore changelog offsets

Currently the checkpoint file is deleted at state store initialization and it 
is only ever written again during a clean shutdown. This can result in 
significant delays during restarts as the entire store needs to be loaded from 
the changelog.
We can mitigate this by frequently checkpointing the offsets. The 
checkpointing happens only during the commit phase, i.e., after we have manually 
flushed the store and the producer. So we guarantee that the checkpointed 
offsets are never greater than what has been flushed.
In the event of hard failure we can recover by reading the checkpoints and 
consuming from the stored offsets.

Author: Damian Guy <damian....@gmail.com>

Reviewers: Eno Thereska, Matthias J. Sax, Guozhang Wang

Closes #2471 from dguy/kafka-4317


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1f1e794a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1f1e794a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1f1e794a

Branch: refs/heads/trunk
Commit: 1f1e794ad0025bb2a33b7d8378a481f224b3bccc
Parents: fc1cfe4
Author: Damian Guy <damian....@gmail.com>
Authored: Fri Feb 17 14:41:28 2017 -0800
Committer: Guozhang Wang <wangg...@gmail.com>
Committed: Fri Feb 17 14:41:28 2017 -0800

----------------------------------------------------------------------
 .../processor/internals/AbstractTask.java       |  18 +-
 .../processor/internals/Checkpointable.java     |  27 +++
 .../internals/GlobalStateManagerImpl.java       |  16 +-
 .../internals/GlobalStateUpdateTask.java        |   3 +-
 .../internals/ProcessorStateManager.java        |  57 +++---
 .../processor/internals/StandbyTask.java        |  23 +--
 .../processor/internals/StateManager.java       |   4 +-
 .../streams/processor/internals/StreamTask.java |   5 +-
 .../state/internals/InMemoryKeyValueStore.java  |   2 +-
 .../processor/internals/AbstractTaskTest.java   |   3 +-
 .../internals/GlobalStateManagerImplTest.java   |  49 ++++-
 .../internals/GlobalStateTaskTest.java          |  16 +-
 .../internals/ProcessorStateManagerTest.java    | 186 +++++++++++++------
 .../processor/internals/StandbyTaskTest.java    |  51 ++++-
 .../processor/internals/StateManagerStub.java   |   7 +-
 .../processor/internals/StreamTaskTest.java     |  60 ++++++
 .../kafka/test/GlobalStateManagerStub.java      |   7 +-
 .../kafka/test/ProcessorTopologyTestDriver.java |   3 +-
 18 files changed, 401 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1f1e794a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 2a040ba..8de5d23 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -52,14 +52,14 @@ public abstract class AbstractTask {
     /**
      * @throws ProcessorStateException if the state manager cannot be created
      */
-    protected AbstractTask(TaskId id,
-                           String applicationId,
-                           Collection<TopicPartition> partitions,
-                           ProcessorTopology topology,
-                           Consumer<byte[], byte[]> consumer,
-                           Consumer<byte[], byte[]> restoreConsumer,
-                           boolean isStandby,
-                           StateDirectory stateDirectory,
+    protected AbstractTask(final TaskId id,
+                           final String applicationId,
+                           final Collection<TopicPartition> partitions,
+                           final ProcessorTopology topology,
+                           final Consumer<byte[], byte[]> consumer,
+                           final Consumer<byte[], byte[]> restoreConsumer,
+                           final boolean isStandby,
+                           final StateDirectory stateDirectory,
                            final ThreadCache cache) {
         this.id = id;
         this.applicationId = applicationId;
@@ -70,7 +70,7 @@ public abstract class AbstractTask {
 
         // create the processor state manager
         try {
-            this.stateMgr = new ProcessorStateManager(id, partitions, 
restoreConsumer, isStandby, stateDirectory, topology.storeToChangelogTopic());
+            stateMgr = new ProcessorStateManager(id, partitions, 
restoreConsumer, isStandby, stateDirectory, topology.storeToChangelogTopic());
         } catch (IOException e) {
             throw new ProcessorStateException(String.format("task [%s] Error 
while creating the state manager", id), e);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1f1e794a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Checkpointable.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Checkpointable.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Checkpointable.java
new file mode 100644
index 0000000..7b02d5b
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Checkpointable.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Map;
+
+// Interface to indicate that an object has associated partition offsets that 
can be checkpointed
+interface Checkpointable {
+    void checkpoint(final Map<TopicPartition, Long> offsets);
+    Map<TopicPartition, Long> checkpointed();
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/1f1e794a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 7534993..3819bb5 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -58,11 +58,11 @@ public class GlobalStateManagerImpl implements 
GlobalStateManager {
     private final File baseDir;
     private final OffsetCheckpoint checkpoint;
     private final Set<String> globalStoreNames = new HashSet<>();
-    private HashMap<TopicPartition, Long> checkpointableOffsets;
+    private final Map<TopicPartition, Long> checkpointableOffsets = new 
HashMap<>();
 
     public GlobalStateManagerImpl(final ProcessorTopology topology,
-                           final Consumer<byte[], byte[]> consumer,
-                           final StateDirectory stateDirectory) {
+                                  final Consumer<byte[], byte[]> consumer,
+                                  final StateDirectory stateDirectory) {
         this.topology = topology;
         this.consumer = consumer;
         this.stateDirectory = stateDirectory;
@@ -81,8 +81,7 @@ public class GlobalStateManagerImpl implements 
GlobalStateManager {
         }
 
         try {
-            this.checkpointableOffsets = new HashMap<>(checkpoint.read());
-            checkpoint.delete();
+            this.checkpointableOffsets.putAll(checkpoint.read());
         } catch (IOException e) {
             try {
                 stateDirectory.unlockGlobalState();
@@ -220,13 +219,14 @@ public class GlobalStateManagerImpl implements 
GlobalStateManager {
             if (closeFailed.length() > 0) {
                 throw new ProcessorStateException("Exceptions caught during 
close of 1 or more global state stores\n" + closeFailed);
             }
-            writeCheckpoints(offsets);
+            checkpoint(offsets);
         } finally {
             stateDirectory.unlockGlobalState();
         }
     }
 
-    private void writeCheckpoints(final Map<TopicPartition, Long> offsets) {
+    @Override
+    public void checkpoint(final Map<TopicPartition, Long> offsets) {
         if (!offsets.isEmpty()) {
             checkpointableOffsets.putAll(offsets);
             try {
@@ -238,7 +238,7 @@ public class GlobalStateManagerImpl implements 
GlobalStateManager {
     }
 
     @Override
-    public Map<TopicPartition, Long> checkpointedOffsets() {
+    public Map<TopicPartition, Long> checkpointed() {
         return Collections.unmodifiableMap(checkpointableOffsets);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1f1e794a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
index 40f2a3c..6da37e4 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
@@ -67,7 +67,7 @@ public class GlobalStateUpdateTask implements 
GlobalStateMaintainer {
         }
         initTopology();
         processorContext.initialized();
-        return stateMgr.checkpointedOffsets();
+        return stateMgr.checkpointed();
     }
 
 
@@ -89,6 +89,7 @@ public class GlobalStateUpdateTask implements 
GlobalStateMaintainer {
 
     public void flushState() {
         stateMgr.flush(processorContext);
+        stateMgr.checkpoint(offsets);
     }
 
     public void close() throws IOException {

http://git-wip-us.apache.org/repos/asf/kafka/blob/1f1e794a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 1c786e3..0e8caa2 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -68,6 +68,7 @@ public class ProcessorStateManager implements StateManager {
     // TODO: this map does not work with customized grouper where multiple 
partitions
     // of the same topic can be assigned to the same topic.
     private final Map<String, TopicPartition> partitionForTopic;
+    private final OffsetCheckpoint checkpoint;
 
     /**
      * @throws LockException if the state directory cannot be locked because 
another thread holds the lock
@@ -103,11 +104,8 @@ public class ProcessorStateManager implements StateManager 
{
         }
 
         // load the checkpoint information
-        OffsetCheckpoint checkpoint = new OffsetCheckpoint(new 
File(this.baseDir, CHECKPOINT_FILE_NAME));
+        checkpoint = new OffsetCheckpoint(new File(this.baseDir, 
CHECKPOINT_FILE_NAME));
         this.checkpointedOffsets = new HashMap<>(checkpoint.read());
-
-        // delete the checkpoint file after finish loading its stored offsets
-        checkpoint.delete();
     }
 
 
@@ -250,7 +248,7 @@ public class ProcessorStateManager implements StateManager {
         }
     }
 
-    public Map<TopicPartition, Long> checkpointedOffsets() {
+    public Map<TopicPartition, Long> checkpointed() {
         Map<TopicPartition, Long> partitionsAndOffsets = new HashMap<>();
 
         for (Map.Entry<String, StateRestoreCallback> entry : 
restoreCallbacks.entrySet()) {
@@ -347,29 +345,7 @@ public class ProcessorStateManager implements StateManager 
{
                 }
 
                 if (ackedOffsets != null) {
-                    Map<TopicPartition, Long> checkpointOffsets = new 
HashMap<>();
-                    for (String storeName : stores.keySet()) {
-                        // only checkpoint the offset to the offsets file if
-                        // it is persistent AND changelog enabled
-                        if (stores.get(storeName).persistent() && 
storeToChangelogTopic.containsKey(storeName)) {
-                            String changelogTopic = 
storeToChangelogTopic.get(storeName);
-                            TopicPartition topicPartition = new 
TopicPartition(changelogTopic, getPartition(storeName));
-                            Long offset = ackedOffsets.get(topicPartition);
-
-                            if (offset != null) {
-                                // store the last offset + 1 (the log position 
after restoration)
-                                checkpointOffsets.put(topicPartition, offset + 
1);
-                            } else {
-                                // if no record was produced. we need to check 
the restored offset.
-                                offset = restoredOffsets.get(topicPartition);
-                                if (offset != null)
-                                    checkpointOffsets.put(topicPartition, 
offset);
-                            }
-                        }
-                    }
-                    // write the checkpoint file before closing, to indicate 
clean shutdown
-                    OffsetCheckpoint checkpoint = new OffsetCheckpoint(new 
File(this.baseDir, CHECKPOINT_FILE_NAME));
-                    checkpoint.write(checkpointOffsets);
+                    checkpoint(ackedOffsets);
                 }
 
             }
@@ -379,6 +355,31 @@ public class ProcessorStateManager implements StateManager 
{
         }
     }
 
+    // write the checkpoint
+    @Override
+    public void checkpoint(final Map<TopicPartition, Long> ackedOffsets) {
+        for (String storeName : stores.keySet()) {
+            // only checkpoint the offset to the offsets file if
+            // it is persistent AND changelog enabled
+            if (stores.get(storeName).persistent() && 
storeToChangelogTopic.containsKey(storeName)) {
+                final String changelogTopic = 
storeToChangelogTopic.get(storeName);
+                final TopicPartition topicPartition = new 
TopicPartition(changelogTopic, getPartition(storeName));
+                if (ackedOffsets.containsKey(topicPartition)) {
+                    // store the last offset + 1 (the log position after 
restoration)
+                    checkpointedOffsets.put(topicPartition, 
ackedOffsets.get(topicPartition) + 1);
+                } else if (restoredOffsets.containsKey(topicPartition)) {
+                    checkpointedOffsets.put(topicPartition, 
restoredOffsets.get(topicPartition));
+                }
+            }
+        }
+        // write the checkpoint file before closing, to indicate clean shutdown
+        try {
+            checkpoint.write(checkpointedOffsets);
+        } catch (IOException e) {
+            log.warn("Failed to write checkpoint file to {}", new 
File(baseDir, CHECKPOINT_FILE_NAME), e);
+        }
+    }
+
     private int getPartition(String topic) {
         TopicPartition partition = partitionForTopic.get(topic);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1f1e794a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 4437a19..a27098c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -51,14 +51,15 @@ public class StandbyTask extends AbstractTask {
      * @param metrics               the {@link StreamsMetrics} created by the 
thread
      * @param stateDirectory        the {@link StateDirectory} created by the 
thread
      */
-    public StandbyTask(TaskId id,
-                       String applicationId,
-                       Collection<TopicPartition> partitions,
-                       ProcessorTopology topology,
-                       Consumer<byte[], byte[]> consumer,
-                       Consumer<byte[], byte[]> restoreConsumer,
-                       StreamsConfig config,
-                       StreamsMetrics metrics, final StateDirectory 
stateDirectory) {
+    public StandbyTask(final TaskId id,
+                       final String applicationId,
+                       final Collection<TopicPartition> partitions,
+                       final ProcessorTopology topology,
+                       final Consumer<byte[], byte[]> consumer,
+                       final Consumer<byte[], byte[]> restoreConsumer,
+                       final StreamsConfig config,
+                       final StreamsMetrics metrics,
+                       final StateDirectory stateDirectory) {
         super(id, applicationId, partitions, topology, consumer, 
restoreConsumer, true, stateDirectory, null);
 
         // initialize the topology with its own context
@@ -67,9 +68,9 @@ public class StandbyTask extends AbstractTask {
         log.info("standby-task [{}] Initializing state stores", id());
         initializeStateStores();
 
-        ((StandbyContextImpl) this.processorContext).initialized();
+        this.processorContext.initialized();
 
-        this.checkpointedOffsets = 
Collections.unmodifiableMap(stateMgr.checkpointedOffsets());
+        this.checkpointedOffsets = 
Collections.unmodifiableMap(stateMgr.checkpointed());
     }
 
     public Map<TopicPartition, Long> checkpointedOffsets() {
@@ -92,7 +93,7 @@ public class StandbyTask extends AbstractTask {
     public void commit() {
         log.debug("standby-task [{}] Committing its state", id());
         stateMgr.flush(processorContext);
-
+        stateMgr.checkpoint(Collections.<TopicPartition, Long>emptyMap());
         // reinitialize offset limits
         initializeOffsetLimits();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1f1e794a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java
index 7343c85..3102b77 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java
@@ -24,7 +24,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Map;
 
-interface StateManager {
+interface StateManager extends Checkpointable {
     File baseDir();
 
     void register(final StateStore store, final boolean loggingEnabled, final 
StateRestoreCallback stateRestoreCallback);
@@ -36,6 +36,4 @@ interface StateManager {
     StateStore getGlobalStore(final String name);
 
     StateStore getStore(final String name);
-
-    Map<TopicPartition, Long> checkpointedOffsets();
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1f1e794a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index be77856..d95ac4b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -76,8 +76,9 @@ public class StreamTask extends AbstractTask implements 
Punctuator {
             log.trace("{} Start flushing its producer's sent records upon 
committing its state", logPrefix);
             // 2) flush produced records in the downstream and change logs of 
local states
             recordCollector.flush();
-
-            // 3) commit consumed offsets if it is dirty already
+            // 3) write checkpoints for any local state
+            stateMgr.checkpoint(recordCollectorOffsets());
+            // 4) commit consumed offsets if it is dirty already
             commitOffsets();
         }
     };

http://git-wip-us.apache.org/repos/asf/kafka/blob/1f1e794a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
index fe50152..28dea79 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
@@ -39,7 +39,7 @@ public class InMemoryKeyValueStore<K, V> implements 
KeyValueStore<K, V> {
 
     private StateSerdes<K, V> serdes;
 
-    InMemoryKeyValueStore(final String name, final Serde<K> keySerde, final 
Serde<V> valueSerde) {
+    public InMemoryKeyValueStore(final String name, final Serde<K> keySerde, 
final Serde<V> valueSerde) {
         this.name = name;
         this.keySerde = keySerde;
         this.valueSerde = valueSerde;

http://git-wip-us.apache.org/repos/asf/kafka/blob/1f1e794a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
index fc0953b..f288f98 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
@@ -60,6 +60,7 @@ public class AbstractTaskTest {
     }
 
     private AbstractTask createTask(final Consumer consumer) {
+        final MockTime time = new MockTime();
         return new AbstractTask(new TaskId(0, 0),
                                 "app",
                                 Collections.singletonList(new 
TopicPartition("t", 0)),
@@ -72,7 +73,7 @@ public class AbstractTaskTest {
                                 consumer,
                                 consumer,
                                 false,
-                                new StateDirectory("app", 
TestUtils.tempDirectory().getPath(), new MockTime()),
+                                new StateDirectory("app", 
TestUtils.tempDirectory().getPath(), time),
                                 new ThreadCache("testCache", 0, new 
MockStreamsMetrics(new Metrics()))) {
             @Override
             public void commit() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/1f1e794a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index 168b300..062079f 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -50,6 +50,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -125,15 +127,15 @@ public class GlobalStateManagerImplTest {
         final Map<TopicPartition, Long> expected = writeCheckpoint();
 
         stateManager.initialize(context);
-        final Map<TopicPartition, Long> offsets = 
stateManager.checkpointedOffsets();
+        final Map<TopicPartition, Long> offsets = stateManager.checkpointed();
         assertEquals(expected, offsets);
     }
 
     @Test
-    public void shouldDeleteCheckpointFileAfteLoaded() throws Exception {
+    public void shouldNotDeleteCheckpointFileAfterLoaded() throws Exception {
         writeCheckpoint();
         stateManager.initialize(context);
-        assertFalse(checkpointFile.exists());
+        assertTrue(checkpointFile.exists());
     }
 
     @Test(expected = StreamsException.class)
@@ -168,7 +170,7 @@ public class GlobalStateManagerImplTest {
     }
 
     @Test
-    public void 
shouldThrowIllegalArgumenExceptionIfAttemptingToRegisterStoreTwice() throws 
Exception {
+    public void 
shouldThrowIllegalArgumentExceptionIfAttemptingToRegisterStoreTwice() throws 
Exception {
         stateManager.initialize(context);
         initializeConsumer(2, 1, t1);
         stateManager.register(store1, false, new TheStateRestoreCallback());
@@ -271,9 +273,7 @@ public class GlobalStateManagerImplTest {
         stateManager.register(store1, false, stateRestoreCallback);
         final Map<TopicPartition, Long> expected = 
Collections.singletonMap(t1, 25L);
         stateManager.close(expected);
-        final OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint(new 
File(stateManager.baseDir(),
-                                                                               
 ProcessorStateManager.CHECKPOINT_FILE_NAME));
-        final Map<TopicPartition, Long> result = offsetCheckpoint.read();
+        final Map<TopicPartition, Long> result = readOffsetsCheckpoint();
         assertEquals(expected, result);
     }
 
@@ -377,6 +377,41 @@ public class GlobalStateManagerImplTest {
     }
 
     @Test
+    public void shouldCheckpointOffsets() throws Exception {
+        final Map<TopicPartition, Long> offsets = Collections.singletonMap(t1, 
25L);
+        stateManager.initialize(context);
+
+        stateManager.checkpoint(offsets);
+
+        final Map<TopicPartition, Long> result = readOffsetsCheckpoint();
+        assertThat(result, equalTo(offsets));
+        assertThat(stateManager.checkpointed(), equalTo(offsets));
+    }
+
+    @Test
+    public void shouldNotRemoveOffsetsOfUnUpdatedTablesDuringCheckpoint() 
throws Exception {
+        stateManager.initialize(context);
+        final TheStateRestoreCallback stateRestoreCallback = new 
TheStateRestoreCallback();
+        initializeConsumer(10, 1, t1);
+        stateManager.register(store1, false, stateRestoreCallback);
+        initializeConsumer(20, 1, t2);
+        stateManager.register(store2, false, stateRestoreCallback);
+
+        final Map<TopicPartition, Long> initialCheckpoint = 
stateManager.checkpointed();
+        stateManager.checkpoint(Collections.singletonMap(t1, 101L));
+
+        final Map<TopicPartition, Long> updatedCheckpoint = 
stateManager.checkpointed();
+        assertThat(updatedCheckpoint.get(t2), 
equalTo(initialCheckpoint.get(t2)));
+        assertThat(updatedCheckpoint.get(t1), equalTo(101L));
+    }
+
+    private Map<TopicPartition, Long> readOffsetsCheckpoint() throws 
IOException {
+        final OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint(new 
File(stateManager.baseDir(),
+                                                                               
 ProcessorStateManager.CHECKPOINT_FILE_NAME));
+        return offsetCheckpoint.read();
+    }
+
+    @Test
     public void 
shouldThrowLockExceptionIfIOExceptionCaughtWhenTryingToLockStateDir() throws 
Exception {
         stateManager = new GlobalStateManagerImpl(topology, consumer, new 
StateDirectory("appId", stateDirPath, time) {
             @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/1f1e794a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
index df0b73c..66999bc 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
@@ -38,6 +38,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -137,7 +139,19 @@ public class GlobalStateTaskTest {
         globalStateTask.initialize();
         globalStateTask.update(new ConsumerRecord<>("t1", 1, 51, 
"foo".getBytes(), "foo".getBytes()));
         globalStateTask.close();
-        assertEquals(expectedOffsets, stateMgr.checkpointedOffsets());
+        assertEquals(expectedOffsets, stateMgr.checkpointed());
         assertTrue(stateMgr.closed);
     }
+
+    @Test
+    public void shouldCheckpointOffsetsWhenStateIsFlushed() throws Exception {
+        final Map<TopicPartition, Long> expectedOffsets = new HashMap<>();
+        expectedOffsets.put(t1, 102L);
+        expectedOffsets.put(t2, 100L);
+        globalStateTask.initialize();
+        globalStateTask.update(new ConsumerRecord<>("t1", 1, 101, 
"foo".getBytes(), "foo".getBytes()));
+        globalStateTask.flushState();
+        assertThat(stateMgr.checkpointed(), equalTo(expectedOffsets));
+    }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/1f1e794a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index b8d51ba..f1d3090 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
@@ -58,6 +59,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
@@ -71,7 +74,6 @@ public class ProcessorStateManagerTest {
         private final Serializer<Integer> serializer = new IntegerSerializer();
 
         private TopicPartition assignedPartition = null;
-        private TopicPartition seekPartition = null;
         private long seekOffset = -1L;
         private boolean seekToBeginingCalled = false;
         private boolean seekToEndCalled = false;
@@ -162,7 +164,6 @@ public class ProcessorStateManagerTest {
             if (seekOffset >= 0)
                 throw new IllegalStateException("RestoreConsumer: offset 
already seeked");
 
-            seekPartition = partition;
             seekOffset = offset;
             currentOffset = offset;
             super.seek(partition, offset);
@@ -203,6 +204,9 @@ public class ProcessorStateManagerTest {
     private final String nonPersistentStoreName = "nonPersistentStore";
     private final String persistentStoreTopicName = 
ProcessorStateManager.storeChangelogTopic(applicationId, persistentStoreName);
     private final String nonPersistentStoreTopicName = 
ProcessorStateManager.storeChangelogTopic(applicationId, 
nonPersistentStoreName);
+    private final MockStateStoreSupplier.MockStateStore persistentStore = new 
MockStateStoreSupplier.MockStateStore(persistentStoreName, true);
+    private final MockStateStoreSupplier.MockStateStore nonPersistentStore = 
new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
+    private final TopicPartition persistentStorePartition = new 
TopicPartition(persistentStoreTopicName, 1);
     private final String storeName = "mockStateStore";
     private final String changelogTopic = 
ProcessorStateManager.storeChangelogTopic(applicationId, storeName);
     private final TopicPartition changelogTopicPartition = new 
TopicPartition(changelogTopic, 0);
@@ -210,6 +214,8 @@ public class ProcessorStateManagerTest {
     private final MockRestoreConsumer restoreConsumer = new 
MockRestoreConsumer();
     private final MockStateStoreSupplier.MockStateStore mockStateStore = new 
MockStateStoreSupplier.MockStateStore(storeName, true);
     private File baseDir;
+    private File checkpointFile;
+    private OffsetCheckpoint checkpoint;
     private StateDirectory stateDirectory;
 
 
@@ -217,6 +223,14 @@ public class ProcessorStateManagerTest {
     public void setup() {
         baseDir = TestUtils.tempDirectory();
         stateDirectory = new StateDirectory(applicationId, baseDir.getPath(), 
new MockTime());
+        checkpointFile = new File(stateDirectory.directoryForTask(taskId), 
ProcessorStateManager.CHECKPOINT_FILE_NAME);
+        checkpoint = new OffsetCheckpoint(checkpointFile);
+        restoreConsumer.updatePartitions(persistentStoreTopicName, 
Utils.mkList(
+                new PartitionInfo(persistentStoreTopicName, 1, Node.noNode(), 
new Node[0], new Node[0])
+        ));
+        restoreConsumer.updatePartitions(nonPersistentStoreTopicName, 
Utils.mkList(
+                new PartitionInfo(nonPersistentStoreTopicName, 1, 
Node.noNode(), new Node[0], new Node[0])
+        ));
     }
 
     @After
@@ -300,8 +314,6 @@ public class ProcessorStateManagerTest {
     public void testRegisterNonPersistentStore() throws IOException {
         long lastCheckpointedOffset = 10L;
 
-        MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
-
         OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(baseDir, 
ProcessorStateManager.CHECKPOINT_FILE_NAME));
         checkpoint.write(Collections.singletonMap(new 
TopicPartition(persistentStoreTopicName, 2), lastCheckpointedOffset));
 
@@ -313,8 +325,6 @@ public class ProcessorStateManagerTest {
         TopicPartition partition = new 
TopicPartition(persistentStoreTopicName, 2);
         restoreConsumer.updateEndOffsets(Collections.singletonMap(partition, 
13L));
 
-        MockStateStoreSupplier.MockStateStore nonPersistentStore = new 
MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); // non 
persistent store
-
         ProcessorStateManager stateMgr = new ProcessorStateManager(new 
TaskId(0, 2), noPartitions, restoreConsumer, false, stateDirectory, new 
HashMap<String, String>() {
             {
                 put(persistentStoreName, persistentStoreTopicName);
@@ -325,7 +335,7 @@ public class ProcessorStateManagerTest {
             restoreConsumer.reset();
 
             ArrayList<Integer> expectedKeys = new ArrayList<>();
-            long offset = -1L;
+            long offset;
             for (int i = 1; i <= 3; i++) {
                 offset = (long) (i + 100);
                 int key = i;
@@ -346,12 +356,13 @@ public class ProcessorStateManagerTest {
         } finally {
             stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
         }
-
     }
 
     @Test
     public void testChangeLogOffsets() throws IOException {
         final TaskId taskId = new TaskId(0, 0);
+        final OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint(
+                new File(stateDirectory.directoryForTask(taskId), 
ProcessorStateManager.CHECKPOINT_FILE_NAME));
         long lastCheckpointedOffset = 10L;
         String storeName1 = "store1";
         String storeName2 = "store2";
@@ -366,10 +377,7 @@ public class ProcessorStateManagerTest {
         storeToChangelogTopic.put(storeName2, storeTopicName2);
         storeToChangelogTopic.put(storeName3, storeTopicName3);
 
-        OffsetCheckpoint checkpoint = new OffsetCheckpoint(new 
File(stateDirectory.directoryForTask(taskId), 
ProcessorStateManager.CHECKPOINT_FILE_NAME));
-        checkpoint.write(Collections.singletonMap(new 
TopicPartition(storeTopicName1, 0), lastCheckpointedOffset));
-
-        MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
+        offsetCheckpoint.write(Collections.singletonMap(new 
TopicPartition(storeTopicName1, 0), lastCheckpointedOffset));
 
         restoreConsumer.updatePartitions(storeTopicName1, Utils.mkList(
                 new PartitionInfo(storeTopicName1, 0, Node.noNode(), new 
Node[0], new Node[0])
@@ -406,7 +414,7 @@ public class ProcessorStateManagerTest {
             stateMgr.register(store2, true, store2.stateRestoreCallback);
             stateMgr.register(store3, true, store3.stateRestoreCallback);
 
-            Map<TopicPartition, Long> changeLogOffsets = 
stateMgr.checkpointedOffsets();
+            Map<TopicPartition, Long> changeLogOffsets = 
stateMgr.checkpointed();
 
             assertEquals(3, changeLogOffsets.size());
             assertTrue(changeLogOffsets.containsKey(partition1));
@@ -424,20 +432,12 @@ public class ProcessorStateManagerTest {
 
     @Test
     public void testGetStore() throws IOException {
-        MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
-
-        restoreConsumer.updatePartitions(nonPersistentStoreTopicName, 
Utils.mkList(
-                new PartitionInfo(nonPersistentStoreTopicName, 1, 
Node.noNode(), new Node[0], new Node[0])
-        ));
-
-        MockStateStoreSupplier.MockStateStore mockStateStore = new 
MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
-
-        ProcessorStateManager stateMgr = new ProcessorStateManager(new 
TaskId(0, 1), noPartitions, restoreConsumer, false, stateDirectory, 
Collections.<String, String>emptyMap());
+        final ProcessorStateManager stateMgr = new ProcessorStateManager(new 
TaskId(0, 1), noPartitions, restoreConsumer, false, stateDirectory, 
Collections.<String, String>emptyMap());
         try {
-            stateMgr.register(mockStateStore, true, 
mockStateStore.stateRestoreCallback);
+            stateMgr.register(nonPersistentStore, true, 
nonPersistentStore.stateRestoreCallback);
 
             assertNull(stateMgr.getStore("noSuchStore"));
-            assertEquals(mockStateStore, 
stateMgr.getStore(nonPersistentStoreName));
+            assertEquals(nonPersistentStore, 
stateMgr.getStore(nonPersistentStoreName));
 
         } finally {
             stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
@@ -446,30 +446,15 @@ public class ProcessorStateManagerTest {
 
     @Test
     public void testFlushAndClose() throws IOException {
-        final TaskId taskId = new TaskId(0, 1);
-        File checkpointFile = new 
File(stateDirectory.directoryForTask(taskId), 
ProcessorStateManager.CHECKPOINT_FILE_NAME);
         // write an empty checkpoint file
-        OffsetCheckpoint oldCheckpoint = new OffsetCheckpoint(checkpointFile);
-        oldCheckpoint.write(Collections.<TopicPartition, Long>emptyMap());
-
-        MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
-
-        restoreConsumer.updatePartitions(persistentStoreTopicName, 
Utils.mkList(
-                new PartitionInfo(persistentStoreTopicName, 1, Node.noNode(), 
new Node[0], new Node[0])
-        ));
-        restoreConsumer.updatePartitions(nonPersistentStoreTopicName, 
Utils.mkList(
-                new PartitionInfo(nonPersistentStoreTopicName, 1, 
Node.noNode(), new Node[0], new Node[0])
-        ));
+        checkpoint.write(Collections.<TopicPartition, Long>emptyMap());
 
         // set up ack'ed offsets
-        HashMap<TopicPartition, Long> ackedOffsets = new HashMap<>();
+        final HashMap<TopicPartition, Long> ackedOffsets = new HashMap<>();
         ackedOffsets.put(new TopicPartition(persistentStoreTopicName, 1), 
123L);
         ackedOffsets.put(new TopicPartition(nonPersistentStoreTopicName, 1), 
456L);
         ackedOffsets.put(new 
TopicPartition(ProcessorStateManager.storeChangelogTopic(applicationId, 
"otherTopic"), 1), 789L);
 
-        MockStateStoreSupplier.MockStateStore persistentStore = new 
MockStateStoreSupplier.MockStateStore(persistentStoreName, true);
-        MockStateStoreSupplier.MockStateStore nonPersistentStore = new 
MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
-
         ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, 
noPartitions, restoreConsumer, false, stateDirectory, new HashMap<String, 
String>() {
             {
                 put(persistentStoreName, persistentStoreTopicName);
@@ -477,8 +462,8 @@ public class ProcessorStateManagerTest {
             }
         });
         try {
-            // make sure the checkpoint file is deleted
-            assertFalse(checkpointFile.exists());
+            // make sure the checkpoint file isn't deleted
+            assertTrue(checkpointFile.exists());
 
             restoreConsumer.reset();
             stateMgr.register(persistentStore, true, 
persistentStore.stateRestoreCallback);
@@ -499,39 +484,122 @@ public class ProcessorStateManagerTest {
         assertTrue(checkpointFile.exists());
 
         // the checkpoint file should contain an offset from the persistent 
store only.
-        OffsetCheckpoint newCheckpoint = new OffsetCheckpoint(checkpointFile);
-        Map<TopicPartition, Long> checkpointedOffsets = newCheckpoint.read();
+        final Map<TopicPartition, Long> checkpointedOffsets = 
checkpoint.read();
         assertEquals(1, checkpointedOffsets.size());
         assertEquals(new Long(123L + 1L), checkpointedOffsets.get(new 
TopicPartition(persistentStoreTopicName, 1)));
     }
 
     @Test
     public void shouldRegisterStoreWithoutLoggingEnabledAndNotBackedByATopic() 
throws Exception {
-        MockStateStoreSupplier.MockStateStore mockStateStore = new 
MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
         ProcessorStateManager stateMgr = new ProcessorStateManager(new 
TaskId(0, 1), noPartitions, new MockRestoreConsumer(), false, stateDirectory, 
Collections.<String, String>emptyMap());
-        stateMgr.register(mockStateStore, false, 
mockStateStore.stateRestoreCallback);
+        stateMgr.register(nonPersistentStore, false, 
nonPersistentStore.stateRestoreCallback);
         assertNotNull(stateMgr.getStore(nonPersistentStoreName));
     }
 
     @Test
-    public void shouldNotWriteCheckpointsIfAckeOffsetsIsNull() throws 
Exception {
-        final File checkpointFile = new 
File(stateDirectory.directoryForTask(taskId), 
ProcessorStateManager.CHECKPOINT_FILE_NAME);
-        // write an empty checkpoint file
-        final OffsetCheckpoint oldCheckpoint = new 
OffsetCheckpoint(checkpointFile);
-        oldCheckpoint.write(Collections.<TopicPartition, Long>emptyMap());
+    public void shouldNotChangeOffsetsIfAckedOffsetsIsNull() throws Exception {
+        final Map<TopicPartition, Long> offsets = 
Collections.singletonMap(persistentStorePartition, 99L);
+        checkpoint.write(offsets);
 
-        restoreConsumer.updatePartitions(persistentStoreTopicName, 
Utils.mkList(
-                new PartitionInfo(persistentStoreTopicName, 1, Node.noNode(), 
new Node[0], new Node[0])
-        ));
+        final ProcessorStateManager stateMgr = new 
ProcessorStateManager(taskId,
+                                                                         
noPartitions,
+                                                                         
restoreConsumer,
+                                                                         false,
+                                                                         
stateDirectory,
+                                                                         
Collections.<String, String>emptyMap());
+
+        restoreConsumer.reset();
+        stateMgr.register(persistentStore, true, 
persistentStore.stateRestoreCallback);
+        stateMgr.close(null);
+        final Map<TopicPartition, Long> read = checkpoint.read();
+        assertThat(read, equalTo(offsets));
+    }
+
+    @Test
+    public void shouldWriteCheckpointForPersistentLogEnabledStore() throws 
Exception {
+        final ProcessorStateManager stateMgr = new 
ProcessorStateManager(taskId,
+                                                                         
noPartitions,
+                                                                         
restoreConsumer,
+                                                                         false,
+                                                                         
stateDirectory,
+                                                                         
Collections.singletonMap(persistentStore.name(),
+                                                                               
                   persistentStoreTopicName));
+        restoreConsumer.reset();
+        stateMgr.register(persistentStore, true, 
persistentStore.stateRestoreCallback);
 
 
-        final MockStateStoreSupplier.MockStateStore persistentStore = new 
MockStateStoreSupplier.MockStateStore(persistentStoreName, true);
-        final ProcessorStateManager stateMgr = new 
ProcessorStateManager(taskId, noPartitions, restoreConsumer, false, 
stateDirectory, Collections.<String, String>emptyMap());
+        stateMgr.checkpoint(Collections.singletonMap(persistentStorePartition, 
10L));
+        final Map<TopicPartition, Long> read = checkpoint.read();
+        assertThat(read, 
equalTo(Collections.singletonMap(persistentStorePartition, 11L)));
+    }
+
+    @Test
+    public void shouldWriteCheckpointForStandbyReplica() throws Exception {
+        final ProcessorStateManager stateMgr = new 
ProcessorStateManager(taskId,
+                                                                         
noPartitions,
+                                                                         
restoreConsumer,
+                                                                         true,
+                                                                         
stateDirectory,
+                                                                         
Collections.singletonMap(persistentStore.name(),
+                                                                               
                   persistentStoreTopicName));
 
         restoreConsumer.reset();
         stateMgr.register(persistentStore, true, 
persistentStore.stateRestoreCallback);
-        stateMgr.close(null);
-        assertFalse(checkpointFile.exists());
+        final byte[] bytes = Serdes.Integer().serializer().serialize("", 10);
+        stateMgr.updateStandbyStates(persistentStorePartition,
+                                     Collections.singletonList(
+                                             new 
ConsumerRecord<>(persistentStorePartition.topic(),
+                                                                               
 persistentStorePartition.partition(),
+                                                                               
 888L,
+                                                                               
 bytes,
+                                                                               
 bytes)));
+
+        stateMgr.checkpoint(Collections.<TopicPartition, Long>emptyMap());
+
+        final Map<TopicPartition, Long> read = checkpoint.read();
+        assertThat(read, 
equalTo(Collections.singletonMap(persistentStorePartition, 889L)));
+
+    }
+
+    @Test
+    public void shouldNotWriteCheckpointForNonPersistent() throws Exception {
+        final TopicPartition topicPartition = new 
TopicPartition(nonPersistentStoreTopicName, 1);
+
+        restoreConsumer.updatePartitions(nonPersistentStoreTopicName, 
Utils.mkList(
+                new PartitionInfo(nonPersistentStoreTopicName, 1, 
Node.noNode(), new Node[0], new Node[0])
+        ));
+
+        final ProcessorStateManager stateMgr = new 
ProcessorStateManager(taskId,
+                                                                         
noPartitions,
+                                                                         
restoreConsumer,
+                                                                         true,
+                                                                         
stateDirectory,
+                                                                         
Collections.singletonMap(nonPersistentStoreName,
+                                                                               
                   nonPersistentStoreTopicName));
+
+        restoreConsumer.reset();
+        stateMgr.register(nonPersistentStore, true, 
nonPersistentStore.stateRestoreCallback);
+        stateMgr.checkpoint(Collections.singletonMap(topicPartition, 876L));
+
+        final Map<TopicPartition, Long> read = checkpoint.read();
+        assertThat(read, equalTo(Collections.<TopicPartition, 
Long>emptyMap()));
+    }
+
+    @Test
+    public void shouldNotWriteCheckpointForStoresWithoutChangelogTopic() 
throws Exception {
+        final ProcessorStateManager stateMgr = new 
ProcessorStateManager(taskId,
+                                                                         
noPartitions,
+                                                                         
restoreConsumer,
+                                                                         true,
+                                                                         
stateDirectory,
+                                                                         
Collections.<String, String>emptyMap());
+
+        stateMgr.register(persistentStore, true, 
persistentStore.stateRestoreCallback);
+
+        stateMgr.checkpoint(Collections.singletonMap(persistentStorePartition, 
987L));
+
+        final Map<TopicPartition, Long> read = checkpoint.read();
+        assertThat(read, equalTo(Collections.<TopicPartition, 
Long>emptyMap()));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/1f1e794a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 629e521..ef4ebcc 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
@@ -54,6 +55,8 @@ import java.util.Properties;
 import java.util.Set;
 
 import static java.util.Collections.singleton;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
@@ -316,7 +319,7 @@ public class StandbyTaskTest {
         final String changelogName = "test-application-my-store-changelog";
         final List<TopicPartition> partitions = Utils.mkList(new 
TopicPartition(changelogName, 0));
         consumer.assign(partitions);
-        Map<TopicPartition, OffsetAndMetadata> committedOffsets = new 
HashMap<>();
+        final Map<TopicPartition, OffsetAndMetadata> committedOffsets = new 
HashMap<>();
         committedOffsets.put(new TopicPartition(changelogName, 0), new 
OffsetAndMetadata(0L));
         consumer.commitSync(committedOffsets);
 
@@ -327,9 +330,53 @@ public class StandbyTaskTest {
         final ProcessorTopology topology = 
builder.setApplicationId(applicationId).build(0);
         StreamsConfig config = createConfig(baseDir);
         new StandbyTask(taskId, applicationId, partitions, topology, consumer, 
restoreStateConsumer, config,
-            new MockStreamsMetrics(new Metrics()), stateDirectory);
+                        new MockStreamsMetrics(new Metrics()), stateDirectory);
 
     }
+
+    @Test
+    public void shouldCheckpointStoreOffsetsOnCommit() throws Exception {
+        consumer.assign(Utils.mkList(ktable));
+        final Map<TopicPartition, OffsetAndMetadata> committedOffsets = new 
HashMap<>();
+        committedOffsets.put(new TopicPartition(ktable.topic(), 
ktable.partition()), new OffsetAndMetadata(100L));
+        consumer.commitSync(committedOffsets);
+
+        restoreStateConsumer.updatePartitions("ktable1", Utils.mkList(
+                new PartitionInfo("ktable1", 0, Node.noNode(), new Node[0], 
new Node[0])));
+
+        final TaskId taskId = new TaskId(0, 0);
+        final MockTime time = new MockTime();
+        final StreamsConfig config = createConfig(baseDir);
+        final StandbyTask task = new StandbyTask(taskId,
+                                                 applicationId,
+                                                 ktablePartitions,
+                                                 ktableTopology,
+                                                 consumer,
+                                                 restoreStateConsumer,
+                                                 config,
+                                                 null,
+                                                 stateDirectory
+        );
+
+
+        restoreStateConsumer.assign(new 
ArrayList<>(task.changeLogPartitions()));
+
+        final byte[] serializedValue = 
Serdes.Integer().serializer().serialize("", 1);
+        task.update(ktable, Collections.singletonList(new 
ConsumerRecord<>(ktable.topic(),
+                                                                           
ktable.partition(),
+                                                                           50L,
+                                                                           
serializedValue,
+                                                                           
serializedValue)));
+
+        time.sleep(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG));
+        task.commit();
+
+        final Map<TopicPartition, Long> checkpoint = new OffsetCheckpoint(new 
File(stateDirectory.directoryForTask(taskId),
+                                                                               
    ProcessorStateManager.CHECKPOINT_FILE_NAME)).read();
+        assertThat(checkpoint, equalTo(Collections.singletonMap(ktable, 51L)));
+
+    }
+
     private List<ConsumerRecord<byte[], byte[]>> 
records(ConsumerRecord<byte[], byte[]>... recs) {
         return Arrays.asList(recs);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1f1e794a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java
index 3f48059..f4aca9f 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java
@@ -57,7 +57,12 @@ public class StateManagerStub implements StateManager {
     }
 
     @Override
-    public Map<TopicPartition, Long> checkpointedOffsets() {
+    public Map<TopicPartition, Long> checkpointed() {
         return null;
     }
+
+    @Override
+    public void checkpoint(final Map<TopicPartition, Long> offsets) {
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1f1e794a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 15b1d25..5c72fc9 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -22,6 +22,8 @@ import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.record.TimestampType;
@@ -39,6 +41,8 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
+import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.apache.kafka.test.MockProcessorNode;
 import org.apache.kafka.test.MockSourceNode;
@@ -415,6 +419,62 @@ public class StreamTaskTest {
         assertTrue(flushed.get());
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldCheckpointOffsetsOnCommit() throws Exception {
+        final String storeName = "test";
+        final String changelogTopic = 
ProcessorStateManager.storeChangelogTopic("appId", storeName);
+        final InMemoryKeyValueStore inMemoryStore = new 
InMemoryKeyValueStore(storeName, null, null) {
+            @Override
+            public void init(final ProcessorContext context, final StateStore 
root) {
+                context.register(root, true, null);
+            }
+
+            @Override
+            public boolean persistent() {
+                return true;
+            }
+        };
+        final ProcessorTopology topology = new 
ProcessorTopology(Collections.<ProcessorNode>emptyList(),
+                                                                 
Collections.<String, SourceNode>emptyMap(),
+                                                                 
Collections.<String, SinkNode>emptyMap(),
+                                                                 
Collections.<StateStore>singletonList(inMemoryStore),
+                                                                 
Collections.singletonMap(storeName, changelogTopic),
+                                                                 
Collections.<StateStore>emptyList());
+
+        final TopicPartition partition = new TopicPartition(changelogTopic, 0);
+        final NoOpRecordCollector recordCollector = new NoOpRecordCollector() {
+            @Override
+            public Map<TopicPartition, Long> offsets() {
+
+                return Collections.singletonMap(partition, 543L);
+            }
+        };
+
+        restoreStateConsumer.updatePartitions(changelogTopic,
+                                              Collections.singletonList(
+                                                      new 
PartitionInfo(changelogTopic, 0, null, new Node[0], new Node[0])));
+        
restoreStateConsumer.updateEndOffsets(Collections.singletonMap(partition, 0L));
+        
restoreStateConsumer.updateBeginningOffsets(Collections.singletonMap(partition, 
0L));
+
+        final StreamsMetrics streamsMetrics = new MockStreamsMetrics(new 
Metrics());
+        final TaskId taskId = new TaskId(0, 0);
+        final MockTime time = new MockTime();
+        final StreamsConfig config = createConfig(baseDir);
+        final StreamTask streamTask = new StreamTask(taskId, "appId", 
partitions, topology, consumer,
+                                                     restoreStateConsumer, 
config, streamsMetrics,
+                                                     stateDirectory, new 
ThreadCache("testCache", 0, streamsMetrics),
+                                                     time, recordCollector);
+
+        time.sleep(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG));
+
+        streamTask.commit();
+        final OffsetCheckpoint checkpoint = new OffsetCheckpoint(new 
File(stateDirectory.directoryForTask(taskId),
+                                                                          
ProcessorStateManager.CHECKPOINT_FILE_NAME));
+
+        assertThat(checkpoint.read(), 
equalTo(Collections.singletonMap(partition, 544L)));
+    }
+
     @Test
     public void 
shouldThrowIllegalStateExceptionIfCurrentNodeIsNotNullWhenPunctuateCalled() 
throws Exception {
         ((ProcessorContextImpl) 
task.processorContext()).setCurrentNode(processor);

http://git-wip-us.apache.org/repos/asf/kafka/blob/1f1e794a/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java 
b/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java
index 2f3ef26..612a0da 100644
--- a/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java
+++ b/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java
@@ -67,6 +67,11 @@ public class GlobalStateManagerStub implements 
GlobalStateManager {
     }
 
     @Override
+    public void checkpoint(final Map<TopicPartition, Long> offsets) {
+        this.offsets.putAll(offsets);
+    }
+
+    @Override
     public StateStore getGlobalStore(final String name) {
         return null;
     }
@@ -77,7 +82,7 @@ public class GlobalStateManagerStub implements 
GlobalStateManager {
     }
 
     @Override
-    public Map<TopicPartition, Long> checkpointedOffsets() {
+    public Map<TopicPartition, Long> checkpointed() {
         return offsets;
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1f1e794a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java 
b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index b50ff34..ac8933d 100644
--- 
a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ 
b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -211,7 +211,8 @@ public class ProcessorTopologyTestDriver {
             final GlobalStateManagerImpl stateManager = new 
GlobalStateManagerImpl(globalTopology, globalConsumer, stateDirectory);
             globalStateTask = new GlobalStateUpdateTask(globalTopology,
                                                         new 
GlobalProcessorContextImpl(config, stateManager, streamsMetrics, cache),
-                                                        stateManager);
+                                                        stateManager
+            );
             globalStateTask.initialize();
         }
 

Reply via email to