[ https://issues.apache.org/jira/browse/KAFKA-15108?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Tomonari Yamashita updated KAFKA-15108:
---------------------------------------
    Description: 
[Problem]
 - task.timeout.ms does not work when TimeoutException is thrown by streams producer
 -- The Kafka Streams upgrade guide says, "Kafka Streams is now handling TimeoutException thrown by the consumer, producer, and admin client."(1) and "To bound how long Kafka Streams retries a task, you can set task.timeout.ms (default is 5 minutes)."(1).
 -- However, task.timeout.ms does not appear to take effect for the streams producer; the task seems to keep retrying forever.

[Environment]
 - Kafka Streams 3.5.0

[Reproduce procedure]
 # Create the "input-topic" topic.
 # Put several messages on "input-topic".
 # Do NOT create the "output-topic" topic, so that a TimeoutException is triggered.
 # Create the following simple Kafka Streams program; it just transfers messages from "input-topic" to "output-topic".
{code:java}
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "java-kafka-streams");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,"org.apache.kafka.common.serialization.Serdes$StringSerde");
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,"org.apache.kafka.common.serialization.Serdes$StringSerde");
props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,"com.example.CustomProductionExceptionHandler"); // not needed

StreamsBuilder builder = new StreamsBuilder();

// Copy every record from "input-topic" to the (non-existent) "output-topic".
builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
        .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start(); // needed to actually run the topology
{code}
 # Wait for task.timeout.ms (default is 5 minutes).
 ## If debug logging is enabled, a large number of UNKNOWN_TOPIC_OR_PARTITION errors are logged because "output-topic" does not exist.
 ## Every minute, a TimeoutException is thrown (2).
 # ==> However, task.timeout.ms does not appear to take effect for the streams producer; the task seems to keep retrying forever.
 ## My expected behavior is that task.timeout.ms takes effect and the client is shut down, because the default behavior is StreamThreadExceptionResponse.SHUTDOWN_CLIENT when an exception is thrown.

[Investigation so far]
 - The TimeoutException thrown by the streams producer is replaced with a TaskCorruptedException in RecordCollectorImpl.recordSendError(...) (3)
 - After that, no code containing logic related to task.timeout.ms appears to be executed.

(1) Kafka Streams upgrade guide
 - [https://kafka.apache.org/35/documentation/streams/upgrade-guide]
 - [https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+Improve+timeouts+and+retries+in+Kafka+Streams]
{code:java}
Kafka Streams is now handling TimeoutException thrown by the consumer, producer, and admin client. If a timeout occurs on a task, Kafka Streams moves to the next task and retries to make progress on the failed task in the next iteration. To bound how long Kafka Streams retries a task, you can set task.timeout.ms (default is 5 minutes). If a task does not make progress within the specified task timeout, which is tracked on a per-task basis, Kafka Streams throws a TimeoutException (cf. KIP-572).
{code}

(2) TimeoutException occurs
{code:java}
2023-06-19 19:51:26 WARN  NetworkClient:1145 - [Producer clientId=java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1-producer] Error while fetching metadata with correlation id 1065 : {output-topic=UNKNOWN_TOPIC_OR_PARTITION}
2023-06-19 19:51:26 DEBUG Metadata:363 - [Producer clientId=java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1-producer] Requesting metadata update for topic output-topic due to error UNKNOWN_TOPIC_OR_PARTITION
2023-06-19 19:51:26 DEBUG Metadata:291 - [Producer clientId=java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1-producer] Updated cluster metadata updateVersion 1064 to MetadataCache{clusterId='ulBlb0C3QdaurHgFmPLYew', nodes={0=a86b6e81dded542bb867337e34fa7954-1776321381.ap-northeast-1.elb.amazonaws.com:9094 (id: 0 rack: null), 1=a99402a2de0054c2a96e87075df0f545-254291543.ap-northeast-1.elb.amazonaws.com:9094 (id: 1 rack: null), 2=a82f92d2e86d145b48447de89694d879-1900034172.ap-northeast-1.elb.amazonaws.com:9094 (id: 2 rack: null)}, partitions=[], controller=a82f92d2e86d145b48447de89694d879-1900034172.ap-northeast-1.elb.amazonaws.com:9094 (id: 2 rack: null)}
2023-06-19 19:51:26 DEBUG KafkaProducer:1073 - [Producer clientId=java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1-producer] Exception occurred during message send:
org.apache.kafka.common.errors.TimeoutException: Topic output-topic not present in metadata after 60000 ms.
2023-06-19 19:51:26 ERROR RecordCollectorImpl:322 - stream-thread [java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] stream-task [0_0] Error encountered sending record to topic output-topic for task 0_0 due to:
org.apache.kafka.common.errors.TimeoutException: Topic output-topic not present in metadata after 60000 ms.
The broker is either slow or in bad state (like not having enough replicas) in responding the request, or the connection to broker was interrupted sending the request or receiving the response.
Consider overwriting `max.block.ms` and /or `delivery.timeout.ms` to a larger value to wait longer for such scenarios and avoid timeout errors
org.apache.kafka.common.errors.TimeoutException: Topic output-topic not present in metadata after 60000 ms.
2023-06-19 19:51:26 DEBUG StreamThread:825 - stream-thread [java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] Processed 1 records with 1 iterations; invoking punctuators if necessary
2023-06-19 19:51:26 DEBUG StreamThread:837 - stream-thread [java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] 0 punctuators ran.
2023-06-19 19:51:26 DEBUG StreamThread:1117 - stream-thread [java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] Committing all active tasks [0_0] and standby tasks [] since 60021ms has elapsed (commit interval is 30000ms)
2023-06-19 19:51:26 DEBUG RecordCollectorImpl:345 - stream-thread [java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] stream-task [0_0] Flushing record collector
2023-06-19 19:51:26 WARN  StreamThread:626 - stream-thread [java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] Detected the states of tasks [0_0] are corrupted. Will close the task as dirty and re-create and bootstrap from scratch.
org.apache.kafka.streams.errors.TaskCorruptedException: Tasks [0_0] are corrupted and hence need to be re-initialized
	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:310) ~[kafka-streams-3.5.0.jar:?]
	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$1(RecordCollectorImpl.java:284) ~[kafka-streams-3.5.0.jar:?]
	at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1077) ~[kafka-clients-3.5.0.jar:?]
	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:962) ~[kafka-clients-3.5.0.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:261) ~[kafka-streams-3.5.0.jar:?]
	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:253) ~[kafka-streams-3.5.0.jar:?]
	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:175) ~[kafka-streams-3.5.0.jar:?]
	at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85) ~[kafka-streams-3.5.0.jar:?]
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290) ~[kafka-streams-3.5.0.jar:?]
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269) ~[kafka-streams-3.5.0.jar:?]
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228) ~[kafka-streams-3.5.0.jar:?]
	at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84) ~[kafka-streams-3.5.0.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:810) ~[kafka-streams-3.5.0.jar:?]
	at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872) ~[kafka-streams-3.5.0.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:810) ~[kafka-streams-3.5.0.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:741) ~[kafka-streams-3.5.0.jar:?]
	at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:97) ~[kafka-streams-3.5.0.jar:?]
	at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:78) ~[kafka-streams-3.5.0.jar:?]
	at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1747) ~[kafka-streams-3.5.0.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:807) ~[kafka-streams-3.5.0.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617) [kafka-streams-3.5.0.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579) [kafka-streams-3.5.0.jar:?]
2023-06-19 19:51:26 DEBUG TaskExecutor:176 - stream-thread [java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] Committing task offsets {}
2023-06-19 19:51:26 DEBUG RecordCollectorImpl:345 - stream-thread [java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] stream-task [0_0] Flushing record collector
2023-06-19 19:51:26 DEBUG StreamTask:419 - stream-thread [java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] task [0_0] Prepared RUNNING task for committing
2023-06-19 19:51:26 INFO  StreamTask:1235 - stream-thread [java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] task [0_0] Suspended from RUNNING
2023-06-19 19:51:26 DEBUG StreamTask:898 - stream-thread [java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] task [0_0] Checkpointable offsets {input-topic-0=0}
2023-06-19 19:51:26 DEBUG ProcessorStateManager:658 - stream-thread [java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] stream-task [0_0] Writing checkpoint: {} for task 0_0
2023-06-19 19:51:26 DEBUG StreamTask:504 - stream-thread [java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] task [0_0] Finalized commit for SUSPENDED task with enforce checkpoint true
2023-06-19 19:51:26 DEBUG ProcessorStateManager:544 - stream-thread [java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] stream-task [0_0] Closing its state manager and all the registered state stores: {}
2023-06-19 19:51:26 INFO
RecordCollectorImpl:373 - stream-thread [java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] stream-task [0_0] Closing record collector dirty
2023-06-19 19:51:26 INFO  StreamTask:555 - stream-thread [java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] task [0_0] Closed dirty
{code}

(3) TimeoutException thrown by the streams producer is replaced with TaskCorruptedException
[https://github.com/apache/kafka/blob/c97b88d5db4de28d9f51bb11fb71ddd6217c7dda/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L310]

> task.timeout.ms does not work when TimeoutException is thrown by streams producer
> ---------------------------------------------------------------------------------
>
>                 Key: KAFKA-15108
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15108
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.5.0
>            Reporter: Tomonari Yamashita
>            Priority: Major

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
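
For illustration only, the expectation in the report (a per-task deadline that starts at the first timeout, is cleared when the task makes progress, and fails the task once task.timeout.ms has elapsed) can be modelled roughly as below. This is a minimal sketch and not Kafka Streams code: the class TaskDeadlineSketch, its methods, and the nested exception are hypothetical names, and the 60-second step in main simply mirrors the "every one minute" TimeoutException described in the report.

```java
/** Hypothetical model of a per-task retry deadline; NOT Kafka Streams' implementation. */
public class TaskDeadlineSketch {

    /** Hypothetical unchecked exception standing in for "task.timeout.ms exceeded". */
    public static class TaskTimeoutExceeded extends RuntimeException {
        public TaskTimeoutExceeded(String message) {
            super(message);
        }
    }

    private static final long NO_DEADLINE = -1L;

    private final long taskTimeoutMs;
    private long deadlineMs = NO_DEADLINE;

    public TaskDeadlineSketch(long taskTimeoutMs) {
        this.taskTimeoutMs = taskTimeoutMs;
    }

    /**
     * Called on every retriable timeout. The first call starts the clock;
     * later calls throw once the deadline has passed.
     */
    public void onTimeout(long nowMs) {
        if (deadlineMs == NO_DEADLINE) {
            deadlineMs = nowMs + taskTimeoutMs;
        } else if (nowMs > deadlineMs) {
            throw new TaskTimeoutExceeded(
                "task made no progress for more than " + taskTimeoutMs + " ms");
        }
    }

    /** Called whenever the task makes progress: the clock is cleared. */
    public void onProgress() {
        deadlineMs = NO_DEADLINE;
    }

    public static void main(String[] args) {
        // Assumed values: 5-minute task timeout, one producer timeout per minute.
        TaskDeadlineSketch task = new TaskDeadlineSketch(300_000L);
        long nowMs = 0L;
        try {
            while (true) {
                task.onTimeout(nowMs);
                nowMs += 60_000L;
            }
        } catch (TaskTimeoutExceeded e) {
            System.out.println("gave up at t=" + nowMs + " ms: " + e.getMessage());
        }
    }
}
```

In this model the retry loop ends shortly after the 5-minute mark, which corresponds to the behavior the report expects; the logs above instead show the task being closed dirty and re-created after each producer timeout.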