[ 
https://issues.apache.org/jira/browse/KAFKA-15259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751878#comment-17751878
 ] 

Guozhang Wang commented on KAFKA-15259:
---------------------------------------

Got it, KAFKA-15309 makes sense.

> Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using exactly_once
> ------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-15259
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15259
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.5.1
>            Reporter: Tomonari Yamashita
>            Priority: Major
>         Attachments: Reproducer.java, app_at_least_once.log, 
> app_exactly_once.log
>
>
> [Problem]
>  - Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using exactly_once.
>  -- "CONTINUE will signal that Streams should ignore the issue and continue 
> processing"(1), so Kafka Streams should continue processing even if using 
> exactly_once when ProductionExceptionHandlerResponse.CONTINUE used.
>  -- However, if using exactly_once, Kafka Streams does not continue 
> processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE. And the client will be shut down 
> as the default behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) 
> [Environment]
>  - Kafka Streams 3.5.1
> [Reproduction procedure]
>  # Create "input-topic" topic and "output-topic"
>  # Put several messages on "input-topic"
>  # Execute a simple Kafka streams program that transfers too large messages 
> from "input-topic" to "output-topic" with exactly_once and returns 
> ProductionExceptionHandlerResponse.CONTINUE when an exception occurs in the 
> producer. Please refer to the reproducer program (attached file: 
> Reproducer.java).
>  # ==> However, Kafka Streams does not continue processing due to rollback 
> despite ProductionExceptionHandlerResponse.CONTINUE. And the stream thread 
> shutdown as the default 
> behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) (2). Please refer to 
> the debug log (attached file: app_exactly_once.log).
>  ## My excepted behavior is that Kafka Streams should continue processing 
> even if using exactly_once. when ProductionExceptionHandlerResponse.CONTINUE 
> used.
> [As far as my investigation]
>  - FYI, if using at_least_once instead of exactly_once, Kafka Streams 
> continue processing without rollback when 
> ProductionExceptionHandlerResponse.CONTINUE is used. Please refer to the 
> debug log (attached file: app_at_least_once.log).
> - "continue" worked in Kafka Streams 3.1.2, but no longer works since Kafka 
> Streams 3.2.0, as rollback occurs.
> (1) CONFIGURING A STREAMS APPLICATION > default.production.exception.handler
>  - 
> [https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams.html#default-production-exception-handler]
> (2) Transaction abort and shutdown occur
> {code:java}
> 2023-07-26 21:27:19 DEBUG KafkaProducer:1073 - [Producer 
> clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer,
>  transactionalId=java-kafka-streams-0_0] Exception occurred during message 
> send:
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 11000088 bytes when serialized which is larger than 1048576, which is the 
> value of the max.request.size configuration.
> 2023-07-26 21:27:19 ERROR RecordCollectorImpl:322 - stream-thread 
> [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] 
> stream-task [0_0] Error encountered sending record to topic output-topic for 
> task 0_0 due to:
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 11000088 bytes when serialized which is larger than 1048576, which is the 
> value of the max.request.size configuration.
> Exception handler choose to CONTINUE processing in spite of this error but 
> written offsets would not be recorded.
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 11000088 bytes when serialized which is larger than 1048576, which is the 
> value of the max.request.size configuration.
> 2023-07-26 21:27:19 INFO  TransactionManager:393 - [Producer 
> clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer,
>  transactionalId=java-kafka-streams-0_0] Transiting to abortable error state 
> due to org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 11000088 bytes when serialized which is larger than 1048576, which is the 
> value of the max.request.size configuration.
> 2023-07-26 21:27:19 DEBUG TransactionManager:986 - [Producer 
> clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer,
>  transactionalId=java-kafka-streams-0_0] Transition from state IN_TRANSACTION 
> to error state ABORTABLE_ERROR
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 11000088 bytes when serialized which is larger than 1048576, which is the 
> value of the max.request.size configuration.
> 2023-07-26 21:27:19 DEBUG StreamThread:825 - stream-thread 
> [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] 
> Processed 1 records with 1 iterations; invoking punctuators if necessary
> 2023-07-26 21:27:19 DEBUG StreamThread:837 - stream-thread 
> [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] 0 
> punctuators ran.
> 2023-07-26 21:27:19 DEBUG StreamThread:1117 - stream-thread 
> [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] 
> Committing all active tasks [0_0] and standby tasks [] since 273ms has 
> elapsed (commit interval is 100ms)
> 2023-07-26 21:27:19 DEBUG RecordCollectorImpl:345 - stream-thread 
> [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] 
> stream-task [0_0] Flushing record collector
> 2023-07-26 21:27:19 DEBUG StreamTask:419 - stream-thread 
> [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] task 
> [0_0] Prepared RUNNING task for committing
> 2023-07-26 21:27:19 DEBUG TaskExecutor:176 - stream-thread 
> [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] 
> Committing task offsets {0_0={input-topic-0=OffsetAndMetadata{offset=629, 
> leaderEpoch=null, metadata='AgAAAYmSGuLv'}}}
> 2023-07-26 21:27:19 ERROR KafkaStreams:537 - stream-client 
> [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d] Encountered the 
> following exception during processing and the registered exception handler 
> opted to SHUTDOWN_CLIENT. The streams client is going to shut down now. 
> org.apache.kafka.streams.errors.StreamsException: Error encountered trying to 
> commit a transaction [stream-thread 
> [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] 
> stream-task [0_0]]
>       at 
> org.apache.kafka.streams.processor.internals.StreamsProducer.commitTransaction(StreamsProducer.java:313)
>  ~[kafka-streams-3.5.1.jar:?]
>       at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.commitOffsetsOrTransaction(TaskExecutor.java:186)
>  ~[kafka-streams-3.5.1.jar:?]
>       at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.commitTasksAndMaybeUpdateCommittableOffsets(TaskExecutor.java:154)
>  ~[kafka-streams-3.5.1.jar:?]
>       at 
> org.apache.kafka.streams.processor.internals.TaskManager.commitTasksAndMaybeUpdateCommittableOffsets(TaskManager.java:1678)
>  ~[kafka-streams-3.5.1.jar:?]
>       at 
> org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1646)
>  ~[kafka-streams-3.5.1.jar:?]
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1121)
>  ~[kafka-streams-3.5.1.jar:?]
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:840)
>  ~[kafka-streams-3.5.1.jar:?]
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)
>  ~[kafka-streams-3.5.1.jar:?]
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)
>  [kafka-streams-3.5.1.jar:?]
> Caused by: org.apache.kafka.common.KafkaException: Cannot execute 
> transactional method because we are in an error state
>       at 
> org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:1010)
>  ~[kafka-clients-3.5.1.jar:?]
>       at 
> org.apache.kafka.clients.producer.internals.TransactionManager.sendOffsetsToTransaction(TransactionManager.java:306)
>  ~[kafka-clients-3.5.1.jar:?]
>       at 
> org.apache.kafka.clients.producer.KafkaProducer.sendOffsetsToTransaction(KafkaProducer.java:757)
>  ~[kafka-clients-3.5.1.jar:?]
>       at 
> org.apache.kafka.streams.processor.internals.StreamsProducer.commitTransaction(StreamsProducer.java:300)
>  ~[kafka-streams-3.5.1.jar:?]
>       ... 8 more
> Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The 
> message is 11000088 bytes when serialized which is larger than 1048576, which 
> is the value of the max.request.size configuration.
> 2023-07-26 21:27:19 INFO  KafkaStreams:340 - stream-client 
> [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d] State transition 
> from RUNNING to PENDING_ERROR
> 2023-07-26 21:27:19 INFO  StreamThread:239 - stream-thread 
> [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] 
> State transition from RUNNING to PENDING_SHUTDOWN
> 2023-07-26 21:27:19 INFO  StreamThread:1182 - stream-thread 
> [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] 
> Shutting down unclean
> 2023-07-26 21:27:19 INFO  StreamThread:1168 - stream-thread 
> [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] 
> Informed to shut down
> 2023-07-26 21:27:19 DEBUG StreamThread:224 - stream-thread 
> [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] 
> Ignoring request to transit from PENDING_SHUTDOWN to PENDING_SHUTDOWN: only 
> DEAD state is a valid next state
> 2023-07-26 21:27:19 INFO  KafkaStreams:1367 - stream-client 
> [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d] Shutting down 1 
> stream threads
> 2023-07-26 21:27:19 DEBUG KafkaStreams:1374 - stream-client 
> [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d] Shutdown 
> java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1 
> complete
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to