cadonna commented on code in PR #14001:
URL: https://github.com/apache/kafka/pull/14001#discussion_r1285936281


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java:
##########
@@ -87,11 +88,29 @@ private void runOnce(final long nowMs) {
             if (currentTask == null) {
                 currentTask = 
taskManager.assignNextTask(DefaultTaskExecutor.this);
             } else {
+                boolean progressed = false;
+
+                // First, attempt to punctuate

Review Comment:
   nit: I do not think we need this comment. It is clear from the code that we 
first punctuate.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java:
##########
@@ -87,11 +88,29 @@ private void runOnce(final long nowMs) {
             if (currentTask == null) {
                 currentTask = 
taskManager.assignNextTask(DefaultTaskExecutor.this);
             } else {
+                boolean progressed = false;
+
+                // First, attempt to punctuate
+                if (taskExecutionMetadata.canPunctuateTask(currentTask)) {
+                    if (currentTask.maybePunctuateStreamTime()) {
+                        log.trace("punctuated stream time for task {} ", 
currentTask.id());
+                        progressed = true;
+                    }
+                    if (currentTask.maybePunctuateSystemTime()) {
+                        log.trace("punctuated system time for task {} ", 
currentTask.id());
+                        progressed = true;
+                    }
+                }

Review Comment:
   Is there a specific reason we punctuate before we process. In the current 
code path we first process and then punctuate.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java:
##########
@@ -87,11 +88,29 @@ private void runOnce(final long nowMs) {
             if (currentTask == null) {
                 currentTask = 
taskManager.assignNextTask(DefaultTaskExecutor.this);
             } else {
+                boolean progressed = false;
+
+                // First, attempt to punctuate
+                if (taskExecutionMetadata.canPunctuateTask(currentTask)) {
+                    if (currentTask.maybePunctuateStreamTime()) {
+                        log.trace("punctuated stream time for task {} ", 
currentTask.id());
+                        progressed = true;
+                    }
+                    if (currentTask.maybePunctuateSystemTime()) {
+                        log.trace("punctuated system time for task {} ", 
currentTask.id());
+                        progressed = true;
+                    }
+                }
+
                 // if a task is no longer processable, ask task-manager to 
give it another
                 // task in the next iteration

Review Comment:
   This comment does not match the code it should describe. Can we please 
remove it?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java:
##########
@@ -87,11 +88,29 @@ private void runOnce(final long nowMs) {
             if (currentTask == null) {
                 currentTask = 
taskManager.assignNextTask(DefaultTaskExecutor.this);
             } else {

Review Comment:
   Why can we not process a newly assigned task immediately but we wait until 
the next iteration?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to