[ 
https://issues.apache.org/jira/browse/KAFKA-19479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mihai Lucian updated KAFKA-19479:
---------------------------------
    Description: 
*Description*

It appears there is a scenario where Kafka Streams running with 
{{processing.guarantee=at_least_once}} does {*}not uphold its delivery 
guarantees{*}, resulting in *message loss.*

 

*Reproduction Details*

We run a simple Kafka Streams topology like the following:

 

 
{code:java}

{code}
*props[StreamsConfig.APPLICATION_ID_CONFIG] = "poc-at-least-once"
props[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = 
Serdes.String().javaClass.name
props[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = 
Serdes.String().javaClass.name
props[StreamsConfig.PROCESSING_GUARANTEE_CONFIG] = StreamsConfig.AT_LEAST_ONCE
// Large producer batch size to induce MESSAGE_TOO_LARGE
props[ProducerConfig.LINGER_MS_CONFIG] = "300000"
props[ProducerConfig.BATCH_SIZE_CONFIG] = "33554432"
// Custom handler registered (never triggered)
props[StreamsConfig.PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG] = 
"poc.MyProductionExceptionHandler"
val stream = streamsBuilder.stream<String, String>("input.topic")
stream.peek \{ key, value -> println("$key:$value") }
      .to("output.topic")* 

 

 

  was:
*Description*
It appears there is a scenario where Kafka Streams running with 
{{processing.guarantee=at_least_once}} does {*}not uphold its delivery 
guarantees{*}, resulting in *message loss.*

*Reproduction Details*

*We run a simple Kafka Streams topology like the following:*

*kotlin```*

props[StreamsConfig.APPLICATION_ID_CONFIG] = "poc-at-least-once"
props[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = 
Serdes.String().javaClass.name
props[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = 
Serdes.String().javaClass.name
props[StreamsConfig.PROCESSING_GUARANTEE_CONFIG] = StreamsConfig.AT_LEAST_ONCE

// Large producer batch size to induce MESSAGE_TOO_LARGE
props[ProducerConfig.LINGER_MS_CONFIG] = "300000"
props[ProducerConfig.BATCH_SIZE_CONFIG] = "33554432"

// Custom handler registered (never triggered)
props[StreamsConfig.PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG] = 
"poc.MyProductionExceptionHandler"

val stream = streamsBuilder.stream<String, String>("input.topic")
stream.peek \{ key, value -> println("$key:$value") }
      .to("output.topic")

```


> at_least_once mode in Kafka Streams silently drops messages when the producer 
> fails with MESSAGE_TOO_LARGE, violating delivery guarantees.
> ------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-19479
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19479
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 4.0.0
>            Reporter: Mihai Lucian
>            Assignee: Mihai Lucian
>            Priority: Major
>
> *Description*
> It appears there is a scenario where Kafka Streams running with 
> {{processing.guarantee=at_least_once}} does {*}not uphold its delivery 
> guarantees{*}, resulting in *message loss.*
>  
> *Reproduction Details*
> We run a simple Kafka Streams topology like the following:
>  
>  
> {code:java}
> {code}
> *props[StreamsConfig.APPLICATION_ID_CONFIG] = "poc-at-least-once"
> props[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = 
> Serdes.String().javaClass.name
> props[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = 
> Serdes.String().javaClass.name
> props[StreamsConfig.PROCESSING_GUARANTEE_CONFIG] = StreamsConfig.AT_LEAST_ONCE
> // Large producer batch size to induce MESSAGE_TOO_LARGE
> props[ProducerConfig.LINGER_MS_CONFIG] = "300000"
> props[ProducerConfig.BATCH_SIZE_CONFIG] = "33554432"
> // Custom handler registered (never triggered)
> props[StreamsConfig.PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG] = 
> "poc.MyProductionExceptionHandler"
> val stream = streamsBuilder.stream<String, String>("input.topic")
> stream.peek \{ key, value -> println("$key:$value") }
>       .to("output.topic")* 
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to