vvcephei commented on a change in pull request #8994:
URL: https://github.com/apache/kafka/pull/8994#discussion_r452423901
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##########
@@ -232,10 +232,16 @@ void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) {
                 log.debug("State store {} initialized from checkpoint with offset {} at changelog {}",
                           store.stateStore.name(), store.offset, store.changelogPartition);
             } else {
-                // with EOS, if the previous run did not shutdown gracefully, we may lost the checkpoint file
+                // With EOS, if the previous run did not shutdown gracefully, we may lost the checkpoint file
                 // and hence we are uncertain that the current local state only contains committed data;
                 // in that case we need to treat it as a task-corrupted exception
-                if (eosEnabled && !storeDirIsEmpty) {
+
+                // Note, this is a little overzealous, since we aren't checking whether the store's specific
+                // directory is nonempty, only if there are any directories for any stores. So if there are
+                // two stores in a task, and one is correctly written and checkpointed, while the other is
+                // neither written nor checkpointed, we _could_ correctly load the first and recover the second
+                // but instead we'll consider the whole task corrupted and discard the first and recover both.
Review comment:
IIRC, @guozhangwang tried to include this during the February refactor,
but it's harder to get right than it sounds. Still, it would be very nice to
have it, and I agree it would be good to file a ticket.
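
For whoever files the ticket, here is a rough, self-contained sketch of the difference the code comment describes. It is not the ProcessorStateManager code: `StoreInfo`, `Action`, `taskWide`, `perStore`, and the store names are all hypothetical, and it deliberately ignores whatever made the per-store variant hard to get right during the refactor. It only contrasts the two decisions for the two-store scenario.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: every name here is hypothetical, none is a Kafka Streams API.
class StoreCorruptionSketch {

    enum Action { LOAD_FROM_CHECKPOINT, RESTORE_FROM_CHANGELOG, WIPE_AND_RESTORE }

    // Hypothetical per-store summary of what was found on disk.
    static final class StoreInfo {
        final boolean hasCheckpoint;
        final boolean dirNonEmpty;

        StoreInfo(final boolean hasCheckpoint, final boolean dirNonEmpty) {
            this.hasCheckpoint = hasCheckpoint;
            this.dirNonEmpty = dirNonEmpty;
        }
    }

    // Task-wide check, as the comment describes the current behavior: under EOS,
    // one store without a checkpoint plus any non-empty store directory marks
    // the whole task as corrupted.
    static Map<String, Action> taskWide(final Map<String, StoreInfo> stores, final boolean eosEnabled) {
        boolean anyDirNonEmpty = false;
        boolean anyMissingCheckpoint = false;
        for (final StoreInfo info : stores.values()) {
            anyDirNonEmpty |= info.dirNonEmpty;
            anyMissingCheckpoint |= !info.hasCheckpoint;
        }
        final boolean corrupted = eosEnabled && anyDirNonEmpty && anyMissingCheckpoint;

        final Map<String, Action> plan = new LinkedHashMap<>();
        for (final Map.Entry<String, StoreInfo> e : stores.entrySet()) {
            if (corrupted) {
                plan.put(e.getKey(), Action.WIPE_AND_RESTORE);
            } else if (e.getValue().hasCheckpoint) {
                plan.put(e.getKey(), Action.LOAD_FROM_CHECKPOINT);
            } else {
                plan.put(e.getKey(), Action.RESTORE_FROM_CHANGELOG);
            }
        }
        return plan;
    }

    // Hypothetical per-store check: only a store whose own directory has data
    // but no checkpoint is treated as suspect.
    static Map<String, Action> perStore(final Map<String, StoreInfo> stores, final boolean eosEnabled) {
        final Map<String, Action> plan = new LinkedHashMap<>();
        for (final Map.Entry<String, StoreInfo> e : stores.entrySet()) {
            final StoreInfo info = e.getValue();
            if (info.hasCheckpoint) {
                plan.put(e.getKey(), Action.LOAD_FROM_CHECKPOINT);
            } else if (eosEnabled && info.dirNonEmpty) {
                plan.put(e.getKey(), Action.WIPE_AND_RESTORE);
            } else {
                plan.put(e.getKey(), Action.RESTORE_FROM_CHANGELOG);
            }
        }
        return plan;
    }

    // The scenario from the code comment: one store written and checkpointed,
    // the other neither written nor checkpointed.
    static Map<String, StoreInfo> twoStoreScenario() {
        final Map<String, StoreInfo> stores = new LinkedHashMap<>();
        stores.put("store-a", new StoreInfo(true, true));
        stores.put("store-b", new StoreInfo(false, false));
        return stores;
    }
}
```

With EOS enabled, `taskWide(twoStoreScenario(), true)` wipes and restores both stores, while `perStore(twoStoreScenario(), true)` loads `store-a` from its checkpoint and only restores `store-b` from the changelog.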