[ https://issues.apache.org/jira/browse/KAFKA-15259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751878#comment-17751878 ]
Guozhang Wang commented on KAFKA-15259:
---------------------------------------

Got it, KAFKA-15309 makes sense.

> Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using exactly_once
> ------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-15259
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15259
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.5.1
>            Reporter: Tomonari Yamashita
>            Priority: Major
>         Attachments: Reproducer.java, app_at_least_once.log, app_exactly_once.log
>
> [Problem]
> - Kafka Streams does not continue processing, because of a transaction rollback, despite ProductionExceptionHandlerResponse.CONTINUE when exactly_once is used.
> -- "CONTINUE will signal that Streams should ignore the issue and continue processing" (1), so Kafka Streams should continue processing even with exactly_once when ProductionExceptionHandlerResponse.CONTINUE is returned.
> -- However, with exactly_once, Kafka Streams does not continue processing because the transaction is rolled back despite ProductionExceptionHandlerResponse.CONTINUE, and the client is shut down as the default behavior (StreamThreadExceptionResponse.SHUTDOWN_CLIENT).
>
> [Environment]
> - Kafka Streams 3.5.1
>
> [Reproduction procedure]
> # Create the topics "input-topic" and "output-topic".
> # Put several messages on "input-topic".
> # Execute a simple Kafka Streams program that transfers too-large messages from "input-topic" to "output-topic" with exactly_once and returns ProductionExceptionHandlerResponse.CONTINUE when an exception occurs in the producer. Please refer to the reproducer program (attached file: Reproducer.java); a minimal sketch of such a setup is also shown after the documentation reference (1) below.
> # ==> However, Kafka Streams does not continue processing because the transaction is rolled back despite ProductionExceptionHandlerResponse.CONTINUE, and the stream thread shuts down as the default behavior (StreamThreadExceptionResponse.SHUTDOWN_CLIENT) (2). Please refer to the debug log (attached file: app_exactly_once.log).
> ## My expected behavior is that Kafka Streams continues processing even with exactly_once when ProductionExceptionHandlerResponse.CONTINUE is used.
>
> [As far as my investigation goes]
> - FYI, when at_least_once is used instead of exactly_once, Kafka Streams continues processing without a rollback when ProductionExceptionHandlerResponse.CONTINUE is used. Please refer to the debug log (attached file: app_at_least_once.log).
> - "CONTINUE" worked in Kafka Streams 3.1.2, but no longer works since Kafka Streams 3.2.0, as a rollback occurs.
>
> (1) CONFIGURING A STREAMS APPLICATION, default.production.exception.handler:
> [https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams.html#default-production-exception-handler]
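>
> For illustration only, here is a minimal sketch of the setup described above: a ProductionExceptionHandler that returns CONTINUE, wired into an exactly_once pass-through topology. The attached Reproducer.java is the actual reproducer; the handler class name, bootstrap-server address, and shutdown hook below are assumptions, not the attachment's contents.
> {code:java}
> import java.util.Map;
> import java.util.Properties;
>
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.KafkaStreams;
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.errors.ProductionExceptionHandler;
>
> // Hypothetical stand-in for the attached Reproducer.java: a production
> // exception handler that asks Streams to drop the failed record and continue.
> public class ContinueProductionExceptionHandler implements ProductionExceptionHandler {
>
>     @Override
>     public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
>                                                      final Exception exception) {
>         // RecordTooLargeException thrown on send ends up here.
>         return ProductionExceptionHandlerResponse.CONTINUE;
>     }
>
>     @Override
>     public void configure(final Map<String, ?> configs) {
>         // nothing to configure
>     }
>
>     public static void main(final String[] args) {
>         final Properties props = new Properties();
>         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "java-kafka-streams");
>         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
>         // exactly_once, as in the report (deprecated in favor of exactly_once_v2, which also uses transactions)
>         props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
>         props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
>                   ContinueProductionExceptionHandler.class);
>         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
>         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
>
>         final StreamsBuilder builder = new StreamsBuilder();
>         // Pass records through unchanged; an input record larger than
>         // max.request.size triggers RecordTooLargeException on send.
>         builder.stream("input-topic").to("output-topic");
>
>         final KafkaStreams streams = new KafkaStreams(builder.build(), props);
>         Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
>         streams.start();
>     }
> }
> {code}
> With at_least_once such a handler lets processing continue, but with exactly_once the transactional producer has already moved to an abortable error state when the send fails, so the next transaction commit fails and the thread shuts down, as shown in (2) below.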
>
> (2) Transaction abort and shutdown occur
> {code:java}
> 2023-07-26 21:27:19 DEBUG KafkaProducer:1073 - [Producer clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer, transactionalId=java-kafka-streams-0_0] Exception occurred during message send:
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 11000088 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
> 2023-07-26 21:27:19 ERROR RecordCollectorImpl:322 - stream-thread [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] stream-task [0_0] Error encountered sending record to topic output-topic for task 0_0 due to:
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 11000088 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
> Exception handler choose to CONTINUE processing in spite of this error but written offsets would not be recorded.
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 11000088 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
> 2023-07-26 21:27:19 INFO TransactionManager:393 - [Producer clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer, transactionalId=java-kafka-streams-0_0] Transiting to abortable error state due to org.apache.kafka.common.errors.RecordTooLargeException: The message is 11000088 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
> 2023-07-26 21:27:19 DEBUG TransactionManager:986 - [Producer clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer, transactionalId=java-kafka-streams-0_0] Transition from state IN_TRANSACTION to error state ABORTABLE_ERROR
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 11000088 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
> 2023-07-26 21:27:19 DEBUG StreamThread:825 - stream-thread [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] Processed 1 records with 1 iterations; invoking punctuators if necessary
> 2023-07-26 21:27:19 DEBUG StreamThread:837 - stream-thread [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] 0 punctuators ran.
> 2023-07-26 21:27:19 DEBUG StreamThread:1117 - stream-thread [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] Committing all active tasks [0_0] and standby tasks [] since 273ms has elapsed (commit interval is 100ms)
> 2023-07-26 21:27:19 DEBUG RecordCollectorImpl:345 - stream-thread [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] stream-task [0_0] Flushing record collector
> 2023-07-26 21:27:19 DEBUG StreamTask:419 - stream-thread [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] task [0_0] Prepared RUNNING task for committing
> 2023-07-26 21:27:19 DEBUG TaskExecutor:176 - stream-thread [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] Committing task offsets {0_0={input-topic-0=OffsetAndMetadata{offset=629, leaderEpoch=null, metadata='AgAAAYmSGuLv'}}}
> 2023-07-26 21:27:19 ERROR KafkaStreams:537 - stream-client [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d] Encountered the following exception during processing and the registered exception handler opted to SHUTDOWN_CLIENT. The streams client is going to shut down now.
> org.apache.kafka.streams.errors.StreamsException: Error encountered trying to commit a transaction [stream-thread [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] stream-task [0_0]]
>     at org.apache.kafka.streams.processor.internals.StreamsProducer.commitTransaction(StreamsProducer.java:313) ~[kafka-streams-3.5.1.jar:?]
>     at org.apache.kafka.streams.processor.internals.TaskExecutor.commitOffsetsOrTransaction(TaskExecutor.java:186) ~[kafka-streams-3.5.1.jar:?]
>     at org.apache.kafka.streams.processor.internals.TaskExecutor.commitTasksAndMaybeUpdateCommittableOffsets(TaskExecutor.java:154) ~[kafka-streams-3.5.1.jar:?]
>     at org.apache.kafka.streams.processor.internals.TaskManager.commitTasksAndMaybeUpdateCommittableOffsets(TaskManager.java:1678) ~[kafka-streams-3.5.1.jar:?]
>     at org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1646) ~[kafka-streams-3.5.1.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1121) ~[kafka-streams-3.5.1.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:840) ~[kafka-streams-3.5.1.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617) ~[kafka-streams-3.5.1.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579) [kafka-streams-3.5.1.jar:?]
> Caused by: org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
>     at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:1010) ~[kafka-clients-3.5.1.jar:?]
>     at org.apache.kafka.clients.producer.internals.TransactionManager.sendOffsetsToTransaction(TransactionManager.java:306) ~[kafka-clients-3.5.1.jar:?]
>     at org.apache.kafka.clients.producer.KafkaProducer.sendOffsetsToTransaction(KafkaProducer.java:757) ~[kafka-clients-3.5.1.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamsProducer.commitTransaction(StreamsProducer.java:300) ~[kafka-streams-3.5.1.jar:?]
>     ... 8 more
> Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 11000088 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
> 2023-07-26 21:27:19 INFO KafkaStreams:340 - stream-client [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d] State transition from RUNNING to PENDING_ERROR
> 2023-07-26 21:27:19 INFO StreamThread:239 - stream-thread [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN
> 2023-07-26 21:27:19 INFO StreamThread:1182 - stream-thread [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] Shutting down unclean
> 2023-07-26 21:27:19 INFO StreamThread:1168 - stream-thread [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] Informed to shut down
> 2023-07-26 21:27:19 DEBUG StreamThread:224 - stream-thread [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] Ignoring request to transit from PENDING_SHUTDOWN to PENDING_SHUTDOWN: only DEAD state is a valid next state
> 2023-07-26 21:27:19 INFO KafkaStreams:1367 - stream-client [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d] Shutting down 1 stream threads
> 2023-07-26 21:27:19 DEBUG KafkaStreams:1374 - stream-client [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d] Shutdown java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1 complete
> {code}


--
This message was sent by Atlassian Jira
(v8.20.10#820010)