nicktelford commented on code in PR #15137:
URL: https://github.com/apache/kafka/pull/15137#discussion_r1443141089

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##########
@@ -266,29 +269,20 @@ void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) {
                     log.info("Initializing to the starting offset for changelog {} of in-memory state store {}",
                              store.changelogPartition, store.stateStore.name());
                 } else if (store.offset() == null) {
-                    if (loadedCheckpoints.containsKey(store.changelogPartition)) {
-                        final Long offset = changelogOffsetFromCheckpointedOffset(loadedCheckpoints.remove(store.changelogPartition));
-                        store.setOffset(offset);
-                        log.info("State store {} initialized from checkpoint with offset {} at changelog {}",
-                                 store.stateStore.name(), store.offset, store.changelogPartition);
-                    } else {
-                        // with EOS, if the previous run did not shutdown gracefully, we may lost the checkpoint file
-                        // and hence we are uncertain that the current local state only contains committed data;
-                        // in that case we need to treat it as a task-corrupted exception
-                        if (eosEnabled && !storeDirIsEmpty) {
-                            log.warn("State store {} did not find checkpoint offsets while stores are not empty, " +
-                                "since under EOS it has the risk of getting uncommitted data in stores we have to " +
-                                "treat it as a task corruption error and wipe out the local state of task {} " +
-                                "before re-bootstrapping", store.stateStore.name(), taskId);
-
-                            throw new TaskCorruptedException(Collections.singleton(taskId));
-                        } else {
-                            log.info("State store {} did not find checkpoint offset, hence would " +
-                                "default to the starting offset at changelog {}",
-                                store.stateStore.name(), store.changelogPartition);
-                        }
-                    }
+                    // load managed offsets from store
+                    Long offset = loadOffsetFromStore(store);
+
+                    // load offsets from .checkpoint file
+                    offset = loadOffsetFromCheckpointFile(offset, loadedCheckpoints, store);
+
+                    // no offsets found for store, store is corrupt if not empty
+                    throwCorruptIfNoOffsetAndNotEmpty(offset, storeDirIsEmpty);
+
+                    updateOffsetInMemory(offset, store);
+
+                    syncManagedOffsetInStore(offset, store);
+

Review Comment:
   I generally tried to avoid unnecessary changes and refactoring wherever possible, but the new logic here caused checkstyle to complain about NPath complexity, so I factored it out into several `private` methods. In the end, breaking this up has made the method considerably easier to follow, and the code is now somewhat self-documenting.
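
   For readers without the full PR at hand, here is a minimal, self-contained sketch of the shape this refactoring produces. It is illustrative only and is not the code in this PR: the helper names are taken from the diff above, but their bodies, the `String`/`Map` stand-ins for the store metadata, the `OffsetInitSketch` class, and the use of `IllegalStateException` in place of `TaskCorruptedException` are all assumptions. The point is that each branch lives in its own small helper, so the calling method stays a straight line of calls and its NPath count no longer multiplies with every added condition.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for the flow in the diff; NOT the Kafka implementation.
final class OffsetInitSketch {

    // Assumption: offsets the store persists itself, keyed by store name.
    static final Map<String, Long> storeManagedOffsets = new HashMap<>();
    // Assumption: offsets tracked in memory by the state manager, keyed by store name.
    static final Map<String, Long> inMemoryOffsets = new HashMap<>();

    static Long loadOffsetFromStore(final String store) {
        return storeManagedOffsets.get(store);
    }

    // Assumption: a store-managed offset wins; the checkpoint entry is a fallback.
    static Long loadOffsetFromCheckpointFile(final Long offset,
                                             final Map<String, Long> loadedCheckpoints,
                                             final String store) {
        final Long checkpointed = loadedCheckpoints.remove(store);
        return offset != null ? offset : checkpointed;
    }

    // Stand-in for the corruption check; the real code throws a Kafka-specific exception.
    static void throwCorruptIfNoOffsetAndNotEmpty(final Long offset, final boolean storeDirIsEmpty) {
        if (offset == null && !storeDirIsEmpty) {
            throw new IllegalStateException("no offset found for a non-empty store");
        }
    }

    static void updateOffsetInMemory(final Long offset, final String store) {
        if (offset != null) {
            inMemoryOffsets.put(store, offset);
        }
    }

    static void syncManagedOffsetInStore(final Long offset, final String store) {
        if (offset != null) {
            storeManagedOffsets.put(store, offset);
        }
    }

    // The caller contains no branches of its own: one path through five helper calls.
    static void initializeOffset(final String store,
                                 final Map<String, Long> loadedCheckpoints,
                                 final boolean storeDirIsEmpty) {
        Long offset = loadOffsetFromStore(store);
        offset = loadOffsetFromCheckpointFile(offset, loadedCheckpoints, store);
        throwCorruptIfNoOffsetAndNotEmpty(offset, storeDirIsEmpty);
        updateOffsetInMemory(offset, store);
        syncManagedOffsetInStore(offset, store);
    }

    public static void main(final String[] args) {
        final Map<String, Long> checkpoints = new HashMap<>();
        checkpoints.put("store-a", 42L);
        initializeOffset("store-a", checkpoints, false);
        System.out.println("in-memory offset for store-a: " + inMemoryOffsets.get("store-a"));
    }
}
```

   In this sketch, a store with neither a managed offset nor a checkpoint entry passes through untouched when its directory is empty, and is rejected when it is not, which mirrors the "store is corrupt if not empty" comment in the diff.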