[GitHub] [kafka] vvcephei commented on a change in pull request #9267: MINOR: Add debug logs for StreamThread
vvcephei commented on a change in pull request #9267:
URL: https://github.com/apache/kafka/pull/9267#discussion_r485771434

File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java

```diff
@@ -706,13 +662,17 @@ void runOnce() {
                 totalProcessed += processed;
             }
+            log.debug("TaskManager#process handled {} records; invoking TaskManager#punctuate", processed);
             final int punctuated = taskManager.punctuate();
             final long punctuateLatency = advanceNowAndComputeLatency();
             totalPunctuateLatency += punctuateLatency;
             if (punctuated > 0) {
                 punctuateSensor.record(punctuateLatency / (double) punctuated, now);
             }
+            log.debug("TaskManager#punctuate executed: {}", punctuated);
```

Review comment:
Note, this isn't saying that we invoked a single punctuator, but that we invoked _all_ the punctuators that are runnable right now. It's also a top-level phase of executing StreamThread, so I don't think we can leave it out of the debug logs without telling an incomplete story of what's happening.

If we log this (or any of the proposed logs) at trace level, the only consequence is that users who want to _debug_ StreamThread would have to use "trace" level instead of "debug" level, which seems to add unnecessary complexity to the logging. IMO, it's better to reserve "trace" level for _very_ low-level logs, such as logging the progress of individual records through the processors, and to use "debug" level for higher-level summary logs like the ones in this PR.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
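The debug-vs-trace split argued for above can be sketched roughly as follows. This is a minimal stand-alone illustration, not code from the PR: the class and method names are hypothetical, and `java.util.logging` stands in for Kafka's SLF4J loggers (`FINEST` playing the role of TRACE, `FINE` the role of DEBUG).

```java
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

public class LogLevelSketch {
    private static final Logger log = Logger.getLogger("StreamThreadSketch");

    // Hypothetical punctuation phase: per-item detail goes to trace (FINEST),
    // while the phase-level summary goes to debug (FINE).
    static int punctuateAll(final List<String> punctuators) {
        int punctuated = 0;
        for (final String p : punctuators) {
            // very low-level: one line per punctuator, reserved for trace level
            log.log(Level.FINEST, "Running punctuator {0}", p);
            punctuated++;
        }
        // higher-level summary of the whole phase, logged at debug level
        log.log(Level.FINE, "Punctuated {0} punctuators", punctuated);
        return punctuated;
    }
}
```

With this split, enabling debug shows one summary line per phase, and trace is only needed to follow individual items through the loop.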
vvcephei commented on a change in pull request #9267:
URL: https://github.com/apache/kafka/pull/9267#discussion_r485767592

File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java

```diff
@@ -752,6 +712,77 @@ void runOnce() {
         commitRatioSensor.record((double) totalCommitLatency / runOnceLatency, now);
     }
+    private void initializeAndRestorePhase() {
+        {
+            // only try to initialize the assigned tasks
+            // if the state is still in PARTITIONS_ASSIGNED after the poll call
+            final State stateSnapshot = state;
+            if (stateSnapshot == State.PARTITIONS_ASSIGNED
+                || stateSnapshot == State.RUNNING && taskManager.needsInitializationOrRestoration()) {
+                log.debug("State is {}; initializing and restoring", stateSnapshot);
+                // transiting to restore active is idempotent, so we can call it multiple times
+                changelogReader.enforceRestoreActive();
+                if (taskManager.tryToCompleteRestoration()) {
+                    changelogReader.transitToUpdateStandby();
+                    setState(State.RUNNING);
+                }
+                if (log.isDebugEnabled()) {
+                    log.debug("Initialization and restore call done. State is {}", state);
+                }
+            }
+        }
+        log.debug("Invoking ChangeLogReader#restore");
```

Review comment:
Thanks, all. The purpose is simply to make the narrative of StreamThread's debug logs unambiguous and complete. The point of logging it here, instead of relying only on ChangeLogReader's logs, is that you can enable _just_ StreamThread's debug logger and get a complete high-level view of what's happening. If you then want to drill down into the restore call itself, you can enable ChangeLogReader's debug log.

Good point, @wcarlson5, about logging the state. I'll add it.
vvcephei commented on a change in pull request #9267:
URL: https://github.com/apache/kafka/pull/9267#discussion_r485759565

File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java

```diff
@@ -689,6 +644,7 @@ void runOnce() {
      * 6. Otherwise, increment N.
      */
     do {
+        log.debug("Invoking TaskManager#process with {} iterations.", numIterations);
```

Review comment:
Yes, it's for debugging purposes, so I'd like to keep them both at debug level. When I was handling the incident that inspired this PR, what I really wanted to see in the logs was both how often we called poll and which operation in the loop was blocking. In theory, I'd only need the "before" or the "after" logs for this, but only if I also had the source code pulled up to compare with the log messages. I'm proposing to (redundantly) log both "before" and "after" messages so that the logs will be context-free, and people in the future will be able to tell what is happening just from the narrative of the logs themselves.
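The redundant before/after pattern described above can be sketched like this. All names here are hypothetical, and `java.util.logging` again stands in for SLF4J; the point is only the shape of the logging, not the real `StreamThread` code.

```java
import java.util.function.IntUnaryOperator;
import java.util.logging.Level;
import java.util.logging.Logger;

public class BeforeAfterSketch {
    private static final Logger log = Logger.getLogger("StreamThreadSketch");

    // Wraps a phase with "before" and "after" logs so the log narrative is
    // context-free: if the phase blocks, the "before" line shows where it is
    // stuck; if it completes, the "after" line shows the outcome, with no need
    // to consult the source code.
    static int runProcessPhase(final int numIterations, final IntUnaryOperator process) {
        log.log(Level.FINE, "Invoking process with {0} iterations.", numIterations);
        final int processed = process.applyAsInt(numIterations);
        log.log(Level.FINE, "process handled {0} records.", processed);
        return processed;
    }
}
```

Either line alone would suffice with the source open next to the logs; together they make each loop iteration legible from the logs by themselves.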
vvcephei commented on a change in pull request #9267:
URL: https://github.com/apache/kafka/pull/9267#discussion_r485062295

File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java

```diff
@@ -612,63 +612,18 @@ void runOnce() {
         final long startMs = time.milliseconds();
         now = startMs;
-        if (state == State.PARTITIONS_ASSIGNED) {
-            // try to fetch some records with zero poll millis
-            // to unblock the restoration as soon as possible
-            records = pollRequests(Duration.ZERO);
-        } else if (state == State.PARTITIONS_REVOKED) {
-            // try to fetch some records with zero poll millis to unblock
-            // other useful work while waiting for the join response
-            records = pollRequests(Duration.ZERO);
-        } else if (state == State.RUNNING || state == State.STARTING) {
-            // try to fetch some records with normal poll time
-            // in order to get long polling
-            records = pollRequests(pollTime);
-        } else if (state == State.PENDING_SHUTDOWN) {
-            // we are only here because there's rebalance in progress,
-            // just poll with zero to complete it
-            records = pollRequests(Duration.ZERO);
-        } else {
-            // any other state should not happen
-            log.error("Unexpected state {} during normal iteration", state);
-            throw new StreamsException(logPrefix + "Unexpected state " + state + " during normal iteration");
-        }
-        final long pollLatency = advanceNowAndComputeLatency();
-        pollSensor.record(pollLatency, now);
-        if (records != null && !records.isEmpty()) {
-            pollRecordsSensor.record(records.count(), now);
-            taskManager.addRecordsToTasks(records);
-        }
+        final long pollLatency = pollPhase();
```

Review comment:
`runOnce` was too long, according to checkstyle, so I factored out some of the execution phases.

File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java

```diff
         // Shutdown hook could potentially be triggered and transit the thread state to PENDING_SHUTDOWN during #pollRequests().
         // The task manager internal states could be uninitialized if the state transition happens during #onPartitionsAssigned().
         // Should only proceed when the thread is still running after #pollRequests(), because no external state mutation
         // could affect the task manager state beyond this point within #runOnce().
         if (!isRunning()) {
-            log.debug("State already transits to {}, skipping the run once call after poll request", state);
+            log.debug("Thread state is already {}, skipping the run once call after poll request", state);
```

Review comment:
Just a slight rewording that I thought could be clearer.
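The factored-out poll phase chooses its poll duration from the thread state. That choice can be sketched as follows. This is a simplified stand-alone sketch of the state-to-duration mapping, not the actual `pollPhase` implementation (which also records sensors and hands records to the task manager).

```java
import java.time.Duration;

public class PollDurationSketch {
    enum State { STARTING, PARTITIONS_ASSIGNED, PARTITIONS_REVOKED, RUNNING, PENDING_SHUTDOWN }

    static Duration pollDuration(final State state, final Duration pollTime) {
        switch (state) {
            case PARTITIONS_ASSIGNED:  // poll with zero to unblock restoration as soon as possible
            case PARTITIONS_REVOKED:   // poll with zero to unblock other work while awaiting the join response
            case PENDING_SHUTDOWN:     // a rebalance is in progress; poll with zero to complete it
                return Duration.ZERO;
            case RUNNING:
            case STARTING:
                return pollTime;       // normal poll time, to get long polling
            default:                   // any other state should not happen
                throw new IllegalStateException("Unexpected state " + state + " during normal iteration");
        }
    }
}
```

Collapsing the original if/else chain into a single mapping like this is what lets the whole poll step live in one short, checkstyle-friendly method.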