[GitHub] [kafka] vvcephei commented on a change in pull request #9267: MINOR: Add debug logs for StreamThread

2020-09-09 Thread GitBox


vvcephei commented on a change in pull request #9267:
URL: https://github.com/apache/kafka/pull/9267#discussion_r485771434



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -706,13 +662,17 @@ void runOnce() {
 totalProcessed += processed;
 }
 
+log.debug("TaskManager#process handled {} records; invoking TaskManager#punctuate", processed);
+
 final int punctuated = taskManager.punctuate();
 final long punctuateLatency = advanceNowAndComputeLatency();
 totalPunctuateLatency += punctuateLatency;
 if (punctuated > 0) {
 punctuateSensor.record(punctuateLatency / (double) punctuated, now);
 }
 
+log.debug("TaskManager#punctuate executed: {}", punctuated);

Review comment:
   Note, this isn't saying that we invoked a single punctuator, but that we 
invoked _all_ the punctuators that are runnable right now. It's also a 
top-level phase of executing StreamThread, so I don't think we can leave it out 
of the debug logs without telling an incomplete story of what's happening. If 
we log this (or any of the proposed logs) at trace level, the only consequence 
is that users who want to _debug_ StreamThread would have to use "trace" level 
instead of "debug" level. This seems to add unnecessary complexity to the 
logging.
   
   IMO, it's better to reserve "trace" level for _very_ low-level logs, such as logging the progress of individual records through the processors, and use "debug" level for higher-level summary logs like the ones in this PR.
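The distinction can be sketched with a toy example (a hypothetical class using `java.util.logging`, where FINE stands in for "debug" and FINEST for "trace"; none of this is Kafka code):

```java
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

// Sketch of the proposed convention: trace-level (FINEST) logs follow
// individual records, while debug-level (FINE) logs summarize a whole phase.
public class LogLevelConvention {
    private static final Logger log = Logger.getLogger(LogLevelConvention.class.getName());

    static int processPhase(List<String> records) {
        for (String record : records) {
            // very low-level: one line per record -> reserve for "trace"
            log.log(Level.FINEST, "Processing record {0}", record);
        }
        // higher-level phase summary -> "debug"
        log.log(Level.FINE, "Process phase handled {0} records", records.size());
        return records.size();
    }

    public static void main(String[] args) {
        System.out.println(processPhase(List.of("a", "b", "c")));
    }
}
```

With this split, enabling "debug" yields one summary line per phase per loop iteration, while "trace" adds a line per record on top of that.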





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #9267: MINOR: Add debug logs for StreamThread

2020-09-09 Thread GitBox


vvcephei commented on a change in pull request #9267:
URL: https://github.com/apache/kafka/pull/9267#discussion_r485767592



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -752,6 +712,77 @@ void runOnce() {
 commitRatioSensor.record((double) totalCommitLatency / runOnceLatency, 
now);
 }
 
+private void initializeAndRestorePhase() {
+{
+// only try to initialize the assigned tasks
+// if the state is still in PARTITION_ASSIGNED after the poll call
+final State stateSnapshot = state;
+if (stateSnapshot == State.PARTITIONS_ASSIGNED
+|| stateSnapshot == State.RUNNING && taskManager.needsInitializationOrRestoration()) {
+
+log.debug("State is {}; initializing and restoring", stateSnapshot);
+
+// transit to restore active is idempotent so we can call it multiple times
+changelogReader.enforceRestoreActive();
+
+if (taskManager.tryToCompleteRestoration()) {
+changelogReader.transitToUpdateStandby();
+
+setState(State.RUNNING);
+}
+
+if (log.isDebugEnabled()) {
+log.debug("Initialization and restore call done. State is {}", state);
+}
+}
+}
+
+log.debug("Invoking ChangeLogReader#restore");

Review comment:
   Thanks, all. The purpose is simply to make the narrative of StreamThread's debug logs unambiguous and complete. The point of logging it here, instead of relying only on ChangeLogReader's logs, is that you can enable _just_ StreamThread's debug logger and get a complete high-level view of what's happening. If you then want to drill down into the restore call itself, you can enable ChangeLogReader's debug log.
   
   Good point, @wcarlson5 , about logging the state. I'll add it.
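The drill-down workflow described here might look like the following logger configuration (a hypothetical log4j.properties fragment; the changelog reader's actual logger name depends on the implementing class, assumed here to be `StoreChangelogReader`):

```properties
# High-level narrative: enable just StreamThread's debug logs
log4j.logger.org.apache.kafka.streams.processor.internals.StreamThread=DEBUG

# Drill down into the restore call itself only when needed
log4j.logger.org.apache.kafka.streams.processor.internals.StoreChangelogReader=DEBUG
```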









[GitHub] [kafka] vvcephei commented on a change in pull request #9267: MINOR: Add debug logs for StreamThread

2020-09-09 Thread GitBox


vvcephei commented on a change in pull request #9267:
URL: https://github.com/apache/kafka/pull/9267#discussion_r485759565



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -689,6 +644,7 @@ void runOnce() {
  *  6. Otherwise, increment N.
  */
 do {
+log.debug("Invoking TaskManager#process with {} iterations.", numIterations);

Review comment:
   Yes, it's for debugging purposes, so I'd like to keep them both at debug level. When I was handling the incident that inspired this PR, what I really wanted to see in the logs was both how often we called poll and which operation in the loop was blocking. In theory, either the "before" or the "after" logs alone would suffice, but only if I also had the source code pulled up to compare against the log messages. I'm proposing to (redundantly) log both "before" and "after" messages so that the logs are context-free: people in the future will be able to tell what is happening just from the narrative of the logs themselves.
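The "before"/"after" pattern can be sketched in isolation (hypothetical names; the real code brackets `TaskManager#process` inside `runOnce`):

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Sketch of redundant before/after logging: each call site logs its intent
// and its result, so the log stream reads as a self-contained narrative
// without the source code open alongside it.
public class BeforeAfterLogging {
    private static final Logger log = Logger.getLogger(BeforeAfterLogging.class.getName());

    static int processWithLogs(int numIterations) {
        log.log(Level.FINE, "Invoking process with {0} iterations", numIterations);
        int processed = doProcess(numIterations);  // stand-in for TaskManager#process
        log.log(Level.FINE, "process handled {0} records", processed);
        return processed;
    }

    static int doProcess(int numIterations) {
        return numIterations * 2;  // dummy work
    }

    public static void main(String[] args) {
        System.out.println(processWithLogs(4));
    }
}
```

If the thread blocks inside the call, the "before" line is the last thing in the log, which pinpoints the blocking operation directly.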









[GitHub] [kafka] vvcephei commented on a change in pull request #9267: MINOR: Add debug logs for StreamThread

2020-09-08 Thread GitBox


vvcephei commented on a change in pull request #9267:
URL: https://github.com/apache/kafka/pull/9267#discussion_r485062295



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -612,63 +612,18 @@ void runOnce() {
 final long startMs = time.milliseconds();
 now = startMs;
 
-if (state == State.PARTITIONS_ASSIGNED) {
-// try to fetch some records with zero poll millis
-// to unblock the restoration as soon as possible
-records = pollRequests(Duration.ZERO);
-} else if (state == State.PARTITIONS_REVOKED) {
-// try to fetch some records with zero poll millis to unblock
-// other useful work while waiting for the join response
-records = pollRequests(Duration.ZERO);
-} else if (state == State.RUNNING || state == State.STARTING) {
-// try to fetch some records with normal poll time
-// in order to get long polling
-records = pollRequests(pollTime);
-} else if (state == State.PENDING_SHUTDOWN) {
-// we are only here because there's rebalance in progress,
-// just poll with zero to complete it
-records = pollRequests(Duration.ZERO);
-} else {
-// any other state should not happen
-log.error("Unexpected state {} during normal iteration", state);
-throw new StreamsException(logPrefix + "Unexpected state " + state + " during normal iteration");
-}
-
-final long pollLatency = advanceNowAndComputeLatency();
-
-pollSensor.record(pollLatency, now);
-if (records != null && !records.isEmpty()) {
-pollRecordsSensor.record(records.count(), now);
-taskManager.addRecordsToTasks(records);
-}
+final long pollLatency = pollPhase();

Review comment:
   `runOnce` was too long according to Checkstyle, so I factored out some of the execution phases into named methods.
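A minimal sketch of that refactoring, with a hypothetical simplified `pollPhase` (the real method also records sensor metrics and handles the state-dependent poll timeout):

```java
// Sketch of extracting a phase of a long method into its own named method.
// The body of runOnce() shrinks to one call per phase, which both satisfies
// a method-length check and gives each phase a name that can appear in logs.
public class PhaseExtraction {
    private long now = 0;

    void runOnce() {
        final long pollLatency = pollPhase();
        // ... process, punctuate, and commit phases would follow ...
        System.out.println("poll latency: " + pollLatency);
    }

    long pollPhase() {
        long start = now;
        now += 5;             // stand-in for time advancing during the poll
        return now - start;   // latency of the poll phase
    }

    public static void main(String[] args) {
        new PhaseExtraction().runOnce();
    }
}
```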

##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -612,63 +612,18 @@ void runOnce() {
 final long startMs = time.milliseconds();
 now = startMs;
 
-if (state == State.PARTITIONS_ASSIGNED) {
-// try to fetch some records with zero poll millis
-// to unblock the restoration as soon as possible
-records = pollRequests(Duration.ZERO);
-} else if (state == State.PARTITIONS_REVOKED) {
-// try to fetch some records with zero poll millis to unblock
-// other useful work while waiting for the join response
-records = pollRequests(Duration.ZERO);
-} else if (state == State.RUNNING || state == State.STARTING) {
-// try to fetch some records with normal poll time
-// in order to get long polling
-records = pollRequests(pollTime);
-} else if (state == State.PENDING_SHUTDOWN) {
-// we are only here because there's rebalance in progress,
-// just poll with zero to complete it
-records = pollRequests(Duration.ZERO);
-} else {
-// any other state should not happen
-log.error("Unexpected state {} during normal iteration", state);
-throw new StreamsException(logPrefix + "Unexpected state " + state + " during normal iteration");
-}
-
-final long pollLatency = advanceNowAndComputeLatency();
-
-pollSensor.record(pollLatency, now);
-if (records != null && !records.isEmpty()) {
-pollRecordsSensor.record(records.count(), now);
-taskManager.addRecordsToTasks(records);
-}
+final long pollLatency = pollPhase();
 
 // Shutdown hook could potentially be triggered and transit the thread state to PENDING_SHUTDOWN during #pollRequests().
 // The task manager internal states could be uninitialized if the state transition happens during #onPartitionsAssigned().
 // Should only proceed when the thread is still running after #pollRequests(), because no external state mutation
 // could affect the task manager state beyond this point within #runOnce().
 if (!isRunning()) {
-log.debug("State already transits to {}, skipping the run once call after poll request", state);
+log.debug("Thread state is already {}, skipping the run once call after poll request", state);

Review comment:
   Just a slight rewording I thought could be clearer.

##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -752,6 +712,77 @@ void runOnce() {
 commitRatioSensor.record((double) totalCommitLatency / runOnceLatency, 
now);
 }
 
+private void initializeAndRestorePhase() {
+{
+// only try to initialize the assigned tasks
+