[ https://issues.apache.org/jira/browse/KAFKA-15108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17735625#comment-17735625 ]
Tomonari Yamashita commented on KAFKA-15108:
--------------------------------------------

Hi [~mjsax],
Thank you. I wasn't sure whether this issue was unexpected behavior or not, so your advice was very helpful.

> task.timeout.ms does not work when TimeoutException is thrown by streams producer
> ---------------------------------------------------------------------------------
>
>                 Key: KAFKA-15108
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15108
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.5.0
>            Reporter: Tomonari Yamashita
>            Priority: Major
>
> [Problem]
> - task.timeout.ms does not work when TimeoutException is thrown by the streams producer
> -- The Kafka Streams upgrade guide says, "Kafka Streams is now handling TimeoutException thrown by the consumer, producer, and admin client." (1) and "To bound how long Kafka Streams retries a task, you can set task.timeout.ms (default is 5 minutes)." (1).
> -- However, task.timeout.ms does not appear to take effect for the streams producer, which seems to keep retrying forever.
> [Environment]
> - Kafka Streams 3.5.0
> [Reproduction procedure]
> # Create the "input-topic" topic
> # Put several messages on "input-topic"
> # Do NOT create the "output-topic" topic, so that a TimeoutException is triggered
> # Create the following simple Kafka Streams program; it just transfers messages from "input-topic" to "output-topic".
> --
> {code:java}
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "java-kafka-streams");
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, "org.apache.kafka.common.serialization.Serdes$StringSerde");
> props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, "org.apache.kafka.common.serialization.Serdes$StringSerde");
> props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, "com.example.CustomProductionExceptionHandler"); // not needed
>
> // Topology: copy every record from "input-topic" to "output-topic" unchanged.
> StreamsBuilder builder = new StreamsBuilder();
> builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
>        .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
> KafkaStreams streams = new KafkaStreams(builder.build(), props);
> streams.start(); // start processing; without this call nothing is consumed or produced
> {code}
> # Wait for task.timeout.ms (default is 5 minutes).
> ## If the debug log is enabled, a large number of UNKNOWN_TOPIC_OR_PARTITION errors will be logged because "output-topic" does not exist.
> ## Every minute, a TimeoutException is thrown (2).
> # ==> However, task.timeout.ms does not appear to take effect for the streams producer, which seems to keep retrying forever.
> ## My expected behavior is that task.timeout.ms takes effect and the client is shut down, because the default behavior when an exception is thrown is StreamThreadExceptionResponse.SHUTDOWN_CLIENT.
> [As far as I have investigated]
> - The TimeoutException thrown by the streams producer is replaced with a TaskCorruptedException in RecordCollectorImpl.recordSendError(...) (3)
> - After that, the code containing the logic related to task.timeout.ms does not appear to be executed.
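The per-task timeout quoted from the upgrade guide, and the symptom reported here, can be pictured with a small self-contained model. This is only a sketch under stated assumptions: `TaskTimeoutSketch`, `TaskDeadline`, `onTimeout`, `onProgress`, and `simulate` are hypothetical names, not Kafka Streams classes or APIs, and the JDK's `java.util.concurrent.TimeoutException` stands in for Kafka's exception. The second scenario assumes that, once the task is closed dirty and re-created, the timeout is never reported to the deadline bookkeeping; that is the reporter's hypothesis, not a confirmed root cause.

```java
import java.util.concurrent.TimeoutException;

public class TaskTimeoutSketch {

    /** Hypothetical per-task deadline tracker; NOT Kafka's actual class. */
    static final class TaskDeadline {
        private static final long NO_DEADLINE = -1L;
        private final long taskTimeoutMs;
        private long deadlineMs = NO_DEADLINE;

        TaskDeadline(long taskTimeoutMs) {
            this.taskTimeoutMs = taskTimeoutMs;
        }

        /** Report that a client call for this task timed out at nowMs. */
        void onTimeout(long nowMs) throws TimeoutException {
            if (deadlineMs == NO_DEADLINE) {
                // The first timeout starts the clock.
                deadlineMs = nowMs + taskTimeoutMs;
            } else if (nowMs > deadlineMs) {
                // No progress since the clock started: give up on the task.
                throw new TimeoutException(
                    "task made no progress for " + taskTimeoutMs + " ms");
            }
        }

        /** Report that the task made progress; the clock is reset. */
        void onProgress() {
            deadlineMs = NO_DEADLINE;
        }
    }

    /**
     * Simulates a send that times out every retryIntervalMs.
     * Returns the time (ms) at which the tracker gives up, or -1 if it
     * never does within maxMs.
     *
     * recreateTaskOnTimeout models the assumed faulty path: the timeout is
     * turned into a "task corrupted" signal, the task is re-created with a
     * fresh tracker, and the timeout itself is never reported.
     */
    public static long simulate(long taskTimeoutMs, long retryIntervalMs,
                                long maxMs, boolean recreateTaskOnTimeout) {
        TaskDeadline deadline = new TaskDeadline(taskTimeoutMs);
        for (long now = retryIntervalMs; now <= maxMs; now += retryIntervalMs) {
            if (recreateTaskOnTimeout) {
                deadline = new TaskDeadline(taskTimeoutMs); // bookkeeping is lost
                continue;
            }
            try {
                deadline.onTimeout(now);
            } catch (TimeoutException e) {
                return now;
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        // 5-minute task timeout, one producer timeout per minute, 1 hour of retries.
        System.out.println(simulate(300_000L, 60_000L, 3_600_000L, false)); // prints 420000
        System.out.println(simulate(300_000L, 60_000L, 3_600_000L, true));  // prints -1
    }
}
```

In the first scenario the timeout at 60000 ms starts the clock, so the deadline is 360000 ms and the retry at 420000 ms is the first one past it. In the second scenario the tracker never sees a timeout, which matches the "keeps retrying forever" symptom described above.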
> (1) Kafka Streams upgrade guide
> - [https://kafka.apache.org/35/documentation/streams/upgrade-guide]
> - [https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+Improve+timeouts+and+retries+in+Kafka+Streams]
> {code:java}
> Kafka Streams is now handling TimeoutException thrown by the consumer, producer, and admin client.
> If a timeout occurs on a task, Kafka Streams moves to the next task and retries to make progress on the failed task in the next iteration.
> To bound how long Kafka Streams retries a task, you can set task.timeout.ms (default is 5 minutes).
> If a task does not make progress within the specified task timeout, which is tracked on a per-task basis, Kafka Streams throws a TimeoutException (cf. KIP-572).
> {code}
> (2) TimeoutException occurs
> {code:java}
> 2023-06-19 19:51:26 WARN NetworkClient:1145 - [Producer clientId=java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1-producer] Error while fetching metadata with correlation id 1065 : {output-topic=UNKNOWN_TOPIC_OR_PARTITION}
> 2023-06-19 19:51:26 DEBUG Metadata:363 - [Producer clientId=java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1-producer] Requesting metadata update for topic output-topic due to error UNKNOWN_TOPIC_OR_PARTITION
> 2023-06-19 19:51:26 DEBUG Metadata:291 - [Producer clientId=java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1-producer] Updated cluster metadata updateVersion 1064 to MetadataCache{clusterId='ulBlb0C3QdaurHgFmPLYew', nodes={0=a86b6e81dded542bb867337e34fa7954-1776321381.ap-northeast-1.elb.amazonaws.com:9094 (id: 0 rack: null), 1=a99402a2de0054c2a96e87075df0f545-254291543.ap-northeast-1.elb.amazonaws.com:9094 (id: 1 rack: null), 2=a82f92d2e86d145b48447de89694d879-1900034172.ap-northeast-1.elb.amazonaws.com:9094 (id: 2 rack: null)}, partitions=[], controller=a82f92d2e86d145b48447de89694d879-1900034172.ap-northeast-1.elb.amazonaws.com:9094 (id: 2 rack: null)}
> 2023-06-19 19:51:26 DEBUG KafkaProducer:1073 - [Producer clientId=java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1-producer] Exception occurred during message send:
> org.apache.kafka.common.errors.TimeoutException: Topic output-topic not present in metadata after 60000 ms.
> 2023-06-19 19:51:26 ERROR RecordCollectorImpl:322 - stream-thread [java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] stream-task [0_0] Error encountered sending record to topic output-topic for task 0_0 due to:
> org.apache.kafka.common.errors.TimeoutException: Topic output-topic not present in metadata after 60000 ms.
> The broker is either slow or in bad state (like not having enough replicas) in responding the request, or the connection to broker was interrupted sending the request or receiving the response.
> Consider overwriting `max.block.ms` and /or `delivery.timeout.ms` to a larger value to wait longer for such scenarios and avoid timeout errors
> org.apache.kafka.common.errors.TimeoutException: Topic output-topic not present in metadata after 60000 ms.
> 2023-06-19 19:51:26 DEBUG StreamThread:825 - stream-thread [java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] Processed 1 records with 1 iterations; invoking punctuators if necessary
> 2023-06-19 19:51:26 DEBUG StreamThread:837 - stream-thread [java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] 0 punctuators ran.
> 2023-06-19 19:51:26 DEBUG StreamThread:1117 - stream-thread [java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] Committing all active tasks [0_0] and standby tasks [] since 60021ms has elapsed (commit interval is 30000ms)
> 2023-06-19 19:51:26 DEBUG RecordCollectorImpl:345 - stream-thread [java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] stream-task [0_0] Flushing record collector
> 2023-06-19 19:51:26 WARN StreamThread:626 - stream-thread [java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] Detected the states of tasks [0_0] are corrupted. Will close the task as dirty and re-create and bootstrap from scratch.
> org.apache.kafka.streams.errors.TaskCorruptedException: Tasks [0_0] are corrupted and hence need to be re-initialized
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:310) ~[kafka-streams-3.5.0.jar:?]
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$1(RecordCollectorImpl.java:284) ~[kafka-streams-3.5.0.jar:?]
>     at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1077) ~[kafka-clients-3.5.0.jar:?]
>     at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:962) ~[kafka-clients-3.5.0.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:261) ~[kafka-streams-3.5.0.jar:?]
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:253) ~[kafka-streams-3.5.0.jar:?]
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:175) ~[kafka-streams-3.5.0.jar:?]
>     at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85) ~[kafka-streams-3.5.0.jar:?]
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290) ~[kafka-streams-3.5.0.jar:?]
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269) ~[kafka-streams-3.5.0.jar:?]
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228) ~[kafka-streams-3.5.0.jar:?]
>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84) ~[kafka-streams-3.5.0.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:810) ~[kafka-streams-3.5.0.jar:?]
>     at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872) ~[kafka-streams-3.5.0.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:810) ~[kafka-streams-3.5.0.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:741) ~[kafka-streams-3.5.0.jar:?]
>     at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:97) ~[kafka-streams-3.5.0.jar:?]
>     at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:78) ~[kafka-streams-3.5.0.jar:?]
>     at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1747) ~[kafka-streams-3.5.0.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:807) ~[kafka-streams-3.5.0.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617) [kafka-streams-3.5.0.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579) [kafka-streams-3.5.0.jar:?]
> 2023-06-19 19:51:26 DEBUG TaskExecutor:176 - stream-thread [java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] Committing task offsets {}
> 2023-06-19 19:51:26 DEBUG RecordCollectorImpl:345 - stream-thread [java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] stream-task [0_0] Flushing record collector
> 2023-06-19 19:51:26 DEBUG StreamTask:419 - stream-thread [java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] task [0_0] Prepared RUNNING task for committing
> 2023-06-19 19:51:26 INFO StreamTask:1235 - stream-thread [java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] task [0_0] Suspended from RUNNING
> 2023-06-19 19:51:26 DEBUG StreamTask:898 - stream-thread [java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] task [0_0] Checkpointable offsets {input-topic-0=0}
> 2023-06-19 19:51:26 DEBUG ProcessorStateManager:658 - stream-thread [java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] stream-task [0_0] Writing checkpoint: {} for task 0_0
> 2023-06-19 19:51:26 DEBUG StreamTask:504 - stream-thread [java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] task [0_0] Finalized commit for SUSPENDED task with enforce checkpoint true
> 2023-06-19 19:51:26 DEBUG ProcessorStateManager:544 - stream-thread [java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] stream-task [0_0] Closing its state manager and all the registered state stores: {}
> 2023-06-19 19:51:26 INFO RecordCollectorImpl:373 - stream-thread [java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] stream-task [0_0] Closing record collector dirty
> 2023-06-19 19:51:26 INFO StreamTask:555 - stream-thread [java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] task [0_0] Closed dirty
> {code}
> (3) TimeoutException thrown by the streams producer is replaced with TaskCorruptedException
> [https://github.com/apache/kafka/blob/c97b88d5db4de28d9f51bb11fb71ddd6217c7dda/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L310]

--
This message was sent by Atlassian Jira
(v8.20.10#820010)