mjsax commented on a change in pull request #9997:
URL: https://github.com/apache/kafka/pull/9997#discussion_r568275164



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1109,22 +1109,32 @@ int process(final int maxNumRecords, final Time time) {
 
         long now = time.milliseconds();
         for (final Task task : activeTaskIterable()) {
+            int processed = 0;
+            final long then = now;
             try {
-                int processed = 0;
-                final long then = now;
                 while (processed < maxNumRecords && task.process(now)) {
+                    task.clearTaskTimeout();
                     processed++;
                 }
-                now = time.milliseconds();
-                totalProcessed += processed;
-                task.recordProcessBatchTime(now - then);
+            } catch (final TimeoutException timeoutException) {

Review comment:
       > vs the one about (in Task)?
   
   Not sure what you mean. Can you elaborate?
   
   > It looks like it's only here to handle the case of ALOS. On the contrary, if we just crash in that case, we don't need this block here at all, right?
   
   Yes, it's only for at-least-once. For exactly-once we need to roll back the state stores, so we cannot just retry. And the goal is to _avoid_ just crashing... :D 
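
To make the distinction concrete, here is a minimal, self-contained sketch of the idea, not the actual `TaskManager` code. Everything in it is a hypothetical stand-in: `SketchTask`, `SketchTimeoutException`, `initTimeoutOrThrow`, and the `taskTimeoutMs` parameter are invented for illustration, and the real PR may structure this differently. Under at-least-once the timeout is swallowed and a per-task deadline is started so the batch can be retried later; under exactly-once the exception is rethrown, because the state stores would need a roll-back first.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

class TimeoutRetrySketch {

    /** Hypothetical stand-in for a client timeout; not Kafka's own exception class. */
    static class SketchTimeoutException extends RuntimeException {
        SketchTimeoutException(final String message) { super(message); }
    }

    /** Hypothetical task: each queued step is a record (true) or a simulated timeout (false). */
    static class SketchTask {
        final boolean exactlyOnce;
        private final Deque<Boolean> steps;
        private long deadlineMs = -1L; // -1 means no timeout is currently tracked

        SketchTask(final boolean exactlyOnce, final Boolean... steps) {
            this.exactlyOnce = exactlyOnce;
            this.steps = new ArrayDeque<>(Arrays.asList(steps));
        }

        /** Returns true if a record was processed, false if nothing is left. */
        boolean process() {
            final Boolean step = steps.poll();
            if (step == null) return false;
            if (!step) throw new SketchTimeoutException("simulated timeout");
            return true;
        }

        /** Progress was made, so forget any pending deadline. */
        void clearTaskTimeout() { deadlineMs = -1L; }

        /** Starts the deadline on the first timeout; rethrows once the deadline has passed. */
        void initTimeoutOrThrow(final long nowMs, final long taskTimeoutMs,
                                final RuntimeException cause) {
            if (deadlineMs < 0) {
                deadlineMs = nowMs + taskTimeoutMs;
            } else if (nowMs > deadlineMs) {
                throw cause;
            }
        }
    }

    /** Processes up to maxNumRecords; retries later on timeout only for at-least-once. */
    static int processBatch(final SketchTask task, final int maxNumRecords,
                            final long nowMs, final long taskTimeoutMs) {
        int processed = 0;
        try {
            while (processed < maxNumRecords && task.process()) {
                task.clearTaskTimeout();
                processed++;
            }
        } catch (final SketchTimeoutException e) {
            if (task.exactlyOnce) {
                throw e; // cannot simply retry: state stores must be rolled back first
            }
            task.initTimeoutOrThrow(nowMs, taskTimeoutMs, e);
        }
        return processed;
    }

    public static void main(final String[] args) {
        final SketchTask alos = new SketchTask(false, true, false, true);
        System.out.println("first pass:  " + processBatch(alos, 10, 0L, 100L));
        System.out.println("second pass: " + processBatch(alos, 10, 50L, 100L));
    }
}
```

In this sketch the at-least-once task survives the simulated timeout and picks up the remaining record on the next pass, while a task constructed with `exactlyOnce = true` would propagate the exception to the caller instead.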





