nicktelford commented on code in PR #15137:
URL: https://github.com/apache/kafka/pull/15137#discussion_r1443141089


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##########
@@ -266,29 +269,20 @@ void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) {
                     log.info("Initializing to the starting offset for changelog {} of in-memory state store {}",
                              store.changelogPartition, store.stateStore.name());
                 } else if (store.offset() == null) {
-                    if (loadedCheckpoints.containsKey(store.changelogPartition)) {
-                        final Long offset = changelogOffsetFromCheckpointedOffset(loadedCheckpoints.remove(store.changelogPartition));
-                        store.setOffset(offset);
 
-                        log.info("State store {} initialized from checkpoint with offset {} at changelog {}",
-                                  store.stateStore.name(), store.offset, store.changelogPartition);
-                    } else {
-                        // with EOS, if the previous run did not shutdown gracefully, we may lost the checkpoint file
-                        // and hence we are uncertain that the current local state only contains committed data;
-                        // in that case we need to treat it as a task-corrupted exception
-                        if (eosEnabled && !storeDirIsEmpty) {
-                            log.warn("State store {} did not find checkpoint offsets while stores are not empty, " +
-                                "since under EOS it has the risk of getting uncommitted data in stores we have to " +
-                                "treat it as a task corruption error and wipe out the local state of task {} " +
-                                "before re-bootstrapping", store.stateStore.name(), taskId);
-
-                            throw new TaskCorruptedException(Collections.singleton(taskId));
-                        } else {
-                            log.info("State store {} did not find checkpoint offset, hence would " +
-                                "default to the starting offset at changelog {}",
-                                store.stateStore.name(), store.changelogPartition);
-                        }
-                    }
+                    // load managed offsets from store
+                    Long offset = loadOffsetFromStore(store);
+
+                    // load offsets from .checkpoint file
+                    offset = loadOffsetFromCheckpointFile(offset, loadedCheckpoints, store);
+
+                    // no offsets found for store, store is corrupt if not empty
+                    throwCorruptIfNoOffsetAndNotEmpty(offset, storeDirIsEmpty);
+
+                    updateOffsetInMemory(offset, store);
+
+                    syncManagedOffsetInStore(offset, store);
+

Review Comment:
   I generally tried to avoid unnecessary changes/refactoring wherever 
possible, but the new logic here caused checkstyle to complain about NPath 
complexity, so I factored it out into several `private` methods.
   
   In the end, breaking this up has made the logic considerably easier to 
follow, and the code is now somewhat self-documenting.
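
   For readers without the full PR at hand, the shape of that refactoring can be sketched roughly as below. This is only an illustration, not the code in the PR: the method names come from the hunk above, but the class `OffsetInitSketch`, the `CorruptStoreException` type (a stand-in for `TaskCorruptedException`), the `String`-keyed maps, and every method body are assumptions. In particular, keeping the `eosEnabled` check from the removed branch is a guess, since the hunk does not show what the new helper checks.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in; NOT the actual ProcessorStateManager code.
final class OffsetInitSketch {

    /** Stand-in for Kafka's TaskCorruptedException so the sketch is self-contained. */
    static final class CorruptStoreException extends RuntimeException {
        CorruptStoreException(final String message) {
            super(message);
        }
    }

    private final boolean eosEnabled;
    // Assumption: offsets the store persists itself, keyed by changelog partition name.
    private final Map<String, Long> storeManagedOffsets;
    private final Map<String, Long> inMemoryOffsets = new HashMap<>();

    OffsetInitSketch(final boolean eosEnabled, final Map<String, Long> storeManagedOffsets) {
        this.eosEnabled = eosEnabled;
        this.storeManagedOffsets = new HashMap<>(storeManagedOffsets);
    }

    // Each step is a single small decision, so the caller has no nested branches.
    Long initializeOffset(final String partition,
                          final Map<String, Long> loadedCheckpoints,
                          final boolean storeDirIsEmpty) {
        Long offset = loadOffsetFromStore(partition);
        offset = loadOffsetFromCheckpointFile(offset, loadedCheckpoints, partition);
        throwCorruptIfNoOffsetAndNotEmpty(offset, storeDirIsEmpty);
        updateOffsetInMemory(offset, partition);
        syncManagedOffsetInStore(offset, partition);
        return offset;
    }

    private Long loadOffsetFromStore(final String partition) {
        return storeManagedOffsets.get(partition); // null when the store has no offset
    }

    private Long loadOffsetFromCheckpointFile(final Long offset,
                                              final Map<String, Long> loadedCheckpoints,
                                              final String partition) {
        // Consume the checkpoint entry either way; use it only as a fallback.
        final Long checkpointed = loadedCheckpoints.remove(partition);
        return offset != null ? offset : checkpointed;
    }

    private void throwCorruptIfNoOffsetAndNotEmpty(final Long offset, final boolean storeDirIsEmpty) {
        if (offset == null && eosEnabled && !storeDirIsEmpty) {
            throw new CorruptStoreException("no offset found for a non-empty store");
        }
    }

    private void updateOffsetInMemory(final Long offset, final String partition) {
        if (offset != null) {
            inMemoryOffsets.put(partition, offset);
        }
    }

    private void syncManagedOffsetInStore(final Long offset, final String partition) {
        if (offset != null) {
            storeManagedOffsets.put(partition, offset);
        }
    }

    Long inMemoryOffset(final String partition) {
        return inMemoryOffsets.get(partition);
    }

    Long storeManagedOffset(final String partition) {
        return storeManagedOffsets.get(partition);
    }
}
```

   Each helper contributes one branch in sequence rather than multiplying paths through nested `if`/`else` blocks in the caller, which is the kind of change that typically brings a method back under an NPath limit.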



