[ https://issues.apache.org/jira/browse/KAFKA-15108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17735476#comment-17735476 ]

Matthias J. Sax commented on KAFKA-15108:
-----------------------------------------

There are a few cases in which we cannot handle a `TimeoutException` more
gracefully, and the docs gloss over this fact. The scenario you describe is
one of these cases.
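A simplified model of the conversion the report points to in RecordCollectorImpl.recordSendError(...) may help show why this case escapes the regular handling. It is a hypothetical sketch, not Kafka's code. `SendErrorModel`, `RetriableError`, `SendTimeout` and `Outcome` are illustrative stand-ins (in Kafka, the producer's `TimeoutException` is a retriable exception), and the other branches of the real method are omitted. Under those assumptions, a retriable send error marks the task as corrupted and never reaches the production exception handler or a per-task timeout check:

{code:java}
// Hypothetical, simplified model of how a producer send error is classified in the
// scenario from this report (Kafka Streams 3.5.0). None of these types are Kafka classes.
final class SendErrorModel {

    // Stand-in for Kafka's retriable exception hierarchy (hypothetical).
    static class RetriableError extends RuntimeException {
        RetriableError(String msg) { super(msg); }
    }

    // Stand-in for the producer's TimeoutException, which is retriable in Kafka.
    static final class SendTimeout extends RetriableError {
        SendTimeout(String msg) { super(msg); }
    }

    enum Outcome { TASK_CORRUPTED, ASK_PRODUCTION_HANDLER }

    // Retriable errors (including a send timeout) mark the task as corrupted;
    // in this model only non-retriable errors are passed to the production exception handler.
    static Outcome classify(RuntimeException error) {
        if (error instanceof RetriableError) {
            return Outcome.TASK_CORRUPTED;
        }
        return Outcome.ASK_PRODUCTION_HANDLER;
    }

    public static void main(String[] args) {
        System.out.println(classify(new SendTimeout("Topic output-topic not present in metadata after 60000 ms.")));
        System.out.println(classify(new IllegalArgumentException("some non-retriable error")));
    }
}
{code}

Running `main` prints `TASK_CORRUPTED` for the send timeout and `ASK_PRODUCTION_HANDLER` for the other error.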

I agree that we should maybe try to include it. The challenge (and the reason it was
not included in the original work) is that it will need different handling
compared to how we handle `TimeoutException` for the regular case.
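For comparison, the regular handling described in KIP-572 (and quoted from the upgrade guide in the report below) can be sketched as a per-task deadline: the first timeout starts a clock of `task.timeout.ms`, progress clears it, and a timeout after the deadline gives up. This is a hypothetical illustration; `TaskTimeoutSketch`, `TaskTimer` and `TaskTimeoutExceeded` are made-up names, not the Kafka Streams implementation.

{code:java}
// Hypothetical sketch of KIP-572-style per-task timeout tracking; not Kafka's code.
final class TaskTimeoutSketch {

    // Thrown when a task has made no progress within the task timeout.
    static final class TaskTimeoutExceeded extends RuntimeException {
        TaskTimeoutExceeded(String msg) { super(msg); }
    }

    static final class TaskTimer {
        private final long taskTimeoutMs;   // plays the role of task.timeout.ms
        private long deadlineMs = -1L;      // -1 means "no timeout pending"

        TaskTimer(long taskTimeoutMs) { this.taskTimeoutMs = taskTimeoutMs; }

        // Called when a client call for this task times out: start the clock on the
        // first timeout, and give up once the deadline has passed without progress.
        void onTimeout(long nowMs) {
            if (deadlineMs < 0) {
                deadlineMs = nowMs + taskTimeoutMs;
            } else if (nowMs > deadlineMs) {
                throw new TaskTimeoutExceeded("no progress within " + taskTimeoutMs + " ms");
            }
        }

        // Called when the task makes progress: clear the pending deadline.
        void onProgress() { deadlineMs = -1L; }

        boolean timeoutPending() { return deadlineMs >= 0; }
    }

    public static void main(String[] args) {
        TaskTimer timer = new TaskTimer(300_000L);  // 5 minutes, the task.timeout.ms default
        timer.onTimeout(0L);                        // first timeout: deadline set to 300000
        timer.onTimeout(60_000L);                   // still within budget; retry in a later iteration
        try {
            timer.onTimeout(360_000L);              // past the deadline
        } catch (TaskTimeoutExceeded e) {
            System.out.println("task gave up: " + e.getMessage());
        }
    }
}
{code}

Note that the log in the report shows the task being closed dirty and re-created after each producer timeout. A timer kept on the task object, as in this sketch, would not survive that cycle, which is one plausible reason the producer case needs different handling; it is an illustration, not a description of the actual design.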

> task.timeout.ms does not work when TimeoutException is thrown by streams 
> producer
> ---------------------------------------------------------------------------------
>
>                 Key: KAFKA-15108
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15108
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.5.0
>            Reporter: Tomonari Yamashita
>            Priority: Major
>
> [Problem]
>  - task.timeout.ms does not work when TimeoutException is thrown by streams 
> producer
>  -- Kafka Streams upgrade guide says, "Kafka Streams is now handling 
> TimeoutException thrown by the consumer, producer, and admin client."(1) and 
> "To bound how long Kafka Streams retries a task, you can set task.timeout.ms 
> (default is 5 minutes)."(1).
>  -- However, task.timeout.ms does not appear to apply to the streams 
> producer, and Kafka Streams seems to keep retrying forever.
> [Environment]
>  - Kafka Streams 3.5.0
> [Reproduce procedure]
>  # Create "input-topic" topic
>  # Put several messages on "input-topic"
>  # Do NOT create the "output-topic" topic, so that a TimeoutException is triggered
>  # Create the following simple Kafka streams program; this program just 
> transfers messages from "input-topic" to "output-topic".
>  -- 
> {code:java}
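> import java.util.Properties;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.KafkaStreams;
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.kstream.Consumed;
> import org.apache.kafka.streams.kstream.Produced;
>
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "java-kafka-streams");
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, "org.apache.kafka.common.serialization.Serdes$StringSerde");
> props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, "org.apache.kafka.common.serialization.Serdes$StringSerde");
> // Custom production exception handler: not needed to reproduce the issue
> props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, "com.example.CustomProductionExceptionHandler");
> StreamsBuilder builder = new StreamsBuilder();
> // Copy every record from "input-topic" to "output-topic" (which does not exist)
> builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
>         .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
> KafkaStreams streams = new KafkaStreams(builder.build(), props);
> streams.start();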
> {code}
>  # Wait for task.timeout.ms (default is 5 minutes).
>  ## If debug logging is enabled, a large number of 
> UNKNOWN_TOPIC_OR_PARTITION errors are logged because "output-topic" does not 
> exist.
>  ## Every minute, a TimeoutException is thrown (2).
>  # ==> However, task.timeout.ms does not appear to apply to the 
> streams producer, and Kafka Streams seems to keep retrying forever.
>  ## My expected behavior is that task.timeout.ms takes effect and the client 
> is shut down, because the default behavior when an exception is thrown is 
> StreamThreadExceptionResponse.SHUTDOWN_CLIENT.
> [Findings from my investigation]
>  - The TimeoutException thrown by the streams producer is replaced with a 
> TaskCorruptedException in RecordCollectorImpl.recordSendError(...) (3).
>  - After that, none of the code that implements the task.timeout.ms logic 
> appears to be executed.
> (1) Kafka Streams upgrade guide
> - [https://kafka.apache.org/35/documentation/streams/upgrade-guide]
> - [https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+Improve+timeouts+and+retries+in+Kafka+Streams]
> {code:java}
> Kafka Streams is now handling TimeoutException thrown by the consumer, 
> producer, and admin client. If a timeout occurs on a task, Kafka Streams 
> moves to the next task and retries to make progress on the failed task in the 
> next iteration. To bound how long Kafka Streams retries a task, you can set 
> task.timeout.ms (default is 5 minutes). If a task does not make progress 
> within the specified task timeout, which is tracked on a per-task basis, 
> Kafka Streams throws a TimeoutException (cf. KIP-572).
> {code}
> (2) TimeoutException occurs
> {code:java}
> 2023-06-19 19:51:26 WARN  NetworkClient:1145 - [Producer 
> clientId=java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1-producer]
>  Error while fetching metadata with correlation id 1065 : 
> {output-topic=UNKNOWN_TOPIC_OR_PARTITION}
> 2023-06-19 19:51:26 DEBUG Metadata:363 - [Producer 
> clientId=java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1-producer]
>  Requesting metadata update for topic output-topic due to error 
> UNKNOWN_TOPIC_OR_PARTITION
> 2023-06-19 19:51:26 DEBUG Metadata:291 - [Producer 
> clientId=java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1-producer]
>  Updated cluster metadata updateVersion 1064 to 
> MetadataCache{clusterId='ulBlb0C3QdaurHgFmPLYew', 
> nodes={0=a86b6e81dded542bb867337e34fa7954-1776321381.ap-northeast-1.elb.amazonaws.com:9094
>  (id: 0 rack: null), 
> 1=a99402a2de0054c2a96e87075df0f545-254291543.ap-northeast-1.elb.amazonaws.com:9094
>  (id: 1 rack: null), 
> 2=a82f92d2e86d145b48447de89694d879-1900034172.ap-northeast-1.elb.amazonaws.com:9094
>  (id: 2 rack: null)}, partitions=[], 
> controller=a82f92d2e86d145b48447de89694d879-1900034172.ap-northeast-1.elb.amazonaws.com:9094
>  (id: 2 rack: null)}
> 2023-06-19 19:51:26 DEBUG KafkaProducer:1073 - [Producer 
> clientId=java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1-producer]
>  Exception occurred during message send:
> org.apache.kafka.common.errors.TimeoutException: Topic output-topic not 
> present in metadata after 60000 ms.
> 2023-06-19 19:51:26 ERROR RecordCollectorImpl:322 - stream-thread 
> [java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] 
> stream-task [0_0] Error encountered sending record to topic output-topic for 
> task 0_0 due to:
> org.apache.kafka.common.errors.TimeoutException: Topic output-topic not 
> present in metadata after 60000 ms.
> The broker is either slow or in bad state (like not having enough replicas) 
> in responding the request, or the connection to broker was interrupted 
> sending the request or receiving the response. 
> Consider overwriting `max.block.ms` and /or `delivery.timeout.ms` to a larger 
> value to wait longer for such scenarios and avoid timeout errors
> org.apache.kafka.common.errors.TimeoutException: Topic output-topic not 
> present in metadata after 60000 ms.
> 2023-06-19 19:51:26 DEBUG StreamThread:825 - stream-thread 
> [java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] 
> Processed 1 records with 1 iterations; invoking punctuators if necessary
> 2023-06-19 19:51:26 DEBUG StreamThread:837 - stream-thread 
> [java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] 0 
> punctuators ran.
> 2023-06-19 19:51:26 DEBUG StreamThread:1117 - stream-thread 
> [java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] 
> Committing all active tasks [0_0] and standby tasks [] since 60021ms has 
> elapsed (commit interval is 30000ms)
> 2023-06-19 19:51:26 DEBUG RecordCollectorImpl:345 - stream-thread 
> [java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] 
> stream-task [0_0] Flushing record collector
> 2023-06-19 19:51:26 WARN  StreamThread:626 - stream-thread 
> [java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] 
> Detected the states of tasks [0_0] are corrupted. Will close the task as 
> dirty and re-create and bootstrap from scratch.
> org.apache.kafka.streams.errors.TaskCorruptedException: Tasks [0_0] are 
> corrupted and hence need to be re-initialized
>       at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:310)
>  ~[kafka-streams-3.5.0.jar:?]
>       at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$1(RecordCollectorImpl.java:284)
>  ~[kafka-streams-3.5.0.jar:?]
>       at 
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1077)
>  ~[kafka-clients-3.5.0.jar:?]
>       at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:962) 
> ~[kafka-clients-3.5.0.jar:?]
>       at 
> org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:261)
>  ~[kafka-streams-3.5.0.jar:?]
>       at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:253)
>  ~[kafka-streams-3.5.0.jar:?]
>       at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:175)
>  ~[kafka-streams-3.5.0.jar:?]
>       at 
> org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85)
>  ~[kafka-streams-3.5.0.jar:?]
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
>  ~[kafka-streams-3.5.0.jar:?]
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
>  ~[kafka-streams-3.5.0.jar:?]
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
>  ~[kafka-streams-3.5.0.jar:?]
>       at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
>  ~[kafka-streams-3.5.0.jar:?]
>       at 
> org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:810)
>  ~[kafka-streams-3.5.0.jar:?]
>       at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
>  ~[kafka-streams-3.5.0.jar:?]
>       at 
> org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:810)
>  ~[kafka-streams-3.5.0.jar:?]
>       at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:741)
>  ~[kafka-streams-3.5.0.jar:?]
>       at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:97)
>  ~[kafka-streams-3.5.0.jar:?]
>       at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:78)
>  ~[kafka-streams-3.5.0.jar:?]
>       at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1747)
>  ~[kafka-streams-3.5.0.jar:?]
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:807)
>  ~[kafka-streams-3.5.0.jar:?]
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)
>  [kafka-streams-3.5.0.jar:?]
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)
>  [kafka-streams-3.5.0.jar:?]
> 2023-06-19 19:51:26 DEBUG TaskExecutor:176 - stream-thread 
> [java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] 
> Committing task offsets {}
> 2023-06-19 19:51:26 DEBUG RecordCollectorImpl:345 - stream-thread 
> [java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] 
> stream-task [0_0] Flushing record collector
> 2023-06-19 19:51:26 DEBUG StreamTask:419 - stream-thread 
> [java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] task 
> [0_0] Prepared RUNNING task for committing
> 2023-06-19 19:51:26 INFO  StreamTask:1235 - stream-thread 
> [java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] task 
> [0_0] Suspended from RUNNING
> 2023-06-19 19:51:26 DEBUG StreamTask:898 - stream-thread 
> [java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] task 
> [0_0] Checkpointable offsets {input-topic-0=0}
> 2023-06-19 19:51:26 DEBUG ProcessorStateManager:658 - stream-thread 
> [java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] 
> stream-task [0_0] Writing checkpoint: {} for task 0_0
> 2023-06-19 19:51:26 DEBUG StreamTask:504 - stream-thread 
> [java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] task 
> [0_0] Finalized commit for SUSPENDED task with enforce checkpoint true
> 2023-06-19 19:51:26 DEBUG ProcessorStateManager:544 - stream-thread 
> [java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] 
> stream-task [0_0] Closing its state manager and all the registered state 
> stores: {}
> 2023-06-19 19:51:26 INFO  RecordCollectorImpl:373 - stream-thread 
> [java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] 
> stream-task [0_0] Closing record collector dirty
> 2023-06-19 19:51:26 INFO  StreamTask:555 - stream-thread 
> [java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] task 
> [0_0] Closed dirty
> {code}
> (3) TimeoutException thrown by the streams producer is replaced with 
> TaskCorruptedException
>     
> [https://github.com/apache/kafka/blob/c97b88d5db4de28d9f51bb11fb71ddd6217c7dda/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L310]
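The error message in (2) suggests overriding `max.block.ms` and/or `delivery.timeout.ms` (the 60000 ms in the log matches the `max.block.ms` default). In Kafka Streams, producer settings can be passed through `StreamsConfig.producerPrefix(...)`, alongside `task.timeout.ms`. The fragment below is a sketch with illustrative values; the class and method names are hypothetical. In the reported scenario the output topic does not exist, so a larger `max.block.ms` would only delay each `TimeoutException`; it would not make `task.timeout.ms` apply.

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

// Hypothetical helper; the values are illustrative, not recommendations.
final class ProducerTimeoutConfig {
    static Properties withTimeouts(Properties props) {
        // Bound on per-task retries after a TimeoutException (default 300000 ms = 5 minutes).
        props.put(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 300_000L);
        // How long KafkaProducer#send() may block, e.g. waiting for topic metadata (default 60000 ms).
        props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG), 120_000L);
        // Upper bound on the time to report success or failure after send() returns (default 120000 ms).
        props.put(StreamsConfig.producerPrefix(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG), 300_000);
        return props;
    }
}
{code}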



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
