vvcephei commented on a change in pull request #8994:
URL: https://github.com/apache/kafka/pull/8994#discussion_r452424595



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##########
@@ -232,10 +232,16 @@ void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) {
                         log.debug("State store {} initialized from checkpoint with offset {} at changelog {}",
                                   store.stateStore.name(), store.offset, store.changelogPartition);
                     } else {
-                        // with EOS, if the previous run did not shutdown gracefully, we may lost the checkpoint file
+                        // With EOS, if the previous run did not shutdown gracefully, we may lost the checkpoint file
                         // and hence we are uncertain that the current local state only contains committed data;
                         // in that case we need to treat it as a task-corrupted exception
-                        if (eosEnabled && !storeDirIsEmpty) {
+
+                        // Note, this is a little overzealous, since we aren't checking whether the store's specific
+                        // directory is nonempty, only if there are any directories for any stores. So if there are
+                        // two stores in a task, and one is correctly written and checkpointed, while the other is
+                        // neither written nor checkpointed, we _could_ correctly load the first and recover the second
+                        // but instead we'll consider the whole task corrupted and discard the first and recover both.
+                        if (store.stateStore.persistent() && eosEnabled && !taskDirIsEmpty) {

Review comment:
       Oh, I put this comment on the wrong line:
   
   > FYI, this is also fixed (better) in #8996
   
   In other words, I agree, I like your fix better as well.
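
To make the condition in the hunk above concrete, here is a minimal sketch of the decision it encodes. This is not the `ProcessorStateManager` code: `CheckpointDecision`, `StoreInfo`, `Action`, `decide`, and `taskCorrupted` are hypothetical names invented for illustration, and the "restore from scratch" fallback is an assumption about what happens when the corruption check does not fire.

```java
import java.util.List;

// Illustrative stand-in only; these are NOT the Kafka Streams classes.
final class CheckpointDecision {

    enum Action { USE_CHECKPOINT, RESTORE_FROM_SCRATCH, TASK_CORRUPTED }

    // Hypothetical summary of one store's state at initialization time.
    static final class StoreInfo {
        final String name;
        final boolean persistent;
        final Long checkpointedOffset; // null when no checkpoint entry was found

        StoreInfo(final String name, final boolean persistent, final Long checkpointedOffset) {
            this.name = name;
            this.persistent = persistent;
            this.checkpointedOffset = checkpointedOffset;
        }
    }

    // Mirrors the shape of the condition in the diff: a persistent store with no
    // checkpoint, under EOS, in a non-empty task directory, is treated as corrupted.
    static Action decide(final StoreInfo store, final boolean eosEnabled, final boolean taskDirIsEmpty) {
        if (store.checkpointedOffset != null) {
            return Action.USE_CHECKPOINT;
        }
        if (store.persistent && eosEnabled && !taskDirIsEmpty) {
            return Action.TASK_CORRUPTED;
        }
        // Assumed fallback: nothing on disk can be trusted or needed, so rebuild.
        return Action.RESTORE_FROM_SCRATCH;
    }

    // One corrupted store is enough to mark the whole task corrupted.
    static boolean taskCorrupted(final List<StoreInfo> stores, final boolean eosEnabled, final boolean taskDirIsEmpty) {
        for (final StoreInfo store : stores) {
            if (decide(store, eosEnabled, taskDirIsEmpty) == Action.TASK_CORRUPTED) {
                return true;
            }
        }
        return false;
    }
}
```

With two persistent stores under EOS, one checkpointed and one not, and a non-empty task directory, `taskCorrupted` returns true even though the first store alone would yield `USE_CHECKPOINT`. That is the "overzealous" case the code comment describes: the check looks at the task directory as a whole, not at each store's own directory.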



