Tomonari Yamashita created KAFKA-15259:
------------------------------------------
Summary: Kafka Streams does not continue processing due to
rollback despite ProductionExceptionHandlerResponse.CONTINUE if using
exactly_once
Key: KAFKA-15259
URL: https://issues.apache.org/jira/browse/KAFKA-15259
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 3.5.1
Reporter: Tomonari Yamashita
Attachments: Reproducer.java, app_at_least_once.log,
app_exactly_once.log
[Problem]
- Kafka Streams does not continue processing, because the transaction is rolled
back, even though the handler returns
ProductionExceptionHandlerResponse.CONTINUE, when exactly_once is used.
-- The documentation says "CONTINUE will signal that Streams should ignore the
issue and continue processing"(1), so Kafka Streams should continue processing
under exactly_once as well when ProductionExceptionHandlerResponse.CONTINUE is
returned.
-- However, under exactly_once, Kafka Streams does not continue processing: the
transaction is rolled back despite ProductionExceptionHandlerResponse.CONTINUE,
and the client is then shut down by the default behavior
(StreamThreadExceptionResponse.SHUTDOWN_CLIENT).
[Environment]
- Kafka Streams 3.5.1
[Reproduction procedure]
# Create the topics "input-topic" and "output-topic".
# Put several messages on "input-topic".
# Run a simple Kafka Streams program that forwards messages that are too large
from "input-topic" to "output-topic" with exactly_once, and that returns
ProductionExceptionHandlerResponse.CONTINUE when an exception occurs in the
producer. Please refer to the reproducer program (attached file:
Reproducer.java).
# ==> Kafka Streams does not continue processing, because the transaction is
rolled back despite ProductionExceptionHandlerResponse.CONTINUE, and the stream
thread shuts down as the default behavior
(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) (2). Please refer to the debug
log (attached file: app_exactly_once.log).
## My expected behavior is that Kafka Streams continues processing even under
exactly_once when ProductionExceptionHandlerResponse.CONTINUE is returned.
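For readers without the attachment, the configuration described in the steps above might look roughly like the sketch below. It is not the attached Reproducer.java. The handler class name and the bootstrap address are hypothetical placeholders, the application id is inferred from the client id prefix in the log, and the guarantee value "exactly_once" is an assumption based on the per-task transactionalId (java-kafka-streams-0_0) in the log. Plain string keys are used so that the sketch needs only the JDK.

```java
import java.util.Properties;

public class ReproducerConfigSketch {

    // Hypothetical class name: a ProductionExceptionHandler implementation
    // whose handle(...) method always returns CONTINUE.
    static final String HANDLER_CLASS = "com.example.ContinueProductionExceptionHandler";

    // Builds the Streams configuration for a given processing guarantee,
    // e.g. "exactly_once" (failing case) or "at_least_once" (working case).
    static Properties streamsConfig(String processingGuarantee) {
        Properties props = new Properties();
        // Inferred from the client id prefix in the attached log.
        props.setProperty("application.id", "java-kafka-streams");
        // Assumption: a local broker; the report does not state the address.
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("processing.guarantee", processingGuarantee);
        props.setProperty("default.production.exception.handler", HANDLER_CLASS);
        return props;
    }

    public static void main(String[] args) {
        // Print both variants so the single differing setting is visible.
        for (String guarantee : new String[] {"exactly_once", "at_least_once"}) {
            Properties props = streamsConfig(guarantee);
            System.out.println(guarantee + " -> handler="
                    + props.getProperty("default.production.exception.handler"));
        }
    }
}
```

Only the processing.guarantee value differs between the failing run and the working run; the handler setting is identical in both.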
[Findings from my investigation so far]
- FYI, with at_least_once instead of exactly_once, Kafka Streams continues
processing without a rollback when ProductionExceptionHandlerResponse.CONTINUE
is returned. Please refer to the debug log (attached file:
app_at_least_once.log).
(1) CONFIGURING A STREAMS APPLICATION > default.production.exception.handler -
[https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams.html#default-production-exception-handler]
(2) Transaction abort and shutdown occur
{code:java}
2023-07-26 21:27:19 DEBUG KafkaProducer:1073 - [Producer
clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer,
transactionalId=java-kafka-streams-0_0] Exception occurred during message send:
org.apache.kafka.common.errors.RecordTooLargeException: The message is 11000088
bytes when serialized which is larger than 1048576, which is the value of the
max.request.size configuration.
2023-07-26 21:27:19 ERROR RecordCollectorImpl:322 - stream-thread
[java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1]
stream-task [0_0] Error encountered sending record to topic output-topic for
task 0_0 due to:
org.apache.kafka.common.errors.RecordTooLargeException: The message is 11000088
bytes when serialized which is larger than 1048576, which is the value of the
max.request.size configuration.
Exception handler choose to CONTINUE processing in spite of this error but
written offsets would not be recorded.
org.apache.kafka.common.errors.RecordTooLargeException: The message is 11000088
bytes when serialized which is larger than 1048576, which is the value of the
max.request.size configuration.
2023-07-26 21:27:19 INFO TransactionManager:393 - [Producer
clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer,
transactionalId=java-kafka-streams-0_0] Transiting to abortable error state
due to org.apache.kafka.common.errors.RecordTooLargeException: The message is
11000088 bytes when serialized which is larger than 1048576, which is the value
of the max.request.size configuration.
2023-07-26 21:27:19 DEBUG TransactionManager:986 - [Producer
clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer,
transactionalId=java-kafka-streams-0_0] Transition from state IN_TRANSACTION
to error state ABORTABLE_ERROR
org.apache.kafka.common.errors.RecordTooLargeException: The message is 11000088
bytes when serialized which is larger than 1048576, which is the value of the
max.request.size configuration.
2023-07-26 21:27:19 DEBUG StreamThread:825 - stream-thread
[java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1]
Processed 1 records with 1 iterations; invoking punctuators if necessary
2023-07-26 21:27:19 DEBUG StreamThread:837 - stream-thread
[java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] 0
punctuators ran.
2023-07-26 21:27:19 DEBUG StreamThread:1117 - stream-thread
[java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1]
Committing all active tasks [0_0] and standby tasks [] since 273ms has elapsed
(commit interval is 100ms)
2023-07-26 21:27:19 DEBUG RecordCollectorImpl:345 - stream-thread
[java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1]
stream-task [0_0] Flushing record collector
2023-07-26 21:27:19 DEBUG StreamTask:419 - stream-thread
[java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] task
[0_0] Prepared RUNNING task for committing
2023-07-26 21:27:19 DEBUG TaskExecutor:176 - stream-thread
[java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1]
Committing task offsets {0_0={input-topic-0=OffsetAndMetadata{offset=629,
leaderEpoch=null, metadata='AgAAAYmSGuLv'}}}
2023-07-26 21:27:19 ERROR KafkaStreams:537 - stream-client
[java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d] Encountered the
following exception during processing and the registered exception handler
opted to SHUTDOWN_CLIENT. The streams client is going to shut down now.
org.apache.kafka.streams.errors.StreamsException: Error encountered trying to
commit a transaction [stream-thread
[java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1]
stream-task [0_0]]
at
org.apache.kafka.streams.processor.internals.StreamsProducer.commitTransaction(StreamsProducer.java:313)
~[kafka-streams-3.5.1.jar:?]
at
org.apache.kafka.streams.processor.internals.TaskExecutor.commitOffsetsOrTransaction(TaskExecutor.java:186)
~[kafka-streams-3.5.1.jar:?]
at
org.apache.kafka.streams.processor.internals.TaskExecutor.commitTasksAndMaybeUpdateCommittableOffsets(TaskExecutor.java:154)
~[kafka-streams-3.5.1.jar:?]
at
org.apache.kafka.streams.processor.internals.TaskManager.commitTasksAndMaybeUpdateCommittableOffsets(TaskManager.java:1678)
~[kafka-streams-3.5.1.jar:?]
at
org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1646)
~[kafka-streams-3.5.1.jar:?]
at
org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1121)
~[kafka-streams-3.5.1.jar:?]
at
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:840)
~[kafka-streams-3.5.1.jar:?]
at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)
~[kafka-streams-3.5.1.jar:?]
at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)
[kafka-streams-3.5.1.jar:?]
Caused by: org.apache.kafka.common.KafkaException: Cannot execute transactional
method because we are in an error state
at
org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:1010)
~[kafka-clients-3.5.1.jar:?]
at
org.apache.kafka.clients.producer.internals.TransactionManager.sendOffsetsToTransaction(TransactionManager.java:306)
~[kafka-clients-3.5.1.jar:?]
at
org.apache.kafka.clients.producer.KafkaProducer.sendOffsetsToTransaction(KafkaProducer.java:757)
~[kafka-clients-3.5.1.jar:?]
at
org.apache.kafka.streams.processor.internals.StreamsProducer.commitTransaction(StreamsProducer.java:300)
~[kafka-streams-3.5.1.jar:?]
... 8 more
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message
is 11000088 bytes when serialized which is larger than 1048576, which is the
value of the max.request.size configuration.
2023-07-26 21:27:19 INFO KafkaStreams:340 - stream-client
[java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d] State transition from
RUNNING to PENDING_ERROR
2023-07-26 21:27:19 INFO StreamThread:239 - stream-thread
[java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] State
transition from RUNNING to PENDING_SHUTDOWN
2023-07-26 21:27:19 INFO StreamThread:1182 - stream-thread
[java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1]
Shutting down unclean
2023-07-26 21:27:19 INFO StreamThread:1168 - stream-thread
[java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1]
Informed to shut down
2023-07-26 21:27:19 DEBUG StreamThread:224 - stream-thread
[java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1]
Ignoring request to transit from PENDING_SHUTDOWN to PENDING_SHUTDOWN: only
DEAD state is a valid next state
2023-07-26 21:27:19 INFO KafkaStreams:1367 - stream-client
[java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d] Shutting down 1
stream threads
2023-07-26 21:27:19 DEBUG KafkaStreams:1374 - stream-client
[java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d] Shutdown
java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1 complete
{code}