vvcephei commented on a change in pull request #9997:
URL: https://github.com/apache/kafka/pull/9997#discussion_r568069417
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
##########
@@ -111,13 +112,16 @@ public void initialize() {
         final List<PartitionInfo> partitions;
         try {
             partitions = streamsProducer.partitionsFor(topic);
-        } catch (final KafkaException e) {
-            // TODO: KIP-572 need to handle `TimeoutException`
-            // -> should we throw a `TaskCorruptedException` for this case to reset the task and retry (including triggering `task.timeout.ms`)?
+        } catch (final TimeoutException timeoutException) {
+            log.debug("Could not get partitions for topic {}, will retry", topic);
+
+            // re-throw to trigger `task.timeout.ms`
+            throw timeoutException;
+        } catch (final KafkaException fatal) {
             // here we cannot drop the message on the floor even if it is a transient timeout exception,
             // so we treat everything the same as a fatal exception
             throw new StreamsException("Could not determine the number of partitions for topic '" + topic +
-                "' for task " + taskId + " due to " + e.toString());
+                "' for task " + taskId + " due to " + fatal.toString());

Review comment:
   Can we also include the cause when we throw exceptions here? It's not always helpful, but including the cause has been invaluable for debugging many times since we started doing it. (See the first sketch at the end of this message.)

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1109,22 +1109,32 @@ int process(final int maxNumRecords, final Time time) {
         long now = time.milliseconds();

         for (final Task task : activeTaskIterable()) {
+            int processed = 0;
+            final long then = now;
             try {
-                int processed = 0;
-                final long then = now;
                 while (processed < maxNumRecords && task.process(now)) {
+                    task.clearTaskTimeout();
                     processed++;
                 }
-
-                now = time.milliseconds();
-                totalProcessed += processed;
-                task.recordProcessBatchTime(now - then);
+            } catch (final TimeoutException timeoutException) {

Review comment:
   It looks like this catch block is only here to handle the ALOS case. Conversely, if we just crash in that case, we don't need the block here at all, right? (See the second sketch at the end of this message.)

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -714,19 +720,34 @@ public boolean process(final long wallClockTime) {
             if (recordInfo.queue().size() == maxBufferedSize) {
                 mainConsumer.resume(singleton(partition));
             }
-        } catch (final StreamsException e) {
-            throw e;
+
+            record = null;
+        } catch (final TimeoutException timeoutException) {
+            if (!eosEnabled) {
+                throw timeoutException;

Review comment:
   It seems like KIP-572 should retry only when it's safe to do so. Under ALOS, it's not safe to retry, so we should just crash here, right? (See the third sketch at the end of this message.)

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -714,19 +720,34 @@ public boolean process(final long wallClockTime) {
             if (recordInfo.queue().size() == maxBufferedSize) {
                 mainConsumer.resume(singleton(partition));
             }
-        } catch (final StreamsException e) {
-            throw e;
+
+            record = null;
+        } catch (final TimeoutException timeoutException) {
+            if (!eosEnabled) {
+                throw timeoutException;

Review comment:
   If I'm reading the code right, this would also mean we don't need to mess with caching the StampedRecord, which would be nice.
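Sketch 1 (for the RecordCollectorImpl comment): a minimal sketch of what chaining the cause could look like. The fields `streamsProducer`, `taskId`, and `log` stand in for the real members of RecordCollectorImpl, and `partitionsForOrThrow` is a hypothetical helper name used only for illustration; the two-argument StreamsException(String, Throwable) constructor is what does the work.

    import java.util.List;

    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.errors.TimeoutException;
    import org.apache.kafka.streams.errors.StreamsException;

    // Sketch only: mirrors the hunk above, but passes `fatal` as the cause so
    // its stack trace is preserved instead of being flattened into the message.
    private List<PartitionInfo> partitionsForOrThrow(final String topic) {
        try {
            return streamsProducer.partitionsFor(topic);
        } catch (final TimeoutException timeoutException) {
            log.debug("Could not get partitions for topic {}, will retry", topic);
            // re-throw to trigger `task.timeout.ms`
            throw timeoutException;
        } catch (final KafkaException fatal) {
            // chaining the cause keeps the original stack trace for debugging
            throw new StreamsException("Could not determine the number of partitions for topic '" +
                topic + "' for task " + taskId, fatal);
        }
    }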
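Sketch 2 (for the TaskManager comment): if a TimeoutException under ALOS simply crashes the task, the catch block becomes unnecessary and the loop collapses back to roughly its original shape. This is a sketch of the suggested simplification, not the actual patch; all names are taken from the hunk above.

    long now = time.milliseconds();
    for (final Task task : activeTaskIterable()) {
        int processed = 0;
        final long then = now;
        // a TimeoutException thrown by task.process() would propagate and crash
        // the stream thread, so no catch block is needed here
        while (processed < maxNumRecords && task.process(now)) {
            task.clearTaskTimeout();
            processed++;
        }
        now = time.milliseconds();
        totalProcessed += processed;
        task.recordProcessBatchTime(now - then);
    }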
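Sketch 3 (for the two StreamTask comments): one way the "crash under ALOS" suggestion could look. Wrapping the timeout in a StreamsException makes the failure fatal instead of entering the `task.timeout.ms` retry path, and if ALOS never retries, the StampedRecord caching mentioned in the last comment is not needed for that branch. This is a hedged sketch of the question, not the merged code; the message text is made up for illustration.

    } catch (final TimeoutException timeoutException) {
        if (!eosEnabled) {
            // ALOS: the record may already have had side effects, so a retry
            // could process it a second time; fail fatally rather than retrying.
            throw new StreamsException("Could not process record due to a timeout; " +
                "cannot retry safely under at-least-once semantics", timeoutException);
        }
        // EOS: the open transaction will be aborted, so the record has no
        // committed side effects yet and a retry is safe; re-throw to trigger
        // `task.timeout.ms`.
        throw timeoutException;
    }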
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org