guozhangwang commented on a change in pull request #9247:
URL: https://github.com/apache/kafka/pull/9247#discussion_r485941828



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##########
@@ -662,4 +662,10 @@ public TopicPartition 
registeredChangelogPartitionFor(final String storeName) {
     public String changelogFor(final String storeName) {
         return storeToChangelogTopic.get(storeName);
     }
+
+    public void deleteCheckPointFile() throws IOException {

Review comment:
       nit: I'd suggest we just inline this function inside `StreamTask` since 
1) this is only triggered with EOS enabled, and its name `deleteCheckPointFile` 
maybe a bit misleading, and 2) it is a very simply function anyways.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -332,6 +332,15 @@ public void resume() {
             case SUSPENDED:
                 // just transit the state without any logical changes: 
suspended and restoring states
                 // are not actually any different for inner modules
+
+                // Deleting checkpoint file before transition to RESTORING 
state (KAFKA-10362)
+                try {
+                    stateMgr.deleteCheckPointFile();
+                    log.debug("Deleted check point file");

Review comment:
       nit: `log.debug("Deleted check point file upon resuming with EOS 
enabled");`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to