vvcephei commented on a change in pull request #11676:
URL: https://github.com/apache/kafka/pull/11676#discussion_r786893179



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/StateStoreContext.java
##########
@@ -94,7 +94,8 @@
      * @throws StreamsException if the store's change log does not contain the 
partition
      */
     void register(final StateStore store,
-                  final StateRestoreCallback stateRestoreCallback);
+                  final StateRestoreCallback stateRestoreCallback,
+                  final CheckpointCallback checkpointCallback);

Review comment:
       We need to avoid breaking changes, so what we'll want to do here is 
deprecate this method and introduce a new overload with a default 
implementation that calls the old method (ignoring the `checkpointCallback`). 
That way, existing store implementations will continue to compile after 
upgrading.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
##########
@@ -152,4 +152,10 @@ default void init(final StateStoreContext context, final 
StateStore root) {
         // If a store doesn't implement a query handler, then all queries are 
unknown.
         return QueryResult.forUnknownQueryType(query, this);
     }
+
+    /*
+    default void checkpoint() throws IOException {
+    }
+
+     */

Review comment:
       ```suggestion
   ```

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##########
@@ -598,6 +605,15 @@ public void checkpoint() {
         // checkpoint those stores that are only logged and persistent to the 
checkpoint file
         final Map<TopicPartition, Long> checkpointingOffsets = new HashMap<>();
         for (final StateStoreMetadata storeMetadata : stores.values()) {
+            if (storeMetadata.checkpointCallback != null) {
+                try {
+                    storeMetadata.checkpointCallback.checkpoint();
+                } catch (final IOException e) {
+                    throw new ProcessorStateException(format("%sError creating 
position checkpoint file",
+                            logPrefix), e);
+                }
+            }
+

Review comment:
       I think we'll need to add `if (!corrupted)`.
   
   I agree we don't need to move this into the block below because 
non-persistent stores already know that they're not persistent and therefore 
can safely ignore the callback. But stores won't automatically know whether 
they are corrupted or not, so we should avoid checkpointing them if they're 
corrupted.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to