mjsax commented on code in PR #20693:
URL: https://github.com/apache/kafka/pull/20693#discussion_r2496670474


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -725,25 +729,49 @@ public boolean isProcessable(final long wallClockTime) {
             // a task is only closing / closed when 1) task manager is 
closing, 2) a rebalance is undergoing;
             // in either case we can just log it and move on without notifying 
the thread since the consumer
             // would soon be updated to not return any records for this task 
anymore.
-            log.info("Stream task {} is already in {} state, skip processing 
it.", id(), state());
+            log.info("Task is already in {} state, skip processing it.", 
state());
 
             return false;
         }
 
+        final boolean wasReady = lastNotReadyLogTime.isEmpty();
         if (hasPendingTxCommit) {
             // if the task has a pending TX commit, we should just retry the 
commit but not process any records
             // thus, the task is not processable, even if there is available 
data in the record queue
+            if (wasReady) {
+                // READY -> NOT_READY - start timer
+                lastNotReadyLogTime = Optional.of(wallClockTime);
+            } else {
+                maybeLogNotReady(wallClockTime, "Task is not ready to process: 
has pending transaction commit");
+            }
             return false;
         }
-        final boolean readyToProcess = 
partitionGroup.readyToProcess(wallClockTime);
-        if (!readyToProcess) {
-            if (timeCurrentIdlingStarted.isEmpty()) {
+
+        final AbstractPartitionGroup.ReadyToProcessResult readyToProcess = 
partitionGroup.readyToProcess(wallClockTime);
+        if (!readyToProcess.isReady()) {
+            if (wasReady) {
+                // READY -> NOT_READY - start the timer
+                lastNotReadyLogTime = Optional.of(wallClockTime);
                 timeCurrentIdlingStarted = Optional.of(wallClockTime);
+            } else {
+                maybeLogNotReady(wallClockTime, 
readyToProcess.getLogMessage().orElse("Task is not ready to process"));

Review Comment:
   For this case, we should always have a log message, right? Wondering if we 
should use `orElseThrow` to surface a potential bug directly?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -725,25 +729,49 @@ public boolean isProcessable(final long wallClockTime) {
             // a task is only closing / closed when 1) task manager is 
closing, 2) a rebalance is undergoing;
             // in either case we can just log it and move on without notifying 
the thread since the consumer
             // would soon be updated to not return any records for this task 
anymore.
-            log.info("Stream task {} is already in {} state, skip processing 
it.", id(), state());
+            log.info("Task is already in {} state, skip processing it.", 
state());
 
             return false;
         }
 
+        final boolean wasReady = lastNotReadyLogTime.isEmpty();
         if (hasPendingTxCommit) {
             // if the task has a pending TX commit, we should just retry the 
commit but not process any records
             // thus, the task is not processable, even if there is available 
data in the record queue
+            if (wasReady) {
+                // READY -> NOT_READY - start timer
+                lastNotReadyLogTime = Optional.of(wallClockTime);
+            } else {
+                maybeLogNotReady(wallClockTime, "Task is not ready to process: 
has pending transaction commit");
+            }
             return false;
         }
-        final boolean readyToProcess = 
partitionGroup.readyToProcess(wallClockTime);
-        if (!readyToProcess) {
-            if (timeCurrentIdlingStarted.isEmpty()) {
+
+        final AbstractPartitionGroup.ReadyToProcessResult readyToProcess = 
partitionGroup.readyToProcess(wallClockTime);
+        if (!readyToProcess.isReady()) {
+            if (wasReady) {
+                // READY -> NOT_READY - start the timer
+                lastNotReadyLogTime = Optional.of(wallClockTime);
                 timeCurrentIdlingStarted = Optional.of(wallClockTime);
+            } else {
+                maybeLogNotReady(wallClockTime, 
readyToProcess.getLogMessage().orElse("Task is not ready to process"));

Review Comment:
   For this case, we should always have a log message, right? Wondering if we 
should use `orElseThrow` to surface a potential bug in our code directly?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to