mjsax commented on a change in pull request #8994:
URL: https://github.com/apache/kafka/pull/8994#discussion_r452408268
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##########
@@ -232,10 +232,16 @@ void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) {
                     log.debug("State store {} initialized from checkpoint with offset {} at changelog {}",
                         store.stateStore.name(), store.offset, store.changelogPartition);
                 } else {
-                    // with EOS, if the previous run did not shutdown gracefully, we may lost the checkpoint file
+                    // With EOS, if the previous run did not shutdown gracefully, we may lost the checkpoint file
                     // and hence we are uncertain that the current local state only contains committed data;
                     // in that case we need to treat it as a task-corrupted exception
-                    if (eosEnabled && !storeDirIsEmpty) {
+
+                    // Note, this is a little overzealous, since we aren't checking whether the store's specific
+                    // directory is nonempty, only if there are any directories for any stores. So if there are
+                    // two stores in a task, and one is correctly written and checkpointed, while the other is
+                    // neither written nor checkpointed, we _could_ correctly load the first and recover the second
+                    // but instead we'll consider the whole task corrupted and discard the first and recover both.

Review comment:
   Sounds like something we should fix. Can you file a ticket?
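A minimal sketch of the difference the new code comment describes, assuming a simplified model of on-disk state. `StoreState`, `taskCorruptedCoarse`, `storesToWipe` and `taskDirIsEmpty` are hypothetical names for illustration only, not Kafka Streams API. `taskCorruptedCoarse` mirrors the check in `initializeStoreOffsetsFromCheckpoint`, where the single `storeDirIsEmpty` flag really covers all stores of the task; `storesToWipe` is one possible per-store variant of the kind the review suggests ticketing.

```java
import java.util.ArrayList;
import java.util.List;

public class CheckpointCorruptionSketch {

    // Hypothetical per-store view of on-disk state; not a Kafka Streams type.
    static final class StoreState {
        final String name;
        final boolean hasCheckpointOffset; // an offset for this store was found in the checkpoint file
        final boolean storeDirIsEmpty;     // this store's own directory holds no data

        StoreState(final String name, final boolean hasCheckpointOffset, final boolean storeDirIsEmpty) {
            this.name = name;
            this.hasCheckpointOffset = hasCheckpointOffset;
            this.storeDirIsEmpty = storeDirIsEmpty;
        }
    }

    // Mirrors the check discussed in the diff: one task-level flag
    // ("is there any store directory at all?") decides for every store.
    static boolean taskCorruptedCoarse(final boolean eosEnabled,
                                       final boolean taskDirIsEmpty,
                                       final List<StoreState> stores) {
        for (final StoreState store : stores) {
            if (!store.hasCheckpointOffset && eosEnabled && !taskDirIsEmpty) {
                return true; // whole task is treated as corrupted, every store is restored
            }
        }
        return false;
    }

    // Hypothetical per-store alternative: only a store that has local data but
    // no checkpointed offset is suspect; checkpointed stores keep their state.
    static List<String> storesToWipe(final boolean eosEnabled, final List<StoreState> stores) {
        final List<String> suspect = new ArrayList<>();
        if (!eosEnabled) {
            return suspect;
        }
        for (final StoreState store : stores) {
            if (!store.hasCheckpointOffset && !store.storeDirIsEmpty) {
                suspect.add(store.name);
            }
        }
        return suspect;
    }

    public static void main(final String[] args) {
        // Scenario from the code comment: store A written and checkpointed,
        // store B neither written nor checkpointed.
        final List<StoreState> stores = List.of(
            new StoreState("A", true, false),
            new StoreState("B", false, true));
        final boolean taskDirIsEmpty = false; // A's directory exists, so the task dir is non-empty

        // prints "coarse: task corrupted = true"
        System.out.println("coarse: task corrupted = " + taskCorruptedCoarse(true, taskDirIsEmpty, stores));
        // prints "per-store: stores to wipe = []"
        System.out.println("per-store: stores to wipe = " + storesToWipe(true, stores));
    }
}
```

In this model the coarse check discards A even though its state is checkpointed, while the per-store variant wipes nothing: A loads from its checkpoint and B simply restores from the changelog. The trade-off is that the per-store variant has to inspect each store's directory individually instead of relying on one flag for the whole task.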