cadonna commented on code in PR #14001: URL: https://github.com/apache/kafka/pull/14001#discussion_r1285936281
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java: ########## @@ -87,11 +88,29 @@ private void runOnce(final long nowMs) { if (currentTask == null) { currentTask = taskManager.assignNextTask(DefaultTaskExecutor.this); } else { + boolean progressed = false; + + // First, attempt to punctuate Review Comment: nit: I do not think we need this comment. It is clear from the code that we first punctuate. ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java: ########## @@ -87,11 +88,29 @@ private void runOnce(final long nowMs) { if (currentTask == null) { currentTask = taskManager.assignNextTask(DefaultTaskExecutor.this); } else { + boolean progressed = false; + + // First, attempt to punctuate + if (taskExecutionMetadata.canPunctuateTask(currentTask)) { + if (currentTask.maybePunctuateStreamTime()) { + log.trace("punctuated stream time for task {} ", currentTask.id()); + progressed = true; + } + if (currentTask.maybePunctuateSystemTime()) { + log.trace("punctuated system time for task {} ", currentTask.id()); + progressed = true; + } + } Review Comment: Is there a specific reason we punctuate before we process. In the current code path we first process and then punctuate. ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java: ########## @@ -87,11 +88,29 @@ private void runOnce(final long nowMs) { if (currentTask == null) { currentTask = taskManager.assignNextTask(DefaultTaskExecutor.this); } else { + boolean progressed = false; + + // First, attempt to punctuate + if (taskExecutionMetadata.canPunctuateTask(currentTask)) { + if (currentTask.maybePunctuateStreamTime()) { + log.trace("punctuated stream time for task {} ", currentTask.id()); + progressed = true; + } + if (currentTask.maybePunctuateSystemTime()) { + log.trace("punctuated system time for task {} ", currentTask.id()); + progressed = true; + } + } + // if a task is no longer processable, ask task-manager to give it another // task in the next iteration Review Comment: This comment does not match the code it should describe. Can we please remove it? ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java: ########## @@ -87,11 +88,29 @@ private void runOnce(final long nowMs) { if (currentTask == null) { currentTask = taskManager.assignNextTask(DefaultTaskExecutor.this); } else { Review Comment: Why can we not process a newly assigned task immediately but we wait until the next iteration? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org