[jira] [Updated] (KAFKA-15108) task.timeout.ms does not work when TimeoutException is thrown by streams producer

2023-06-20 Thread Tomonari Yamashita (Jira)


 [ https://issues.apache.org/jira/browse/KAFKA-15108?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tomonari Yamashita updated KAFKA-15108:
---
Description: 
[Problem]
 - task.timeout.ms does not take effect when a TimeoutException is thrown by the streams producer.
 -- The Kafka Streams upgrade guide says, "Kafka Streams is now handling TimeoutException thrown by the consumer, producer, and admin client." (1) and "To bound how long Kafka Streams retries a task, you can set task.timeout.ms (default is 5 minutes)." (1)
 -- However, task.timeout.ms does not appear to apply to the streams producer: the application seems to keep retrying forever.

[Environment]
 - Kafka Streams 3.5.0

[Reproduction procedure]
 # Create the "input-topic" topic.
 # Put several messages on "input-topic".
 # Do NOT create the "output-topic" topic, so that a TimeoutException is triggered.
 # Create the following simple Kafka Streams program, which just forwards messages from "input-topic" to "output-topic".
 -- 
{code:java}
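import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "java-kafka-streams");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, "org.apache.kafka.common.serialization.Serdes$StringSerde");
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, "org.apache.kafka.common.serialization.Serdes$StringSerde");
// Not needed to reproduce the issue (com.example.CustomProductionExceptionHandler is a custom class):
// props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, "com.example.CustomProductionExceptionHandler");

StreamsBuilder builder = new StreamsBuilder();

// Forward every record from "input-topic" to "output-topic" (which intentionally does not exist).
builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
       .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();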
{code}
 # Wait for task.timeout.ms (default is 5 minutes).
 ## If debug logging is enabled, a large number of UNKNOWN_TOPIC_OR_PARTITION errors are logged because "output-topic" does not exist.
 ## In addition, a TimeoutException is thrown about once every minute (2).
 # ==> However, task.timeout.ms does not appear to take effect for the streams producer; the application seems to keep retrying forever.
 ## My expected behavior is that task.timeout.ms takes effect and the client is shut down, because the default behavior when an exception is thrown is StreamThreadExceptionResponse.SHUTDOWN_CLIENT.

[As far as I have investigated]
 - The TimeoutException thrown by the streams producer is replaced with a TaskCorruptedException in RecordCollectorImpl.recordSendError(...) (3).
 - After that, execution does not appear to reach any code containing logic related to task.timeout.ms.
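To illustrate the suspected difference, here is a minimal plain-Java model with no Kafka dependency. It is not Kafka Streams code: the class, enum, and method names are hypothetical, every send attempt is assumed to time out once per intervalMs (about one minute, as in log (2)), and the deadline rule (start the clock at the first timeout, fail on a later timeout observed after the deadline) is only my reading of (1).

{code:java}
public class TimeoutVsCorruptedModel {

    /** Hypothetical: which error the task-level handling ends up seeing. */
    enum SurfacedError { TIMEOUT, TASK_CORRUPTED }

    /**
     * Simulates a task whose every send attempt times out, one attempt per intervalMs.
     * Returns the attempt number at which the task is failed, or -1 if it is still
     * being retried after maxAttempts (i.e. effectively "retrying forever").
     */
    static int attemptsUntilFailure(SurfacedError surfaced, long intervalMs,
                                    long taskTimeoutMs, int maxAttempts) {
        long deadlineMs = -1L; // -1 means "no deadline started yet"
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            long nowMs = attempt * intervalMs;
            if (surfaced == SurfacedError.TIMEOUT) {
                // Timeout-aware path: start the per-task clock, or fail once it has expired.
                if (deadlineMs < 0) {
                    deadlineMs = nowMs + taskTimeoutMs;
                } else if (nowMs > deadlineMs) {
                    return attempt;
                }
            } else {
                // Suspected path: the task is treated as corrupted, reset, and retried;
                // no task.timeout.ms deadline is ever consulted.
                deadlineMs = -1L;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        long intervalMs = 60_000L;     // roughly one TimeoutException per minute, as in log (2)
        long taskTimeoutMs = 300_000L; // task.timeout.ms default (5 minutes)
        System.out.println(attemptsUntilFailure(SurfacedError.TIMEOUT, intervalMs, taskTimeoutMs, 100));
        System.out.println(attemptsUntilFailure(SurfacedError.TASK_CORRUPTED, intervalMs, taskTimeoutMs, 100));
    }
}
{code}

In this model, the timeout-aware path fails the task on the 7th attempt (about 7 minutes in), while the TASK_CORRUPTED path is still retrying after 100 attempts, which matches the "retrying forever" symptom.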

(1) Kafka Streams upgrade guide
 - [https://kafka.apache.org/35/documentation/streams/upgrade-guide]
 - [https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+Improve+timeouts+and+retries+in+Kafka+Streams]
{code:java}
Kafka Streams is now handling TimeoutException thrown by the consumer, 
producer, and admin client. If a timeout occurs on a task, Kafka Streams moves 
to the next task and retries to make progress on the failed task in the next 
iteration. To bound how long Kafka Streams retries a task, you can set 
task.timeout.ms (default is 5 minutes). If a task does not make progress within 
the specified task timeout, which is tracked on a per-task basis, Kafka Streams 
throws a TimeoutException (cf. KIP-572).
{code}
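The per-task bookkeeping described in the quote above can be sketched as follows. This is a hypothetical model written from the upgrade guide text, not Kafka Streams' actual implementation: the class name is made up, task IDs such as "0_0" are illustrative, and IllegalStateException stands in for the TimeoutException that Kafka Streams throws.

{code:java}
import java.util.HashMap;
import java.util.Map;

/** Hypothetical per-task timeout tracker modeled on the upgrade guide / KIP-572 wording. */
public class PerTaskTimeoutTracker {
    private final long taskTimeoutMs;
    // taskId -> deadline in wall-clock ms; absent means "no pending timeout for this task"
    private final Map<String, Long> deadlines = new HashMap<>();

    public PerTaskTimeoutTracker(long taskTimeoutMs) {
        this.taskTimeoutMs = taskTimeoutMs;
    }

    /** Called when a task hits a timeout; throws once that task's own deadline has passed. */
    public void onTimeout(String taskId, long nowMs) {
        Long deadline = deadlines.get(taskId);
        if (deadline == null) {
            deadlines.put(taskId, nowMs + taskTimeoutMs); // first timeout starts this task's clock
        } else if (nowMs > deadline) {
            throw new IllegalStateException("Task " + taskId
                    + " did not make progress within " + taskTimeoutMs + " ms");
        }
        // Otherwise: move on to the next task and retry this one in the next iteration.
    }

    /** Called when a task makes progress; clears its pending deadline. */
    public void onProgress(String taskId) {
        deadlines.remove(taskId);
    }

    public static void main(String[] args) {
        PerTaskTimeoutTracker tracker = new PerTaskTimeoutTracker(300_000L);
        tracker.onTimeout("0_0", 0L);       // task 0_0 starts its clock
        tracker.onTimeout("0_1", 100_000L); // task 0_1 has an independent clock
        tracker.onProgress("0_1");          // 0_1 recovers, so its clock is cleared
        try {
            tracker.onTimeout("0_0", 301_000L); // 0_0 is past its own deadline
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
{code}

If the producer's TimeoutException were routed through something like onTimeout(...), the task in the reproduction above would eventually fail instead of retrying indefinitely.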

(2) TimeoutException occurs
{code:java}
2023-06-19 19:51:26 WARN  NetworkClient:1145 - [Producer 
clientId=java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1-producer]
 Error while fetching metadata with correlation id 1065 : 
{output-topic=UNKNOWN_TOPIC_OR_PARTITION}
2023-06-19 19:51:26 DEBUG Metadata:363 - [Producer 
clientId=java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1-producer]
 Requesting metadata update for topic output-topic due to error 
UNKNOWN_TOPIC_OR_PARTITION
2023-06-19 19:51:26 DEBUG Metadata:291 - [Producer 
clientId=java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1-producer]
 Updated cluster metadata updateVersion 1064 to 
MetadataCache{clusterId='ulBlb0C3QdaurHgFmPLYew', 
nodes={0=a86b6e81dded542bb867337e34fa7954-1776321381.ap-northeast-1.elb.amazonaws.com:9094
 (id: 0 rack: null), 
1=a99402a2de0054c2a96e87075df0f545-254291543.ap-northeast-1.elb.amazonaws.com:9094
 (id: 1 rack: null), 
2=a82f92d2e86d145b48447de89694d879-1900034172.ap-northeast-1.elb.amazonaws.com:9094
 (id: 2 rack: null)}, partitions=[], 
controller=a82f92d2e86d145b48447de89694d879-1900034172.ap-northeast-1.elb.amazonaws.com:9094
 (id: 2 rack: null)}
2023-06-19 19:51:26 DEBUG KafkaProducer:1073 - [Producer 
clientId=java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1-producer]
 Exception occurred during message send:
org.apache.kafka.common.errors.TimeoutException: Topic output-topic not present in metadata after 60000 ms.

[jira] [Updated] (KAFKA-15108) task.timeout.ms does not work when TimeoutException is thrown by streams producer

2023-06-19 Thread Tomonari Yamashita (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15108?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tomonari Yamashita updated KAFKA-15108:
---
Summary: task.timeout.ms does not work when TimeoutException is thrown by 
streams producer  (was: task.timeout.ms does not work when TimeoutException is 
thrown by stream producer)

> task.timeout.ms does not work when TimeoutException is thrown by streams 
> producer
> -
>
> Key: KAFKA-15108
> URL: https://issues.apache.org/jira/browse/KAFKA-15108
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.0
>Reporter: Tomonari Yamashita
>Priority: Major
>