vvcephei commented on a change in pull request #11676:
URL: https://github.com/apache/kafka/pull/11676#discussion_r789167413



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/CheckpointCallback.java
##########
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor;
+
+import java.io.IOException;
+
+public interface CheckpointCallback {

Review comment:
       We'll need good JavaDoc on all public interfaces before we can merge.
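   For example, something along these lines (just a sketch; adjust the wording as you see fit):
   ```java
   /**
    * A callback that a state store registers so that it is notified when
    * Streams checkpoints the store, e.g. so that the store can persist its
    * current {@link org.apache.kafka.streams.query.Position}.
    */
   public interface CheckpointCallback {

       /**
        * Called when Streams checkpoints the registered state store.
        *
        * @throws IOException if writing the checkpoint fails
        */
       void checkpoint() throws IOException;
   }
   ```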

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/StateStoreContext.java
##########
@@ -94,7 +94,8 @@
      * @throws StreamsException if the store's change log does not contain the partition
      */
     void register(final StateStore store,
-                  final StateRestoreCallback stateRestoreCallback);
+                  final StateRestoreCallback stateRestoreCallback,
+                  final CheckpointCallback checkpointCallback);

Review comment:
       Just making sure this doesn't get missed, though we already discussed it 
on the ProcessorContext... This is a breaking change, so what we should 
actually do is deprecate the current `register(store, stateRestoreCallback)` 
and add a new `register(store, stateRestoreCallback, checkpointCallback)`.
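   I.e., something along these lines (just a sketch; the deprecation version/wording is a placeholder):
   ```java
   /**
    * @deprecated since some future release; use
    * {@link #register(StateStore, StateRestoreCallback, CheckpointCallback)} instead
    */
   @Deprecated
   void register(final StateStore store,
                 final StateRestoreCallback stateRestoreCallback);

   void register(final StateStore store,
                 final StateRestoreCallback stateRestoreCallback,
                 final CheckpointCallback checkpointCallback);
   ```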

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java
##########
@@ -34,7 +35,7 @@ public File baseDir() {
 
     @Override
     public void registerStore(final StateStore store,
-                              final StateRestoreCallback stateRestoreCallback) {}
+                              final StateRestoreCallback stateRestoreCallback, final CheckpointCallback checkpoint) {}

Review comment:
       Also in the interest of formatting:
   > Use a "single parameter per line" formatting when defining methods (also 
for methods with only 2 parameters).
   https://kafka.apache.org/coding-guide.html
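   I.e. (sketch):
   ```java
   @Override
   public void registerStore(final StateStore store,
                             final StateRestoreCallback stateRestoreCallback,
                             final CheckpointCallback checkpoint) {}
   ```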

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
##########
@@ -85,8 +85,23 @@
      * @throws IllegalStateException If store gets registered after initialized is already finished
      * @throws StreamsException if the store's change log does not contain the partition
      */
+    default void register(final StateStore store,

Review comment:
       Just to be clear, the ProcessorContext itself is not meant to be 
implemented by users, so we don't need to add a default implementation 
(although we can, and it'll make it easier for us to maintain our 
ProcessorContext implementations).
   
   However, we should deprecate this method and document that callers should 
migrate to the new one.
   
   One other thing I noticed is that we really should be updating 
StateStoreContext, not ProcessorContext. I think we should actually just 
deprecate `ProcessorContext#register` and tell people to use 
`StateStoreContext` for stores instead. Then we don't need to add your new 
overload here, just in StateStoreContext, though we do have the same 
compatibility concern there.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
##########
@@ -228,7 +228,7 @@ private void init(final StateStore root) {
             streamsMetrics
         );
 
-        context.register(root, (RecordBatchingStateRestoreCallback) this::restoreBatch);
+        context.register(root, (RecordBatchingStateRestoreCallback) this::restoreBatch, () -> { });

Review comment:
       Oh, I guess, even more importantly, this is an in-memory store, so it 
shouldn't write the position on checkpoint.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
##########
@@ -228,7 +228,7 @@ private void init(final StateStore root) {
             streamsMetrics
         );
 
-        context.register(root, (RecordBatchingStateRestoreCallback) this::restoreBatch);
+        context.register(root, (RecordBatchingStateRestoreCallback) this::restoreBatch, () -> { });

Review comment:
       This is ok because the buffer never serves queries, right? It might be 
worth an explanatory comment, in case that assumption becomes untrue in the 
future.
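   E.g., something like (a sketch):
   ```java
   // Note: this buffer never serves interactive queries, so there is no
   // position to checkpoint; the no-op callback is intentional. Revisit
   // this if that assumption ever changes.
   context.register(root, (RecordBatchingStateRestoreCallback) this::restoreBatch, () -> { });
   ```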

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
##########
@@ -109,20 +113,29 @@ public void init(final ProcessorContext context, final StateStore root) {
             expiredRecordSensor = null;
         }
 
+        this.positionCheckpointFile = new File(context.stateDir(), this.name() + ".position");
+        this.positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile);
+        this.position = StoreQueryUtils.readPositionFromCheckpoint(positionCheckpoint);
+
         if (root != null) {
-            context.register(root, (key, value) -> put(SessionKeySchema.from(Bytes.wrap(key)), value));
+            context.register(root, (key, value) -> put(SessionKeySchema.from(Bytes.wrap(key)), value), this::checkpoint);
         }
         open = true;
     }
 
+    public void checkpoint() throws IOException {
+        StoreQueryUtils.checkpointPosition(positionCheckpoint, position);

Review comment:
       InMemory stores shouldn't do this, right?
   

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
##########
@@ -44,9 +43,12 @@
     public void init(final StateStoreContext context, final StateStore root) {
         super.init(context, root);
         this.stateStoreContext = context;
+
+        this.position = super.getPosition();

Review comment:
       Should we be checkpointing the position in this store as well?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
##########
@@ -152,4 +153,8 @@ default void init(final StateStoreContext context, final StateStore root) {
         // If a store doesn't implement a query handler, then all queries are unknown.
         return QueryResult.forUnknownQueryType(query, this);
     }
+
+    default Position getPosition() {
+        return Position.emptyPosition();

Review comment:
       I wonder if it's better to throw an UnsupportedOperationException or 
something. It seems risky that we (or anyone) could forget to override the 
method, but still call it and not notice that we're getting wrong results.
   
   We do want to prevent compiler errors when people upgrade (so the default 
implementation is necessary), but I'm not sure "emptyPosition" is a safe 
default.
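   E.g. (a sketch):
   ```java
   default Position getPosition() {
       throw new UnsupportedOperationException(
           "getPosition is not implemented by this StateStore (" + getClass() + ")"
       );
   }
   ```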

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
##########
@@ -109,21 +114,30 @@ public void init(final ProcessorContext context, final StateStore root) {
             metrics
         );
 
+        this.positionCheckpointFile = new File(context.stateDir(), this.name() + ".position");
+        this.positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile);
+        this.position = StoreQueryUtils.readPositionFromCheckpoint(positionCheckpoint);
+
         if (root != null) {
             context.register(root, (key, value) ->
-                put(Bytes.wrap(extractStoreKeyBytes(key)), value, extractStoreTimestamp(key)));
+                put(Bytes.wrap(extractStoreKeyBytes(key)), value, extractStoreTimestamp(key)), this::checkpoint);
         }
         open = true;
     }
 
+    public void checkpoint() throws IOException {
+        StoreQueryUtils.checkpointPosition(positionCheckpoint, position);

Review comment:
       InMemory stores shouldn't do this, right?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##########
@@ -598,6 +605,15 @@ public void checkpoint() {
         // checkpoint those stores that are only logged and persistent to the checkpoint file
         final Map<TopicPartition, Long> checkpointingOffsets = new HashMap<>();
         for (final StateStoreMetadata storeMetadata : stores.values()) {
+            if (storeMetadata.checkpointCallback != null && !storeMetadata.corrupted) {
+                try {
+                    storeMetadata.checkpointCallback.checkpoint();
+                } catch (final IOException e) {
+                    throw new ProcessorStateException(format("%sError creating position checkpoint file",

Review comment:
       ```suggestion
                    throw new ProcessorStateException(format("%sError checkpointing store",
   ```
   Also, it might be nice to include the store name and partition in the error 
message.
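   Something like this, perhaps (a sketch; `logPrefix` and the `StateStoreMetadata` accessors here are illustrative):
   ```java
   throw new ProcessorStateException(
       format("%sError checkpointing store %s at changelog partition %s",
           logPrefix, storeMetadata.store().name(), storeMetadata.changelogPartition()),
       e
   );
   ```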

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
##########
@@ -82,26 +86,37 @@ public String name() {
     @Deprecated
     @Override
     public void init(final ProcessorContext context, final StateStore root) {
+        this.positionCheckpointFile = new File(context.stateDir(), this.name() + ".position");
+        this.positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile);
+        this.position = StoreQueryUtils.readPositionFromCheckpoint(positionCheckpoint);
 
         // register the store
         context.register(root, (key, value) -> {
             restoring = true;
             put(Bytes.wrap(key), value);
             restoring = false;
-        });
+        }, () -> { });
     }
 
     @Override
     public void init(final StateStoreContext context, final StateStore root) {
         // register the store
+        this.positionCheckpointFile = new File(context.stateDir(), this.name() + ".position");
+        this.positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile);
+        this.position = StoreQueryUtils.readPositionFromCheckpoint(positionCheckpoint);
+
         context.register(root, (key, value) -> {
             restoring = true;
             put(Bytes.wrap(key), value);
             restoring = false;
-        });
+        }, null);
         this.context = context;
     }
 
+    public void checkpoint() throws IOException {
+        StoreQueryUtils.checkpointPosition(positionCheckpoint, position);

Review comment:
       InMemory stores shouldn't do this, right?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
##########
@@ -152,4 +153,8 @@ default void init(final StateStoreContext context, final StateStore root) {
         // If a store doesn't implement a query handler, then all queries are unknown.
         return QueryResult.forUnknownQueryType(query, this);
     }
+
+    default Position getPosition() {

Review comment:
       We'll need JavaDoc here as well.
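   E.g. (a sketch; wording to taste):
   ```java
   /**
    * Returns the position the state store is at with respect to its input
    * topic/partitions, for use in bounding interactive queries.
    */
   default Position getPosition() {
       return Position.emptyPosition();
   }
   ```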

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
##########
@@ -45,10 +45,11 @@
 
     private final String name;
     private final NavigableMap<Bytes, byte[]> map = new TreeMap<>();
-    private final Position position = Position.emptyPosition();
     private volatile boolean open = false;
     private StateStoreContext context;
 
+    private Position position = Position.emptyPosition();

Review comment:
       I think this was a victim of refactoring... It can still be final, right?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -333,6 +339,43 @@ public static boolean isPermitted(
         return byteArray -> deserializer.deserialize(serdes.topic(), byteArray);
     }
 
+
+    public static Map<TopicPartition, Long> positionToTopicPartitionMap(final Position position) {

Review comment:
       This can be private, right?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
##########
@@ -49,6 +49,7 @@ public void init(final ProcessorContext context,
                      final StateStore root) {
         this.context = asInternalProcessorContext(context);
         super.init(context, root);
+        this.position = super.getPosition();

Review comment:
       Interesting... How is this different from just `getPosition()`?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
##########
@@ -52,10 +52,12 @@
     @Override
     public void init(final StateStoreContext context, final StateStore root) {
         super.init(context, root);
+        this.position = super.getPosition();

Review comment:
       Likewise here, it seems like we should be checkpointing this position?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -333,6 +339,43 @@ public static boolean isPermitted(
         return byteArray -> deserializer.deserialize(serdes.topic(), byteArray);
     }
 
+
+    public static Map<TopicPartition, Long> positionToTopicPartitionMap(final Position position) {
+        final Map<TopicPartition, Long> topicPartitions = new HashMap<>();
+        final Set<String> topics = position.getTopics();
+        for (final String t : topics) {
+            final Map<Integer, Long> partitions = position.getPartitionPositions(t);
+            for (final Entry<Integer, Long> e : partitions.entrySet()) {
+                final TopicPartition tp = new TopicPartition(t, e.getKey());
+                topicPartitions.put(tp, e.getValue());
+            }
+        }
+        return topicPartitions;
+    }
+
+    public static void checkpointPosition(final OffsetCheckpoint checkpointFile, final Position position) throws IOException {
+        final Map<TopicPartition, Long> topicPartitions = StoreQueryUtils.positionToTopicPartitionMap(position);
+        checkpointFile.write(topicPartitions);
+    }
+
+    public static Position readPositionFromCheckpoint(final OffsetCheckpoint checkpointFile) {
+        try {
+            final Map<TopicPartition, Long> topicPartitions = checkpointFile.read();
+            return StoreQueryUtils.topicPartitionMapToPosition(topicPartitions);
+        } catch (final IOException e) {
+            throw new ProcessorStateException("Error reading checkpoint file", e);
+        }
+    }
+
+    public static Position topicPartitionMapToPosition(final Map<TopicPartition, Long> topicPartitions) {

Review comment:
       It seems like this can also be private.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
##########
@@ -117,6 +120,11 @@
     private OffsetCheckpoint checkpoint;
     private StateDirectory stateDirectory;
 
+    private Position persistentPosition;
+    private Position nonPersistentPosition;
+    private StateStorePositionCheckpoint persistentCheckpoint;
+    private StateStorePositionCheckpoint nonPersistentCheckpoint;

Review comment:
       I think this might be a relic... Non-Persistent stores won't write a 
checkpoint, right?

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
##########
@@ -1033,6 +1048,101 @@ public void shouldNotDeleteCheckPointFileIfEosNotEnabled() throws IOException {
         assertTrue(checkpointFile.exists());
     }
 
+    @Test
+    public void shouldWritePositionCheckpointFile() throws IOException {
+        final ProcessorStateManager stateMgr = getStateManager(Task.TaskType.ACTIVE);
+        try {
+            stateMgr.registerStore(persistentStore, persistentStore.stateRestoreCallback, persistentCheckpoint);
+            stateMgr.registerStore(nonPersistentStore, nonPersistentStore.stateRestoreCallback, nonPersistentCheckpoint);
+        } finally {
+            stateMgr.checkpoint();
+
+            assertTrue(persistentCheckpoint.getFile().exists());
+            assertTrue(nonPersistentCheckpoint.getFile().exists());
+
+            // the checkpoint file should contain an offset from the persistent store only.
+            final Map<TopicPartition, Long> persistentOffsets = persistentCheckpoint.getOffsetCheckpoint().read();
+            assertThat(persistentOffsets, is(singletonMap(new TopicPartition(persistentStoreTopicName, 1), 123L)));
+
+            final Map<TopicPartition, Long> nonPersistentOffsets = nonPersistentCheckpoint.getOffsetCheckpoint().read();
+            assertThat(nonPersistentOffsets, is(singletonMap(new TopicPartition(nonPersistentStoreTopicName, 1), 123L)));
+
+            assertEquals(persistentCheckpoint.getCheckpointedPosition(), persistentCheckpoint.getStateStorePosition());
+            assertEquals(nonPersistentCheckpoint.getCheckpointedPosition(), nonPersistentCheckpoint.getStateStorePosition());
+
+            stateMgr.close();
+
+            assertTrue(persistentStore.closed);
+            assertTrue(nonPersistentStore.closed);
+        }
+    }
+
+    @Test
+    public void shouldFailWritingPositionCheckpointFile() throws IOException {

Review comment:
       It doesn't look like this is what the test does, is it?

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/PositionCheckpointIntegrationTest.java
##########
@@ -0,0 +1,777 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.SessionWindow;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.query.KeyQuery;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.StateQueryRequest;
+import org.apache.kafka.streams.query.StateQueryResult;
+import org.apache.kafka.streams.query.WindowKeyQuery;
+import org.apache.kafka.streams.query.WindowRangeQuery;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.StoreSupplier;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
+
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.streams.query.StateQueryRequest.inStore;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.fail;
+
+@Category({IntegrationTest.class})
+@RunWith(value = Parameterized.class)
+public class PositionCheckpointIntegrationTest {
+    private static final Logger LOG = LoggerFactory.getLogger(PositionCheckpointIntegrationTest.class);
+    private static final long SEED = new Random().nextLong();
+    private static final int NUM_BROKERS = 1;
+    public static final Duration WINDOW_SIZE = Duration.ofMinutes(5);
+    private static int port = 0;
+    private static final String INPUT_TOPIC_NAME = "input-topic";
+    private static final Position INPUT_POSITION = Position.emptyPosition();
+    private static final String STORE_NAME = "kv-store";
+    private static final long RECORD_TIME = System.currentTimeMillis();
+    private static final long WINDOW_START =
+        (RECORD_TIME / WINDOW_SIZE.toMillis()) * WINDOW_SIZE.toMillis();
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+    private final StoresToTest storeToTest;
+    private final String kind;
+    private final boolean cache;
+    private final boolean log;
+    private KafkaStreams kafkaStreams;
+    private Properties streamsConfig;
+
+    public enum StoresToTest {
+        ROCKS_KV {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.persistentKeyValueStore(STORE_NAME);
+            }
+
+            @Override
+            public boolean timestamped() {
+                return false;
+            }
+
+            @Override
+            public boolean keyValue() {
+                return true;
+            }
+        },
+        TIME_ROCKS_KV {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.persistentTimestampedKeyValueStore(STORE_NAME);
+            }
+
+            @Override
+            public boolean keyValue() {
+                return true;
+            }
+        },
+        ROCKS_WINDOW {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.persistentWindowStore(STORE_NAME, Duration.ofDays(1), WINDOW_SIZE,
+                                                    false
+                );
+            }
+
+            @Override
+            public boolean isWindowed() {
+                return true;
+            }
+
+            @Override
+            public boolean timestamped() {
+                return false;
+            }
+        },
+        TIME_ROCKS_WINDOW {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.persistentTimestampedWindowStore(STORE_NAME, Duration.ofDays(1),
+                                                               WINDOW_SIZE, false
+                );
+            }
+
+            @Override
+            public boolean isWindowed() {
+                return true;
+            }
+        },
+        ROCKS_SESSION {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.persistentSessionStore(STORE_NAME, Duration.ofDays(1));
+            }
+
+            @Override
+            public boolean isSession() {
+                return true;
+            }
+        };
+
+        public abstract StoreSupplier<?> supplier();
+
+        public boolean timestamped() {
+            return true; // most stores are timestamped
+        }
+
+        public boolean global() {
+            return false;
+        }
+
+        public boolean keyValue() {
+            return false;
+        }
+
+        public boolean isWindowed() {
+            return false;
+        }
+
+        public boolean isSession() {
+            return false;
+        }
+    }
+
+    @Parameterized.Parameters(name = "cache={0}, log={1}, supplier={2}, kind={3}")
+    public static Collection<Object[]> data() {
+        LOG.info("Generating test cases according to random seed: {}", SEED);
+        final List<Object[]> values = new ArrayList<>();
+        for (final boolean cacheEnabled : Arrays.asList(false, true)) {
+            for (final boolean logEnabled : Arrays.asList(true)) {

Review comment:
       Thanks. Why is that?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -333,6 +339,43 @@ public static boolean isPermitted(
         return byteArray -> deserializer.deserialize(serdes.topic(), byteArray);
     }
 
+
+    public static Map<TopicPartition, Long> positionToTopicPartitionMap(final Position position) {
+        final Map<TopicPartition, Long> topicPartitions = new HashMap<>();
+        final Set<String> topics = position.getTopics();
+        for (final String t : topics) {
+            final Map<Integer, Long> partitions = position.getPartitionPositions(t);
+            for (final Entry<Integer, Long> e : partitions.entrySet()) {
+                final TopicPartition tp = new TopicPartition(t, e.getKey());
+                topicPartitions.put(tp, e.getValue());
+            }
+        }
+        return topicPartitions;
+    }
+
+    public static void checkpointPosition(final OffsetCheckpoint checkpointFile, final Position position) throws IOException {
+        final Map<TopicPartition, Long> topicPartitions = StoreQueryUtils.positionToTopicPartitionMap(position);
+        checkpointFile.write(topicPartitions);
+    }
+
+    public static Position readPositionFromCheckpoint(final OffsetCheckpoint checkpointFile) {
+        try {
+            final Map<TopicPartition, Long> topicPartitions = checkpointFile.read();
+            return StoreQueryUtils.topicPartitionMapToPosition(topicPartitions);
+        } catch (final IOException e) {

Review comment:
       The write and read paths here handle IOExceptions differently. It would 
be good for them both to just throw the IOException up to the caller or for 
them both to catch it and throw a ProcessorStateException.
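   E.g., the latter option would look something like this (a sketch):
   ```java
   public static void checkpointPosition(final OffsetCheckpoint checkpointFile, final Position position) {
       try {
           checkpointFile.write(positionToTopicPartitionMap(position));
       } catch (final IOException e) {
           throw new ProcessorStateException("Error writing checkpoint file", e);
       }
   }
   ```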

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
##########
@@ -310,8 +310,8 @@ public void shouldConvertValuesIfInnerStoreImplementsTimestampedBytesStore() {
         stateManager.registerStore(
             new WrappedStateStore<NoOpReadOnlyStore<Object, Object>, Object, Object>(store2) {
             },
-            stateRestoreCallback
-        );
+            stateRestoreCallback,
+                null);

Review comment:
       It looks like the formatting got messed up here (and elsewhere). It's a minor issue, but we might as well fix it.
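   I.e. (sketch):
   ```java
   stateManager.registerStore(
       new WrappedStateStore<NoOpReadOnlyStore<Object, Object>, Object, Object>(store2) {
       },
       stateRestoreCallback,
       null
   );
   ```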

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -249,10 +254,15 @@ public void init(final ProcessorContext context,
         // open the DB dir
         metricsRecorder.init(getMetricsImpl(context), context.taskId());
         openDB(context.appConfigs(), context.stateDir());
+        this.position = StoreQueryUtils.readPositionFromCheckpoint(positionCheckpoint);
 
         // value getter should always read directly from rocksDB
         // since it is only for values that are already flushed
-        context.register(root, (RecordBatchingStateRestoreCallback) this::restoreBatch);
+        context.register(root, (RecordBatchingStateRestoreCallback) this::restoreBatch, this::checkpoint);

Review comment:
       Sorry; one more formatting correction. This applies in several places in 
this PR:
   
   > If a method call is longer than 120 characters, switch to a single 
parameter per line formatting (instead of just breaking it into two lines only).
   
   https://kafka.apache.org/coding-guide.html
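   I.e. (sketch):
   ```java
   context.register(
       root,
       (RecordBatchingStateRestoreCallback) this::restoreBatch,
       this::checkpoint
   );
   ```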

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/PositionCheckpointIntegrationTest.java
##########
@@ -0,0 +1,777 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.SessionWindow;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.query.KeyQuery;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.StateQueryRequest;
+import org.apache.kafka.streams.query.StateQueryResult;
+import org.apache.kafka.streams.query.WindowKeyQuery;
+import org.apache.kafka.streams.query.WindowRangeQuery;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.StoreSupplier;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
+
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.streams.query.StateQueryRequest.inStore;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.fail;
+
+@Category({IntegrationTest.class})
+@RunWith(value = Parameterized.class)
+public class PositionCheckpointIntegrationTest {
+    private static final Logger LOG = LoggerFactory.getLogger(PositionCheckpointIntegrationTest.class);
+    private static final long SEED = new Random().nextLong();
+    private static final int NUM_BROKERS = 1;
+    public static final Duration WINDOW_SIZE = Duration.ofMinutes(5);
+    private static int port = 0;
+    private static final String INPUT_TOPIC_NAME = "input-topic";
+    private static final Position INPUT_POSITION = Position.emptyPosition();
+    private static final String STORE_NAME = "kv-store";
+    private static final long RECORD_TIME = System.currentTimeMillis();
+    private static final long WINDOW_START =
+        (RECORD_TIME / WINDOW_SIZE.toMillis()) * WINDOW_SIZE.toMillis();
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+    private final StoresToTest storeToTest;
+    private final String kind;
+    private final boolean cache;
+    private final boolean log;
+    private KafkaStreams kafkaStreams;
+    private Properties streamsConfig;
+
+    public enum StoresToTest {
+        ROCKS_KV {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.persistentKeyValueStore(STORE_NAME);
+            }
+
+            @Override
+            public boolean timestamped() {
+                return false;
+            }
+
+            @Override
+            public boolean keyValue() {
+                return true;
+            }
+        },
+        TIME_ROCKS_KV {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.persistentTimestampedKeyValueStore(STORE_NAME);
+            }
+
+            @Override
+            public boolean keyValue() {
+                return true;
+            }
+        },
+        ROCKS_WINDOW {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.persistentWindowStore(STORE_NAME, Duration.ofDays(1), WINDOW_SIZE,
+                                                    false
+                );
+            }
+
+            @Override
+            public boolean isWindowed() {
+                return true;
+            }
+
+            @Override
+            public boolean timestamped() {
+                return false;
+            }
+        },
+        TIME_ROCKS_WINDOW {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.persistentTimestampedWindowStore(STORE_NAME, Duration.ofDays(1),
+                                                               WINDOW_SIZE, false
+                );
+            }
+
+            @Override
+            public boolean isWindowed() {
+                return true;
+            }
+        },
+        ROCKS_SESSION {
+            @Override
+            public StoreSupplier<?> supplier() {
+                return Stores.persistentSessionStore(STORE_NAME, Duration.ofDays(1));
+            }
+
+            @Override
+            public boolean isSession() {
+                return true;
+            }
+        };
+
+        public abstract StoreSupplier<?> supplier();
+
+        public boolean timestamped() {
+            return true; // most stores are timestamped
+        }
+
+        public boolean global() {
+            return false;
+        }
+
+        public boolean keyValue() {
+            return false;
+        }
+
+        public boolean isWindowed() {
+            return false;
+        }
+
+        public boolean isSession() {
+            return false;
+        }
+    }
+
+    @Parameterized.Parameters(name = "cache={0}, log={1}, supplier={2}, kind={3}")
+    public static Collection<Object[]> data() {
+        LOG.info("Generating test cases according to random seed: {}", SEED);
+        final List<Object[]> values = new ArrayList<>();
+        for (final boolean cacheEnabled : Arrays.asList(false, true)) {
+            for (final boolean logEnabled : Arrays.asList(true)) {
+                for (final StoresToTest toTest : StoresToTest.values()) {
+                    for (final String kind : Arrays.asList("DSL", "PAPI")) {
+                        values.add(new Object[]{cacheEnabled, logEnabled, toTest.name(), kind});
+                    }
+                }
+            }
+        }
+        return values;
+    }
+
+    public PositionCheckpointIntegrationTest(
+        final boolean cache,
+        final boolean log,
+        final String storeToTest,
+        final String kind) {
+        this.cache = cache;
+        this.log = log;
+        this.storeToTest = StoresToTest.valueOf(storeToTest);
+        this.kind = kind;
+        this.streamsConfig = streamsConfiguration(
+                cache,
+                log,
+                storeToTest,
+                kind
+        );
+    }
+
+    @BeforeClass
+    public static void before()
+        throws InterruptedException, IOException, ExecutionException, TimeoutException {
+
+        CLUSTER.start();
+        CLUSTER.deleteAllTopicsAndWait(60 * 1000L);
+        final int partitions = 2;
+        CLUSTER.createTopic(INPUT_TOPIC_NAME, partitions, 1);
+
+        final Properties producerProps = new Properties();
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+
+        final List<Future<RecordMetadata>> futures = new LinkedList<>();
+        try (final Producer<Integer, Integer> producer = new KafkaProducer<>(producerProps)) {
+            for (int i = 0; i < 4; i++) {
+                final Future<RecordMetadata> send = producer.send(
+                    new ProducerRecord<>(
+                        INPUT_TOPIC_NAME,
+                        i % partitions,
+                        RECORD_TIME,
+                        i,
+                        i,
+                        null
+                    )
+                );
+                futures.add(send);
+                Time.SYSTEM.sleep(1L);
+            }
+            producer.flush();
+
+            for (final Future<RecordMetadata> future : futures) {
+                final RecordMetadata recordMetadata = future.get(1, TimeUnit.MINUTES);
+                assertThat(recordMetadata.hasOffset(), is(true));
+                INPUT_POSITION.withComponent(
+                    recordMetadata.topic(),
+                    recordMetadata.partition(),
+                    recordMetadata.offset()
+                );
+            }
+        }
+
+        assertThat(INPUT_POSITION, equalTo(
+            Position
+                .emptyPosition()
+                .withComponent(INPUT_TOPIC_NAME, 0, 1L)
+                .withComponent(INPUT_TOPIC_NAME, 1, 1L)
+        ));
+    }
+
+    @Before
+    public void beforeTest() {
+        beforeTest(true);
+    }
+
+    public void beforeTest(final boolean cleanup) {
+        final StoreSupplier<?> supplier = storeToTest.supplier();
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        if (Objects.equals(kind, "DSL") && supplier instanceof KeyValueBytesStoreSupplier) {
+            setUpKeyValueDSLTopology((KeyValueBytesStoreSupplier) supplier, builder);
+        } else if (Objects.equals(kind, "PAPI") && supplier instanceof KeyValueBytesStoreSupplier) {
+            setUpKeyValuePAPITopology((KeyValueBytesStoreSupplier) supplier, builder);
+        } else if (Objects.equals(kind, "DSL") && supplier instanceof WindowBytesStoreSupplier) {
+            setUpWindowDSLTopology((WindowBytesStoreSupplier) supplier, builder);
+        } else if (Objects.equals(kind, "PAPI") && supplier instanceof WindowBytesStoreSupplier) {
+            setUpWindowPAPITopology((WindowBytesStoreSupplier) supplier, builder);
+        } else if (Objects.equals(kind, "DSL") && supplier instanceof SessionBytesStoreSupplier) {
+            setUpSessionDSLTopology((SessionBytesStoreSupplier) supplier, builder);
+        } else if (Objects.equals(kind, "PAPI") && supplier instanceof SessionBytesStoreSupplier) {
+            setUpSessionPAPITopology((SessionBytesStoreSupplier) supplier, builder);
+        } else {
+            throw new AssertionError("Store supplier is an unrecognized type.");
+        }
+
+        kafkaStreams =
+                IntegrationTestUtils.getStartedStreams(
+                        streamsConfig,
+                        builder,
+                        cleanup
+                );
+    }
+
+    @After
+    public void afterTest() {
+        afterTest(true);
+    }
+
+    public void afterTest(final boolean cleanup) {
+        // only needed because some of the PAPI cases aren't added yet.
+        if (kafkaStreams != null) {
+            kafkaStreams.close();
+            if (cleanup) {
+                kafkaStreams.cleanUp();
+            }
+        }
+    }
+
+    @AfterClass
+    public static void after() {
+        CLUSTER.stop();
+    }
+
+    public void reboot() {
+        afterTest(false);
+        beforeTest(false);
+    }
+
+    @Test
+    public void verifyStore() throws Exception {
+        if (storeToTest.keyValue()) {
+            shouldCheckpointKeyValueStore(2);
+        }
+
+        if (storeToTest.isWindowed()) {
+            shouldHandleWindowKeyQueries();
+        }
+
+        if (storeToTest.isSession()) {
+            shouldHandleSessionKeyQueries();
+        }
+    }
+    public <V> void shouldCheckpointKeyValueStore(
+            final Integer key) throws Exception {
+        shouldHandleKeyQuery(key);
+        reboot();
+        shouldHandleKeyQuery(key);
+    }
+
+    private <T> void shouldHandleWindowKeyQueries() {
+        shouldHandleWindowKeyQuery(
+            2,
+            Instant.ofEpochMilli(WINDOW_START),
+            Instant.ofEpochMilli(WINDOW_START)
+        );
+        reboot();
+        shouldHandleWindowKeyQuery(
+                2,
+                Instant.ofEpochMilli(WINDOW_START),
+                Instant.ofEpochMilli(WINDOW_START)
+        );
+    }
+
+    private <T> void shouldHandleSessionKeyQueries() {
+        shouldHandleSessionRangeQuery(
+            2,
+            mkSet(2)
+        );
+        reboot();
+        shouldHandleSessionRangeQuery(
+                2,
+                mkSet(2)
+        );
+    }
+
+    public <V> void shouldHandleKeyQuery(
+            final Integer key) {
+
+        final KeyQuery<Integer, V> query = KeyQuery.withKey(key);
+        final StateQueryRequest<V> request =
+                inStore(STORE_NAME)
+                        .withQuery(query)
+                        .withPartitions(mkSet(0, 1))
+                        .withPositionBound(PositionBound.at(INPUT_POSITION));
+
+        final StateQueryResult<V> result =
+                IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+        final QueryResult<V> queryResult = result.getOnlyPartitionResult();
+        final boolean failure = queryResult.isFailure();
+        if (failure) {
+            throw new AssertionError(queryResult.toString());
+        }
+        assertThat(queryResult.isSuccess(), is(true));
+    }
+
+    public <V> void shouldHandleWindowKeyQuery(
+        final Integer key,
+        final Instant timeFrom,
+        final Instant timeTo) {
+
+        final WindowKeyQuery<Integer, V> query = WindowKeyQuery.withKeyAndWindowStartRange(
+            key,
+            timeFrom,
+            timeTo
+        );
+
+        final StateQueryRequest<WindowStoreIterator<V>> request =
+            inStore(STORE_NAME)
+                .withQuery(query)
+                .withPartitions(mkSet(0, 1))
+                .withPositionBound(PositionBound.at(INPUT_POSITION));
+
+        final StateQueryResult<WindowStoreIterator<V>> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+
+        if (result.getGlobalResult() != null) {
+            fail("global tables aren't implemented");
+        }
+    }

Review comment:
       huh, that's weird. It seems like it should, especially since that's also 
the bound, so the query shouldn't succeed at all unless it's at that position.



