vvcephei commented on a change in pull request #9267: URL: https://github.com/apache/kafka/pull/9267#discussion_r485767592
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ########## @@ -752,6 +712,77 @@ void runOnce() { commitRatioSensor.record((double) totalCommitLatency / runOnceLatency, now); } + private void initializeAndRestorePhase() { + { + // only try to initialize the assigned tasks + // if the state is still in PARTITION_ASSIGNED after the poll call + final State stateSnapshot = state; + if (stateSnapshot == State.PARTITIONS_ASSIGNED + || stateSnapshot == State.RUNNING && taskManager.needsInitializationOrRestoration()) { + + log.debug("State is {}; initializing and restoring", stateSnapshot); + + // transit to restore active is idempotent so we can call it multiple times + changelogReader.enforceRestoreActive(); + + if (taskManager.tryToCompleteRestoration()) { + changelogReader.transitToUpdateStandby(); + + setState(State.RUNNING); + } + + if (log.isDebugEnabled()) { + log.debug("Initialization and restore call done. State is {}", state); + } + } + } + + log.debug("Invoking ChangeLogReader#restore"); Review comment: Thanks, all. The purpose is simply to make the narrative of StreamThread's debug logs unambiguous and complete. The purpose of logging it here instead of relying only on ChangeLogReader's logs is that you can enable _just_ StreamThread's debug logger and get a complete high-level view of what's happening. If you want to then drill down into the restore call itself, you could enable ChangeLogReader's debug log. Good point, @wcarlson5, about logging the state. I'll add it. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org