[ 
https://issues.apache.org/jira/browse/KAFKA-19479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18003734#comment-18003734
 ] 

Mihai Lucian commented on KAFKA-19479:
--------------------------------------

*Clarification*

The configuration settings described above are not used in any of our actual 
projects, and the issue demonstrated in this example is not the root cause of 
the original problem we encountered.

Our real concern involves message loss during the scaling of stateful Kafka 
Streams applications. In the process of investigating whether there are 
scenarios in which at_least_once processing might fail to uphold its delivery 
guarantees, we developed this proof of concept to explore and replicate such 
edge cases.

> at_least_once mode in Kafka Streams silently drops messages when the producer 
> fails with MESSAGE_TOO_LARGE, violating delivery guarantees
> -----------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-19479
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19479
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 4.0.0
>         Environment: Micronaut 4.5.4
> Java 21
> Kotlin 1.9.3
> Kafka clients/streams:
> Apache Kafka 3.7.0, 4.0.0
> Confluent: 7.9.2-ccs, 8.0.0-ccs
> Kafka running in Docker (local test environment)
>            Reporter: Mihai Lucian
>            Priority: Major
>         Attachments: poc-kafka-streams-al-least-once-proj.zip, 
> stream-configs.txt
>
>
> *Description*
> It appears there is a scenario where Kafka Streams running with 
> {{processing.guarantee=at_least_once}} does {*}not uphold its delivery 
> guarantees{*}, resulting in {*}message loss{*}.
>  
> *Reproduction Details*
> We run a simple Kafka Streams topology like the following:
> {code:java}
> props[StreamsConfig.APPLICATION_ID_CONFIG] = "poc-at-least-once"
> props[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = Serdes.String().javaClass.name
> props[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = Serdes.String().javaClass.name
> props[StreamsConfig.PROCESSING_GUARANTEE_CONFIG] = StreamsConfig.AT_LEAST_ONCE
> // Large producer batch size and long linger to induce MESSAGE_TOO_LARGE
> props[ProducerConfig.LINGER_MS_CONFIG] = "300000"
> props[ProducerConfig.BATCH_SIZE_CONFIG] = "33554432"
> // A custom ProductionExceptionHandler is registered to demonstrate that it is
> // not triggered in this scenario; in fact, neither the ProductionExceptionHandler
> // nor the StreamsUncaughtExceptionHandler is invoked during this failure.
> props[StreamsConfig.PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG] = "poc.MyProductionExceptionHandler"
>
> val stream = streamsBuilder.stream<String, String>("input.topic")
> stream.peek { key, value -> println("$key:$value") }
>       .to("output.topic")
> {code}
>  
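> For reference, a minimal sketch of what such a handler could look like (the actual {{poc.MyProductionExceptionHandler}} ships with the attached project; the shape below is an assumption). It uses the classic {{handle(record, exception)}} callback and simply logs the failed record and asks Streams to fail:
> {code:java}
> package poc
>
> import org.apache.kafka.clients.producer.ProducerRecord
> import org.apache.kafka.streams.errors.ProductionExceptionHandler
> import org.apache.kafka.streams.errors.ProductionExceptionHandler.ProductionExceptionHandlerResponse
>
> // Assumed shape of the handler referenced in the config: log the failed record
> // and tell Streams to FAIL. In the scenario above it is never invoked.
> class MyProductionExceptionHandler : ProductionExceptionHandler {
>     override fun handle(
>         record: ProducerRecord<ByteArray, ByteArray>,
>         exception: Exception
>     ): ProductionExceptionHandlerResponse {
>         println("Production failed for ${record.topic()}-${record.partition()}: ${exception.message}")
>         return ProductionExceptionHandlerResponse.FAIL
>     }
>
>     override fun configure(configs: Map<String, *>) {}
> }
> {code}
>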
> *What we observe:*
>  * Records from {{input.topic}} are consumed and buffered on the producer side
>  * After some time (likely driven by {{commit.interval.ms}}), the *consumer offset is committed*
>  * The producer *flush* of buffered records is triggered
>  * Sending the records to the Kafka broker fails with {{MESSAGE_TOO_LARGE}}
>  * As a result, the application *commits offsets without actually producing the records*, which leads to *silent message loss* (see the verification sketch after this list)
>  
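> The gap can be confirmed from outside the application. Below is a minimal verification sketch (assuming the local Docker broker at {{localhost:9092}}; the consumer group id equals the {{application.id}} {{poc-at-least-once}} from the config above) that compares the offsets committed on {{input.topic}} with the end offset of {{output.topic}}:
> {code:java}
> import org.apache.kafka.clients.admin.AdminClient
> import org.apache.kafka.clients.admin.AdminClientConfig
> import org.apache.kafka.clients.admin.OffsetSpec
> import org.apache.kafka.common.TopicPartition
> import java.util.Properties
>
> fun main() {
>     val props = Properties().apply {
>         // assumed local Docker broker address
>         put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
>     }
>     AdminClient.create(props).use { admin ->
>         // offsets committed by the Streams app (group id = application.id)
>         val committed = admin.listConsumerGroupOffsets("poc-at-least-once")
>             .partitionsToOffsetAndMetadata().get()
>         committed.filterKeys { it.topic() == "input.topic" }
>             .forEach { (tp, om) -> println("committed $tp -> ${om.offset()}") }
>
>         // end offset of output.topic-0 = number of records actually produced
>         val outTp = TopicPartition("output.topic", 0)
>         val end = admin.listOffsets(mapOf(outTp to OffsetSpec.latest())).all().get()
>         println("output.topic-0 end offset -> ${end.getValue(outTp).offset()}")
>     }
> }
> {code}
> In a failing run the committed offsets on {{input.topic}} keep advancing while {{output.topic-0}} stays at offset 0, which is the silent loss described above.
>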
> *Steps to Reproduce*
>  # Generate ~50,000 records (sized similarly to the sample project) in {{input.topic}} to induce {{MESSAGE_TOO_LARGE}} (a minimal generator sketch follows the log excerpt below)
>  # Start the topology with the configuration above
>  # Wait for all messages to be consumed
>  # Observe:
>  ** Offsets are committed
>  ** Output topic receives no messages
>  ** Log shows repeated {{MESSAGE_TOO_LARGE}} error:
> {code:java}
> 11:50:30.695 [kafka-producer-network-thread | 
> kstreams-poc-v1-37858c2e-7584-4489-8081-0111f710c431-StreamThread-1-producer] 
> WARN  o.a.k.c.producer.internals.Sender - [Producer 
> clientId=kstreams-poc-v1-37858c2e-7584-4489-8081-0111f710c431-StreamThread-1-producer]
>  Got error produce response in correlation id 255 on topic-partition 
> output.topic-0, splitting and retrying (2147483647 attempts left). Error: 
> MESSAGE_TOO_LARGE {code}
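>
> For step 1, a minimal generator sketch (the {{localhost:9092}} bootstrap address and the ~1 KB record size are assumptions; the attached sample project defines the actual sizing):
> {code:java}
> import org.apache.kafka.clients.producer.KafkaProducer
> import org.apache.kafka.clients.producer.ProducerConfig
> import org.apache.kafka.clients.producer.ProducerRecord
> import org.apache.kafka.common.serialization.StringSerializer
> import java.util.Properties
>
> fun main() {
>     val props = Properties().apply {
>         put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // assumed local Docker broker
>         put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
>         put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
>     }
>     KafkaProducer<String, String>(props).use { producer ->
>         val payload = "x".repeat(1_000) // assumed ~1 KB value; adjust to match the sample project
>         repeat(50_000) { i ->
>             producer.send(ProducerRecord("input.topic", "key-$i", payload))
>         }
>         producer.flush()
>     }
> }
> {code}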
>  
> *Reproduced* with:
>  * kafka-client-3.7.0, kafka-streams-3.7.0
>  * kafka-client-4.0.0, kafka-streams-4.0.0
>  * kafka-client-7.9.2-ccs, kafka-streams-7.9.2-ccs
>  * kafka-client-8.0.0-ccs, kafka-streams-8.0.0-ccs
>  
> *Expected Behavior*
> In {{at_least_once}} mode, Kafka Streams should *not commit offsets* unless 
> records are {*}successfully produced{*}. 
>  
> *Attached*
>  * configs for stream, producer, consumer
>  * sample project used to replicate the issue



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
