Tomonari Yamashita created KAFKA-15108:
------------------------------------------

             Summary: task.timeout.ms doesn't work for the stream producer
                 Key: KAFKA-15108
                 URL: https://issues.apache.org/jira/browse/KAFKA-15108
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 3.5.0
            Reporter: Tomonari Yamashita


[Problem]
- task.timeout.ms doesn't work for the stream producer
 -- The Kafka Streams upgrade guide says, "Kafka Streams is now handling 
TimeoutException thrown by the consumer, producer, and admin client."(1) and 
"To bound how long Kafka Streams retries a task, you can set task.timeout.ms 
(default is 5 minutes)."(1).
 -- However, task.timeout.ms does not appear to take effect for the stream 
producer, which seems to keep retrying forever.
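
For reference, a minimal sketch of how the timeout in question would typically be set explicitly. It uses plain string keys so that it compiles with only the JDK; in a real application the same key should also be available as StreamsConfig.TASK_TIMEOUT_MS_CONFIG. The class name and the buildProps helper are hypothetical and are not part of the reproduction program.

```java
import java.util.Properties;

public class TaskTimeoutConfigSketch {
    // Hypothetical helper: builds Streams properties with an explicit task timeout.
    static Properties buildProps(long taskTimeoutMs) {
        Properties props = new Properties();
        props.put("application.id", "java-kafka-streams");
        props.put("bootstrap.servers", "localhost:9092");
        // Same key as StreamsConfig.TASK_TIMEOUT_MS_CONFIG; the documented
        // default is 300000 ms (5 minutes).
        props.put("task.timeout.ms", Long.toString(taskTimeoutMs));
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildProps(300_000L);
        System.out.println("task.timeout.ms=" + props.getProperty("task.timeout.ms"));
    }
}
```

Setting the value explicitly, even to the default, makes it easier to tell from the startup config dump that the application is running with the intended timeout.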

[Environment]
- Kafka Streams 3.5.0

[Reproduce procedure]
# Create the "input-topic" topic.
# Put several messages on "input-topic".
# Do NOT create the "output-topic" topic, so that a TimeoutException is triggered.
# Create the following simple Kafka Streams program; this program just 
transfers messages from "input-topic" to "output-topic".
-- {code}
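import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "java-kafka-streams");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, "org.apache.kafka.common.serialization.Serdes$StringSerde");
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, "org.apache.kafka.common.serialization.Serdes$StringSerde");
props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, "com.example.CustomProductionExceptionHandler"); // not needed

StreamsBuilder builder = new StreamsBuilder();

// Forward every record from "input-topic" to the non-existent "output-topic".
builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
        .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));

KafkaStreams streams = new KafkaStreams(builder.build(), props);
// Start processing; without this call no records are consumed or produced.
streams.start();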
{code}
# Wait for task.timeout.ms (default is 5 minutes).
## If debug logging is enabled, a large number of UNKNOWN_TOPIC_OR_PARTITION 
errors will be logged.
## Every minute, a TimeoutException is thrown (2).
# ==> However, task.timeout.ms does not appear to take effect for the stream 
producer, which seems to keep retrying forever.
# ==> My expected behavior is that task.timeout.ms takes effect and the client 
is shut down, because the default behavior is 
StreamThreadExceptionResponse.SHUTDOWN_CLIENT when an exception is thrown.

[Findings from my investigation so far]
- The TimeoutException thrown by the stream producer is replaced with a 
TaskCorruptedException (3).
- After that, the code containing the logic related to task.timeout.ms does 
not appear to be executed.
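
One way to picture this observation is the simplified sketch below. It is only a model under stated assumptions, not the actual Kafka Streams code: TimeoutLikeException and TaskCorruptedLikeException are hypothetical stand-ins for the real exception classes, and sendAndReplace and processOnce are hypothetical names. It shows that once a timeout is replaced by a different exception type, a handler that only reacts to the timeout type is never reached.

```java
public class ExceptionReplacementSketch {
    // Hypothetical stand-ins; these are NOT the real Kafka exception classes.
    static class TimeoutLikeException extends RuntimeException {
        TimeoutLikeException(String msg) { super(msg); }
    }

    static class TaskCorruptedLikeException extends RuntimeException {
        TaskCorruptedLikeException(String msg, Throwable cause) { super(msg, cause); }
    }

    // Models a send path that replaces the producer timeout with another exception type.
    static void sendAndReplace() {
        try {
            throw new TimeoutLikeException(
                "Topic output-topic not present in metadata after 60000 ms.");
        } catch (TimeoutLikeException e) {
            throw new TaskCorruptedLikeException("Tasks [0_0] are corrupted", e);
        }
    }

    // Returns the name of the handler branch that ran for one processing attempt.
    static String processOnce() {
        try {
            sendAndReplace();
            return "ok";
        } catch (TimeoutLikeException e) {
            // In this model, a task-timeout check would live here.
            return "timeout-branch";
        } catch (TaskCorruptedLikeException e) {
            // In this model, the task is closed dirty and re-created.
            return "corrupted-branch";
        }
    }

    public static void main(String[] args) {
        System.out.println(processOnce());
    }
}
```

In this model every attempt ends in the corrupted-task branch, which would be consistent with the "Closed dirty" log lines in (2) and with the timeout bookkeeping never being consulted.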

(1) Kafka Streams upgrade guide
    https://kafka.apache.org/35/documentation/streams/upgrade-guide
{code}
Kafka Streams is now handling TimeoutException thrown by the consumer, 
producer, and admin client. If a timeout occurs on a task, Kafka Streams moves 
to the next task and retries to make progress on the failed task in the next 
iteration. To bound how long Kafka Streams retries a task, you can set 
task.timeout.ms (default is 5 minutes). If a task does not make progress within 
the specified task timeout, which is tracked on a per-task basis, Kafka Streams 
throws a TimeoutException (cf. KIP-572).
{code}
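
The per-task bound described in this passage can be sketched roughly as follows. This is an illustrative model only, assuming a deadline that starts on the first timeout and is cleared on progress; the class TaskDeadlineSketch and its methods are hypothetical and do not reproduce the Kafka Streams internals. The 60000 ms interval is taken from the producer log in (2), and 300000 ms is the documented default.

```java
public class TaskDeadlineSketch {
    static final long NO_DEADLINE = -1L;

    private final long taskTimeoutMs;
    private long deadlineMs = NO_DEADLINE;

    TaskDeadlineSketch(long taskTimeoutMs) {
        this.taskTimeoutMs = taskTimeoutMs;
    }

    // Called each time the task hits a timeout: start the clock on the first
    // timeout, and fail once the clock has run past the deadline.
    void onTimeout(long nowMs) {
        if (deadlineMs == NO_DEADLINE) {
            deadlineMs = nowMs + taskTimeoutMs;
        } else if (nowMs > deadlineMs) {
            throw new IllegalStateException(
                "Task did not make progress within " + taskTimeoutMs + " ms");
        }
    }

    // Called when the task makes progress: reset the clock.
    void onProgress() {
        deadlineMs = NO_DEADLINE;
    }

    // Counts how many consecutive timeouts, spaced intervalMs apart,
    // this model tolerates before the deadline check fails.
    static int toleratedTimeouts(long taskTimeoutMs, long intervalMs) {
        TaskDeadlineSketch task = new TaskDeadlineSketch(taskTimeoutMs);
        int count = 0;
        long now = 0;
        while (true) {
            now += intervalMs;
            try {
                task.onTimeout(now);
                count++;
            } catch (IllegalStateException e) {
                return count;
            }
        }
    }

    public static void main(String[] args) {
        // One producer timeout every 60000 ms against a 300000 ms task timeout.
        System.out.println(toleratedTimeouts(300_000L, 60_000L));
    }
}
```

Under this model, retries stop after a bounded number of timeouts, which is the behavior the reproduction procedure expects to see and does not.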

(2) TimeoutException occurs
{code:java}
2023-06-19 19:51:26 WARN  NetworkClient:1145 - [Producer 
clientId=java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1-producer]
 Error while fetching metadata with correlation id 1065 : 
{output-topic=UNKNOWN_TOPIC_OR_PARTITION}
2023-06-19 19:51:26 DEBUG Metadata:363 - [Producer 
clientId=java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1-producer]
 Requesting metadata update for topic output-topic due to error 
UNKNOWN_TOPIC_OR_PARTITION
2023-06-19 19:51:26 DEBUG Metadata:291 - [Producer 
clientId=java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1-producer]
 Updated cluster metadata updateVersion 1064 to 
MetadataCache{clusterId='ulBlb0C3QdaurHgFmPLYew', 
nodes={0=a86b6e81dded542bb867337e34fa7954-1776321381.ap-northeast-1.elb.amazonaws.com:9094
 (id: 0 rack: null), 
1=a99402a2de0054c2a96e87075df0f545-254291543.ap-northeast-1.elb.amazonaws.com:9094
 (id: 1 rack: null), 
2=a82f92d2e86d145b48447de89694d879-1900034172.ap-northeast-1.elb.amazonaws.com:9094
 (id: 2 rack: null)}, partitions=[], 
controller=a82f92d2e86d145b48447de89694d879-1900034172.ap-northeast-1.elb.amazonaws.com:9094
 (id: 2 rack: null)}
2023-06-19 19:51:26 DEBUG KafkaProducer:1073 - [Producer 
clientId=java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1-producer]
 Exception occurred during message send:
org.apache.kafka.common.errors.TimeoutException: Topic output-topic not present 
in metadata after 60000 ms.
2023-06-19 19:51:26 ERROR RecordCollectorImpl:322 - stream-thread 
[java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] 
stream-task [0_0] Error encountered sending record to topic output-topic for 
task 0_0 due to:
org.apache.kafka.common.errors.TimeoutException: Topic output-topic not present 
in metadata after 60000 ms.
The broker is either slow or in bad state (like not having enough replicas) in 
responding the request, or the connection to broker was interrupted sending the 
request or receiving the response. 
Consider overwriting `max.block.ms` and /or `delivery.timeout.ms` to a larger 
value to wait longer for such scenarios and avoid timeout errors
org.apache.kafka.common.errors.TimeoutException: Topic output-topic not present 
in metadata after 60000 ms.
2023-06-19 19:51:26 DEBUG StreamThread:825 - stream-thread 
[java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] 
Processed 1 records with 1 iterations; invoking punctuators if necessary
2023-06-19 19:51:26 DEBUG StreamThread:837 - stream-thread 
[java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] 0 
punctuators ran.
2023-06-19 19:51:26 DEBUG StreamThread:1117 - stream-thread 
[java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] 
Committing all active tasks [0_0] and standby tasks [] since 60021ms has 
elapsed (commit interval is 30000ms)
2023-06-19 19:51:26 DEBUG RecordCollectorImpl:345 - stream-thread 
[java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] 
stream-task [0_0] Flushing record collector
2023-06-19 19:51:26 WARN  StreamThread:626 - stream-thread 
[java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] 
Detected the states of tasks [0_0] are corrupted. Will close the task as dirty 
and re-create and bootstrap from scratch.
org.apache.kafka.streams.errors.TaskCorruptedException: Tasks [0_0] are 
corrupted and hence need to be re-initialized
        at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:310)
 ~[kafka-streams-3.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$1(RecordCollectorImpl.java:284)
 ~[kafka-streams-3.5.0.jar:?]
        at 
org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1077) 
~[kafka-clients-3.5.0.jar:?]
        at 
org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:962) 
~[kafka-clients-3.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:261)
 ~[kafka-streams-3.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:253)
 ~[kafka-streams-3.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:175)
 ~[kafka-streams-3.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85) 
~[kafka-streams-3.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
 ~[kafka-streams-3.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
 ~[kafka-streams-3.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
 ~[kafka-streams-3.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
 ~[kafka-streams-3.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:810)
 ~[kafka-streams-3.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
 ~[kafka-streams-3.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:810)
 ~[kafka-streams-3.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:741)
 ~[kafka-streams-3.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:97)
 ~[kafka-streams-3.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:78)
 ~[kafka-streams-3.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1747)
 ~[kafka-streams-3.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:807)
 ~[kafka-streams-3.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)
 [kafka-streams-3.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)
 [kafka-streams-3.5.0.jar:?]
2023-06-19 19:51:26 DEBUG TaskExecutor:176 - stream-thread 
[java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] 
Committing task offsets {}
2023-06-19 19:51:26 DEBUG RecordCollectorImpl:345 - stream-thread 
[java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] 
stream-task [0_0] Flushing record collector
2023-06-19 19:51:26 DEBUG StreamTask:419 - stream-thread 
[java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] task 
[0_0] Prepared RUNNING task for committing
2023-06-19 19:51:26 INFO  StreamTask:1235 - stream-thread 
[java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] task 
[0_0] Suspended from RUNNING
2023-06-19 19:51:26 DEBUG StreamTask:898 - stream-thread 
[java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] task 
[0_0] Checkpointable offsets {input-topic-0=0}
2023-06-19 19:51:26 DEBUG ProcessorStateManager:658 - stream-thread 
[java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] 
stream-task [0_0] Writing checkpoint: {} for task 0_0
2023-06-19 19:51:26 DEBUG StreamTask:504 - stream-thread 
[java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] task 
[0_0] Finalized commit for SUSPENDED task with enforce checkpoint true
2023-06-19 19:51:26 DEBUG ProcessorStateManager:544 - stream-thread 
[java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] 
stream-task [0_0] Closing its state manager and all the registered state 
stores: {}
2023-06-19 19:51:26 INFO  RecordCollectorImpl:373 - stream-thread 
[java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] 
stream-task [0_0] Closing record collector dirty
2023-06-19 19:51:26 INFO  StreamTask:555 - stream-thread 
[java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1] task 
[0_0] Closed dirty
{code}

(3) The TimeoutException thrown by the stream producer is replaced with a 
TaskCorruptedException
   
https://github.com/apache/kafka/blob/c97b88d5db4de28d9f51bb11fb71ddd6217c7dda/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L310


