mjsax commented on a change in pull request #9997:
URL: https://github.com/apache/kafka/pull/9997#discussion_r568272373



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -714,19 +720,34 @@ public boolean process(final long wallClockTime) {
             if (recordInfo.queue().size() == maxBufferedSize) {
                 mainConsumer.resume(singleton(partition));
             }
-        } catch (final StreamsException e) {
-            throw e;
+
+            record = null;
+        } catch (final TimeoutException timeoutException) {
+            if (!eosEnabled) {
+                throw timeoutException;
+            } else {
+                record = null;
+                throw new TaskCorruptedException(Collections.singletonMap(id, changelogPartitions()));

Review comment:
       Not sure if starting the timeout at all makes sense for this case?
   
   Also, starting it when we transition to RUNNING seems to be a non-trivial change.
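
   The timeout being discussed here is the `task.timeout.ms` bound from KIP-572. A minimal sketch of the start/clear/check lifecycle, using a hypothetical `TaskTimeoutTracker` helper (not the PR's actual API):

```java
import org.apache.kafka.streams.errors.StreamsException;

// Hypothetical helper, not the PR's API: a per-task deadline that is started on
// the first TimeoutException, cleared once the task makes progress again, and
// escalated to a fatal error once task.timeout.ms has elapsed.
public class TaskTimeoutTracker {
    private final long taskTimeoutMs; // corresponds to the task.timeout.ms config
    private long deadlineMs = -1L;    // -1 means no timeout is currently running

    public TaskTimeoutTracker(final long taskTimeoutMs) {
        this.taskTimeoutMs = taskTimeoutMs;
    }

    // called when a TimeoutException is caught for this task
    public void maybeInitOrCheck(final long nowMs, final Exception cause) {
        if (deadlineMs == -1L) {
            deadlineMs = nowMs + taskTimeoutMs; // "start" the timeout
        } else if (nowMs > deadlineMs) {
            throw new StreamsException("Task did not make progress within task.timeout.ms", cause);
        }
    }

    // called whenever the task makes progress (e.g. a record was processed)
    public void clear() {
        deadlineMs = -1L;
    }
}
```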

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
##########
@@ -111,13 +112,16 @@ public void initialize() {
             final List<PartitionInfo> partitions;
             try {
                 partitions = streamsProducer.partitionsFor(topic);
-            } catch (final KafkaException e) {
-                // TODO: KIP-572 need to handle `TimeoutException`
-                // -> should we throw a `TaskCorruptedException` for this case to reset the task and retry (including triggering `task.timeout.ms`) ?
+            } catch (final TimeoutException timeoutException) {
+                log.debug("Could not get partitions for topic {}, will retry", topic);

Review comment:
      INFO does not make sense I guess. So WARN it is.
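
   A rough sketch of the catch-and-retry pattern around `partitionsFor()` with the log level bumped to WARN as suggested; the wrapper class and method name below are illustrative only, not the PR's code:

```java
import java.util.List;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.errors.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative helper (hypothetical name): look up the partitions for a topic,
// log at WARN on a timeout, and rethrow so the caller can retry the send later,
// bounded by the task-level timeout.
public class PartitionLookup {
    private static final Logger log = LoggerFactory.getLogger(PartitionLookup.class);

    public static List<PartitionInfo> partitionsOrRethrow(final Producer<byte[], byte[]> producer,
                                                          final String topic) {
        try {
            return producer.partitionsFor(topic);
        } catch (final TimeoutException timeoutException) {
            log.warn("Could not get partitions for topic {}, will retry", topic);
            throw timeoutException; // TimeoutException is unchecked, so rethrowing is safe here
        }
    }
}
```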

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -714,19 +720,34 @@ public boolean process(final long wallClockTime) {
             if (recordInfo.queue().size() == maxBufferedSize) {
                 mainConsumer.resume(singleton(partition));
             }
-        } catch (final StreamsException e) {
-            throw e;
+
+            record = null;
+        } catch (final TimeoutException timeoutException) {
+            if (!eosEnabled) {
+                throw timeoutException;

Review comment:
      Why would it not be safe to retry the commit for at-least-once? -- The intention of the PR was to retry for this case. -- It may lead to duplication, but that seems fine with at-least-once.
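
   A toy sketch of the guarantee-dependent branching under discussion, using a stand-in exception instead of Streams' internal `TaskCorruptedException` (all names here are illustrative): for at-least-once the timeout is simply rethrown so the commit is retried later, while for exactly-once the task state has to be rolled back first.

```java
import org.apache.kafka.common.errors.TimeoutException;

// Toy illustration, not Streams internals: decide between "retry" and "reset"
// when a commit times out, depending on the configured processing guarantee.
public class CommitTimeoutPolicy {

    // Hypothetical stand-in for TaskCorruptedException, which in Streams triggers
    // wiping the task's state stores and restoring them from the changelog.
    public static class ResetTaskStateException extends RuntimeException {
        public ResetTaskStateException(final Throwable cause) {
            super("task state must be reset before reprocessing", cause);
        }
    }

    public static void handleCommitTimeout(final boolean eosEnabled,
                                           final TimeoutException timeoutException) {
        if (!eosEnabled) {
            // at-least-once: rethrow so the commit is retried later; the input may be
            // processed a second time, which at-least-once explicitly allows
            throw timeoutException;
        } else {
            // exactly-once: the transaction is in an unknown state, so the task's
            // state has to be rolled back before anything is retried
            throw new ResetTaskStateException(timeoutException);
        }
    }
}
```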

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1109,22 +1109,32 @@ int process(final int maxNumRecords, final Time time) {
 
         long now = time.milliseconds();
         for (final Task task : activeTaskIterable()) {
+            int processed = 0;
+            final long then = now;
             try {
-                int processed = 0;
-                final long then = now;
                 while (processed < maxNumRecords && task.process(now)) {
+                    task.clearTaskTimeout();
                     processed++;
                 }
-                now = time.milliseconds();
-                totalProcessed += processed;
-                task.recordProcessBatchTime(now - then);
+            } catch (final TimeoutException timeoutException) {

Review comment:
       > vs the one about (in Task)?
   
   Not sure what you mean. Can you elaborate?
   
   > It looks like it's only here to handle the case of ALOS. On the contrary, if we just crash in that case, we don't need this block here at all, right?
   
   Yes, it's only for at-least-once -- for exactly-once we need to roll back the state stores -- we cannot just retry. -- And the goal is to _avoid_ just crashing... :D 
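
   A simplified sketch of the loop shape under discussion (the `Task` interface below is a local stand-in, not the Streams-internal one): a per-task `TimeoutException` starts or advances that task's `task.timeout.ms` deadline and the loop moves on to the next task, so the thread only dies once the deadline is actually exceeded.

```java
import java.util.List;

import org.apache.kafka.common.errors.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Simplified sketch of the discussed loop shape; this Task interface is a local
// stand-in, not org.apache.kafka.streams.processor.internals.Task.
public final class ProcessingLoopSketch {
    private static final Logger log = LoggerFactory.getLogger(ProcessingLoopSketch.class);

    interface Task {
        boolean process(long wallClockTime);
        void clearTaskTimeout();
        void maybeInitTaskTimeoutOrThrow(long wallClockTime, Exception cause);
    }

    static int processAll(final List<Task> tasks, final int maxNumRecords, final long now) {
        int totalProcessed = 0;
        for (final Task task : tasks) {
            int processed = 0;
            try {
                while (processed < maxNumRecords && task.process(now)) {
                    task.clearTaskTimeout(); // progress was made, so stop the task's timeout
                    processed++;
                }
            } catch (final TimeoutException timeoutException) {
                // start (or check) this task's task.timeout.ms deadline and keep going;
                // the timeout only becomes fatal once that deadline is exceeded
                task.maybeInitTaskTimeoutOrThrow(now, timeoutException);
                log.debug("Could not process task due to timeout; will retry", timeoutException);
            }
            totalProcessed += processed;
        }
        return totalProcessed;
    }
}
```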



