mjsax commented on code in PR #20693:
URL: https://github.com/apache/kafka/pull/20693#discussion_r2496674581


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -725,25 +729,49 @@ public boolean isProcessable(final long wallClockTime) {
             // a task is only closing / closed when 1) task manager is closing, 2) a rebalance is undergoing;
             // in either case we can just log it and move on without notifying the thread since the consumer
             // would soon be updated to not return any records for this task anymore.
-            log.info("Stream task {} is already in {} state, skip processing it.", id(), state());
+            log.info("Task is already in {} state, skip processing it.", state());
 
             return false;
         }
 
+        final boolean wasReady = lastNotReadyLogTime.isEmpty();
         if (hasPendingTxCommit) {
             // if the task has a pending TX commit, we should just retry the commit but not process any records
             // thus, the task is not processable, even if there is available data in the record queue
+            if (wasReady) {
+                // READY -> NOT_READY - start timer
+                lastNotReadyLogTime = Optional.of(wallClockTime);
+            } else {
+                maybeLogNotReady(wallClockTime, "Task is not ready to process: has pending transaction commit");
+            }
             return false;
         }
-        final boolean readyToProcess = partitionGroup.readyToProcess(wallClockTime);
-        if (!readyToProcess) {
-            if (timeCurrentIdlingStarted.isEmpty()) {
+
+        final AbstractPartitionGroup.ReadyToProcessResult readyToProcess = partitionGroup.readyToProcess(wallClockTime);
+        if (!readyToProcess.isReady()) {
+            if (wasReady) {
+                // READY -> NOT_READY - start the timer
+                lastNotReadyLogTime = Optional.of(wallClockTime);
                 timeCurrentIdlingStarted = Optional.of(wallClockTime);
+            } else {
+                maybeLogNotReady(wallClockTime, readyToProcess.getLogMessage().orElse("Task is not ready to process"));
             }
         } else {
+            // Task is ready - clear the timer
+            lastNotReadyLogTime = Optional.empty();
             timeCurrentIdlingStarted = Optional.empty();
+            log.trace("Task is ready to process");
+        }
+        return readyToProcess.isReady();
+    }
+
+    private void maybeLogNotReady(final long wallClockTime, final String logMessage) {
+        // NOT_READY - check if it should log
+        final long timeSinceLastLog = lastNotReadyLogTime.map(aLong -> wallClockTime - aLong).orElse(0L);

Review Comment:
   ```suggestion
           final long timeSinceLastLog = lastNotReadyLogTime.map(aLong -> wallClockTime - aLong).orElse(-1L);
   ```
   We never logged, so `-1L` is maybe a little bit more expressive -- does not 
really change much; just seems to be a tiny bit cleaner.
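
To illustrate the point, here is a minimal stand-alone sketch (not the actual `StreamTask` code). The class name `NotReadyLogSketch`, the constant `NEVER_LOGGED`, and the static helper are hypothetical and exist only for this example. It shows that with a `0L` fallback, "no timestamp recorded" and "timestamp recorded at the current wall-clock time" would both yield `0`, while `-1L` keeps the two cases distinct:

```java
import java.util.Optional;

// Hypothetical stand-alone sketch; not the StreamTask code from the PR.
public class NotReadyLogSketch {

    // Sentinel for "no previous timestamp recorded" (the value suggested above).
    static final long NEVER_LOGGED = -1L;

    // Returns the elapsed time since the recorded timestamp, or NEVER_LOGGED if there is none.
    static long timeSinceLastLog(final Optional<Long> lastNotReadyLogTime, final long wallClockTime) {
        return lastNotReadyLogTime.map(last -> wallClockTime - last).orElse(NEVER_LOGGED);
    }

    public static void main(final String[] args) {
        // No timestamp recorded: the sentinel is returned.
        System.out.println(timeSinceLastLog(Optional.empty(), 5_000L));
        // Timestamp equals "now": a genuine elapsed time of zero.
        System.out.println(timeSinceLastLog(Optional.of(5_000L), 5_000L));
        // Timestamp three seconds in the past.
        System.out.println(timeSinceLastLog(Optional.of(2_000L), 5_000L));
    }
}
```

As far as the quoted hunk shows, `maybeLogNotReady` is only reached when `lastNotReadyLogTime` is already set, so the fallback is mostly about readability rather than behavior.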



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.