vvcephei commented on a change in pull request #9997:
URL: https://github.com/apache/kafka/pull/9997#discussion_r568069417



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
##########
@@ -111,13 +112,16 @@ public void initialize() {
             final List<PartitionInfo> partitions;
             try {
                 partitions = streamsProducer.partitionsFor(topic);
-            } catch (final KafkaException e) {
-                // TODO: KIP-572 need to handle `TimeoutException`
-                // -> should we throw a `TaskCorruptedException` for this case to reset the task and retry (including triggering `task.timeout.ms`) ?
+            } catch (final TimeoutException timeoutException) {
+                log.debug("Could not get partitions for topic {}, will retry", topic);
+
+                // re-throw to trigger `task.timeout.ms`
+                throw timeoutException;
+            } catch (final KafkaException fatal) {
                 // here we cannot drop the message on the floor even if it is a transient timeout exception,
                 // so we treat everything the same as a fatal exception
                 throw new StreamsException("Could not determine the number of partitions for topic '" + topic +
-                    "' for task " + taskId + " due to " + e.toString());
+                    "' for task " + taskId + " due to " + fatal.toString());

Review comment:
       Can we also pass the caught exception as the cause when we throw the 
`StreamsException`? The cause isn't always helpful, but since we started 
including it, it has been invaluable for debugging many times.
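
       To illustrate the difference, here is a minimal, self-contained sketch 
in plain Java. It is not the Kafka code: `CauseChainingSketch`, 
`partitionCount`, and both `wrap...` helpers are hypothetical names, and 
`RuntimeException` stands in for `StreamsException`. In the actual change this 
would presumably mean using a constructor overload that accepts a `Throwable`.

```java
// Hypothetical sketch: contrasts message-only wrapping with cause chaining.
final class CauseChainingSketch {

    // Hypothetical stand-in for a metadata lookup that fails.
    static int partitionCount(final String topic) {
        throw new IllegalStateException("metadata unavailable for " + topic);
    }

    // Message-only wrapping: the original exception survives only as text,
    // so its stack trace and any nested causes are lost.
    static RuntimeException wrapWithoutCause(final String topic, final RuntimeException fatal) {
        return new RuntimeException(
            "Could not determine the number of partitions for topic '" + topic + "' due to " + fatal);
    }

    // Cause chaining: the original exception is attached and will be printed
    // as a "Caused by:" section when the wrapper is logged.
    static RuntimeException wrapWithCause(final String topic, final RuntimeException fatal) {
        return new RuntimeException(
            "Could not determine the number of partitions for topic '" + topic + "'", fatal);
    }

    public static void main(final String[] args) {
        try {
            partitionCount("demo-topic");
        } catch (final RuntimeException fatal) {
            // The first wrapper has no cause attached; the second keeps the original.
            System.out.println(wrapWithoutCause("demo-topic", fatal).getCause());
            System.out.println(wrapWithCause("demo-topic", fatal).getCause());
        }
    }
}
```

       With the cause attached, a logged stack trace shows where the original 
failure happened instead of only its `toString()` text.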

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1109,22 +1109,32 @@ int process(final int maxNumRecords, final Time time) {
 
         long now = time.milliseconds();
         for (final Task task : activeTaskIterable()) {
+            int processed = 0;
+            final long then = now;
             try {
-                int processed = 0;
-                final long then = now;
                 while (processed < maxNumRecords && task.process(now)) {
+                    task.clearTaskTimeout();
                     processed++;
                 }
-                now = time.milliseconds();
-                totalProcessed += processed;
-                task.recordProcessBatchTime(now - then);
+            } catch (final TimeoutException timeoutException) {

Review comment:
       It looks like this block is only here to handle the ALOS case. 
Conversely, if we just crash in that case, we don't need this block at all, 
right?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -714,19 +720,34 @@ public boolean process(final long wallClockTime) {
             if (recordInfo.queue().size() == maxBufferedSize) {
                 mainConsumer.resume(singleton(partition));
             }
-        } catch (final StreamsException e) {
-            throw e;
+
+            record = null;
+        } catch (final TimeoutException timeoutException) {
+            if (!eosEnabled) {
+                throw timeoutException;

Review comment:
       It seems like KIP-572 should retry only when it's safe to do so. Under 
ALOS, it's not safe to retry, so we should just crash here, right?
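
       As a rough sketch of "retry only when it's safe", the decision could 
look like the following. Everything here is hypothetical and not taken from the 
PR: `RetryWhenSafeSketch`, `TransientTimeout`, the `retrySafe` flag, and 
`timeoutMs` (a stand-in for `task.timeout.ms`). The sketch deliberately does 
not decide which processing guarantee makes a retry safe; the caller supplies 
that as a flag.

```java
// Hypothetical sketch: rethrow immediately when retrying is unsafe,
// otherwise allow retries until a deadline passes without progress.
final class RetryWhenSafeSketch {

    // Hypothetical unchecked timeout type used only by this sketch.
    static final class TransientTimeout extends RuntimeException {
        TransientTimeout(final String message) {
            super(message);
        }
    }

    private final boolean retrySafe; // assumption: caller knows whether a retry is safe
    private final long timeoutMs;    // assumption: stand-in for a task-level timeout
    private long deadlineMs = -1L;   // -1 means no timeout is currently being tracked

    RetryWhenSafeSketch(final boolean retrySafe, final long timeoutMs) {
        this.retrySafe = retrySafe;
        this.timeoutMs = timeoutMs;
    }

    // Returns normally if the caller may retry; throws if it must not.
    void onTimeout(final long nowMs, final TransientTimeout timeout) {
        if (!retrySafe) {
            // Unsafe to retry: propagate the timeout unchanged.
            throw timeout;
        }
        if (deadlineMs < 0) {
            // First timeout since the last progress: start the clock.
            deadlineMs = nowMs + timeoutMs;
        } else if (nowMs > deadlineMs) {
            // Deadline exceeded: give up, keeping the timeout as the cause.
            throw new IllegalStateException("No progress within " + timeoutMs + " ms", timeout);
        }
    }

    // Any successful step clears the tracked deadline.
    void onProgress() {
        deadlineMs = -1L;
    }
}
```

       A caller would invoke `onTimeout` from its `catch` block and 
`onProgress` after each successfully processed record.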

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -714,19 +720,34 @@ public boolean process(final long wallClockTime) {
             if (recordInfo.queue().size() == maxBufferedSize) {
                 mainConsumer.resume(singleton(partition));
             }
-        } catch (final StreamsException e) {
-            throw e;
+
+            record = null;
+        } catch (final TimeoutException timeoutException) {
+            if (!eosEnabled) {
+                throw timeoutException;

Review comment:
       If I'm reading the code right, this would also mean we don't need to 
mess with caching the StampedRecord, which would be nice.



