guozhangwang commented on a change in pull request #11676:
URL: https://github.com/apache/kafka/pull/11676#discussion_r790358840



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
##########
@@ -111,12 +112,12 @@ public StreamsMetricsImpl metrics() {
 
     @Override
     public void register(final StateStore store,
-                         final StateRestoreCallback stateRestoreCallback) {
+                         final StateRestoreCallback stateRestoreCallback, 
final CheckpointCallback checkpoint) {

Review comment:
       nit: let's use a newline for each parameter, or use a single line for 
all parameters if they are short.
   
   Also: I'd suggest we rename the parameter to `checkpointCallback`, to be 
more readable. 
   
   Ditto elsewhere.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
##########
@@ -152,4 +153,11 @@ default void init(final StateStoreContext context, final 
StateStore root) {
         // If a store doesn't implement a query handler, then all queries are 
unknown.
         return QueryResult.forUnknownQueryType(query, this);
     }
+
+    /**
+     * This state store's position
+     */
+    default Position getPosition() {
+        throw new UnsupportedOperationException("Not implemented");

Review comment:
       Hmm.. do we need this to be a public API? I thought that with the 
callback we would not need to expose it publicly (to get the position from 
wrapped stores we can go through internal functions), i.e. we only need one 
of the two to achieve our goal?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
##########
@@ -264,8 +267,12 @@ public void init(final ProcessorContext context,
 
         segments.openExisting(this.context, observedStreamTime);
 
+        this.positionCheckpointFile = new File(context.stateDir(), this.name() 
+ ".position");

Review comment:
       The logic of reading the checkpoint upon init is duplicated in each of 
these classes; could we consolidate it further into `StoreQueryUtils`, e.g. by 
just passing the state dir and store name into it?
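   
   A rough sketch of the shape I have in mind, with loud caveats: 
`PositionCheckpointSketch`, `positionFile` and `readPosition` are hypothetical 
names, and a plain text file plus a `Map` stand in for `OffsetCheckpoint` and 
`Position`. Treating a missing file as an empty position is also an assumption 
here, not something the PR specifies.

```java
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a shared helper; the real code would live in
// StoreQueryUtils and use OffsetCheckpoint and Position rather than a plain
// text file and a Map.
final class PositionCheckpointSketch {

    private PositionCheckpointSketch() { }

    // Derive the checkpoint file from the state dir and store name, so that
    // each store no longer builds this path in its own init().
    static File positionFile(final File stateDir, final String storeName) {
        return new File(stateDir, storeName + ".position");
    }

    // Read lines of the form "topic partition offset".
    // Assumption: a missing file simply means an empty position.
    static Map<String, Long> readPosition(final File stateDir,
                                          final String storeName) throws IOException {
        final File file = positionFile(stateDir, storeName);
        final Map<String, Long> position = new HashMap<>();
        if (!file.exists()) {
            return position;
        }
        final List<String> lines = Files.readAllLines(file.toPath(), StandardCharsets.UTF_8);
        for (final String line : lines) {
            final String trimmed = line.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            final String[] parts = trimmed.split("\\s+");
            // Key is "topic-partition"; value is the offset.
            position.put(parts[0] + "-" + parts[1], Long.parseLong(parts[2]));
        }
        return position;
    }
}
```

   With something like this, each store's `init` would shrink to a single call 
that passes `context.stateDir()` and `name()`.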

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/CheckpointCallback.java
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor;
+
+import java.io.IOException;
+
+/**
+ * Callback for checkpointing position information in state stores.
+ */
+public interface CheckpointCallback {

Review comment:
       See my other comment: how about generalizing this callback to something 
like `CommitCallback`?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
##########
@@ -85,8 +85,23 @@
      * @throws IllegalStateException If store gets registered after 
initialized is already finished
      * @throws StreamsException if the store's change log does not contain the 
partition
      */
+    default void register(final StateStore store,
+                  final StateRestoreCallback stateRestoreCallback) {
+        register(store, stateRestoreCallback, null);
+    }
+
+    /**
+     * Register and possibly restores the specified storage engine.
+     *
+     * @param store the storage engine
+     * @param stateRestoreCallback the restoration callback logic for 
log-backed state stores upon restart
+     * @param checkpointCallback called to checkpoint position metatadata of 
state stores
+     *
+     * @throws IllegalStateException If store gets registered after 
initialized is already finished
+     * @throws StreamsException if the store's change log does not contain the 
partition
+     */
     void register(final StateStore store,
-                  final StateRestoreCallback stateRestoreCallback);
+                  final StateRestoreCallback stateRestoreCallback, final 
CheckpointCallback checkpointCallback);

Review comment:
       This is just for the KIP discussion itself: WDYT about generalizing the 
name of the callback to something like `CommitCallback`, where the semantics 
are that the callback is triggered whenever the corresponding task is 
committed?
   
   My motivation is that:
   
   * Today the state store "checkpoint" is an internal implementation detail 
that is abstracted away from users, while "commit" is a public concept that 
users understand. Its triggering event is different from `StateStore#flush`, 
since a state store is always flushed when committing, but not vice versa.
   * In the future, we may or may not need to always persist the position 
metadata upon checkpointing the state stores.
   * But we may want to expose such a callback for other purposes, e.g. for 
in-memory stores to checkpoint their data to disk along with the checkpoint 
file so that we do not need to re-load from the changelog, or for KS to 
checkpoint the state itself (not "checkpoint" as we use the term today, but 
really checkpointing the state, sync or async) to remote storage, etc.
   
   At the moment we only use this callback for writing position metadata, but 
keeping its name general allows us to modify the built-in stores' logic 
without changing the public APIs.
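   
   To make the proposal concrete, here is a minimal sketch under stated 
assumptions: `CommitCallback`, `onCommit` and `CommitCallbackRegistry` are 
hypothetical names for illustration only, and the registry is a stand-in for 
whatever part of the task tracks registered stores, not an existing Kafka 
class.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical generalized callback: invoked whenever the owning task commits.
interface CommitCallback {
    void onCommit() throws IOException;
}

// Stand-in for the task-side bookkeeping of registered stores; not Kafka's
// actual class.
final class CommitCallbackRegistry {
    private final List<CommitCallback> callbacks = new ArrayList<>();

    // A null callback is tolerated, mirroring the two-argument register()
    // overload in the diff above, which passes null for the new parameter.
    void register(final CommitCallback callback) {
        if (callback != null) {
            callbacks.add(callback);
        }
    }

    // Called on every task commit; each store decides what "commit" means
    // for it (writing position metadata today, possibly more later).
    void commit() throws IOException {
        for (final CommitCallback callback : callbacks) {
            callback.onCommit();
        }
    }
}
```

   A built-in store would register a callback that writes its position 
metadata, and could later change what it does on commit without any change to 
the public interface.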

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
##########
@@ -85,8 +85,23 @@
      * @throws IllegalStateException If store gets registered after 
initialized is already finished
      * @throws StreamsException if the store's change log does not contain the 
partition
      */
+    default void register(final StateStore store,
+                  final StateRestoreCallback stateRestoreCallback) {
+        register(store, stateRestoreCallback, null);
+    }
+
+    /**
+     * Register and possibly restores the specified storage engine.
+     *
+     * @param store the storage engine
+     * @param stateRestoreCallback the restoration callback logic for 
log-backed state stores upon restart
+     * @param checkpointCallback called to checkpoint position metatadata of 
state stores
+     *
+     * @throws IllegalStateException If store gets registered after 
initialized is already finished
+     * @throws StreamsException if the store's change log does not contain the 
partition
+     */
     void register(final StateStore store,
-                  final StateRestoreCallback stateRestoreCallback);
+                  final StateRestoreCallback stateRestoreCallback, final 
CheckpointCallback checkpointCallback);

Review comment:
       nit: could we use a new line for the third parameter?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -333,6 +339,43 @@ public static boolean isPermitted(
         return byteArray -> deserializer.deserialize(serdes.topic(), 
byteArray);
     }
 
+
+    public static Map<TopicPartition, Long> positionToTopicPartitionMap(final 
Position position) {
+        final Map<TopicPartition, Long> topicPartitions = new HashMap<>();
+        final Set<String> topics = position.getTopics();
+        for (final String t : topics) {
+            final Map<Integer, Long> partitions = 
position.getPartitionPositions(t);
+            for (final Entry<Integer, Long> e : partitions.entrySet()) {
+                final TopicPartition tp = new TopicPartition(t, e.getKey());
+                topicPartitions.put(tp, e.getValue());
+            }
+        }
+        return topicPartitions;
+    }
+
+    public static void checkpointPosition(final OffsetCheckpoint 
checkpointFile, final Position position) throws IOException {
+        final Map<TopicPartition, Long> topicPartitions = 
StoreQueryUtils.positionToTopicPartitionMap(position);
+        checkpointFile.write(topicPartitions);
+    }
+
+    public static Position readPositionFromCheckpoint(final OffsetCheckpoint 
checkpointFile) {
+        try {
+            final Map<TopicPartition, Long> topicPartitions = 
checkpointFile.read();
+            return 
StoreQueryUtils.topicPartitionMapToPosition(topicPartitions);
+        } catch (final IOException e) {

Review comment:
       +1





