[ 
https://issues.apache.org/jira/browse/KAFKA-19479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mihai Lucian updated KAFKA-19479:
---------------------------------
    Description: 
*Description*

It appears there is a scenario where Kafka Streams running with 
{{processing.guarantee=at_least_once}} does {*}not uphold its delivery 
guarantees{*}, resulting in *message loss.*

 

*Reproduction Details*

We run a simple Kafka Streams topology like the following:

 

 
{code:java}
props[StreamsConfig.APPLICATION_ID_CONFIG] = "poc-at-least-once"
props[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = 
Serdes.String().javaClass.name
props[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = 
Serdes.String().javaClass.name
props[StreamsConfig.PROCESSING_GUARANTEE_CONFIG] = StreamsConfig.AT_LEAST_ONCE
// Large producer batch size to induce MESSAGE_TOO_LARGE
props[ProducerConfig.LINGER_MS_CONFIG] = "300000"
props[ProducerConfig.BATCH_SIZE_CONFIG] = "33554432"
/*
 * A custom ProductionExceptionHandler is registered to demonstrate that it is
 * not triggered in this scenario. In fact, neither the ProductionExceptionHandler
 * nor the StreamsUncaughtExceptionHandler is invoked during this failure.
 */
props[StreamsConfig.PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG] =
    "poc.MyProductionExceptionHandler"

val stream = streamsBuilder.stream<String, String>("input.topic")
stream.peek { key, value -> println("$key:$value") }
      .to("output.topic")
 {code}
 

*What we observe:*
 * Records from {{input.topic}} are consumed and buffered at producer side

 * After some time (likely based on {{commit.interval.ms}}), the *consumer 
offset is committed*
 * A producer *flush* is triggered
 * Sending the records to the Kafka broker fails with {{MESSAGE_TOO_LARGE}}
 * As a result, the application {*}commits offsets without actually producing 
the records{*}, which leads to *silent message loss*

 

*Steps to Reproduce*
 # Generate ~50,000 records (sized similarly to the sample project) in 
{{input.topic}} to induce {{MESSAGE_TOO_LARGE}}
 # Start the topology with the configuration above
 # Wait for all messages to be consumed
 # Observe:
 ** Offsets are committed
 ** Output topic receives no messages
 ** Log shows repeated {{MESSAGE_TOO_LARGE}} error:

{code:java}
11:50:30.695 [kafka-producer-network-thread | 
kstreams-poc-v1-37858c2e-7584-4489-8081-0111f710c431-StreamThread-1-producer] 
WARN  o.a.k.c.producer.internals.Sender - [Producer 
clientId=kstreams-poc-v1-37858c2e-7584-4489-8081-0111f710c431-StreamThread-1-producer]
 Got error produce response in correlation id 255 on topic-partition 
output.topic-0, splitting and retrying (2147483647 attempts left). Error: 
MESSAGE_TOO_LARGE
{code}
 

*Expected Behavior*

In {{at_least_once}} mode, Kafka Streams should *not commit offsets* unless 
records are {*}successfully produced{*}. 
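
To make the expected ordering concrete, here is a minimal sketch in plain Java with no Kafka dependency. It is a toy model, not Kafka Streams internals: {{BufferingProducer}}, {{processAndCommit}} and the byte limit are hypothetical names invented for illustration. The only point it shows is that the committed offset should advance only after a flush has been confirmed as successful.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Toy model of "flush, verify, then commit". Not the Kafka Streams implementation. */
public class AtLeastOnceCommitSketch {

    /** Hypothetical stand-in for a producer that buffers records until flush(). */
    static final class BufferingProducer {
        private final int maxBatchBytes;
        private final List<String> buffer = new ArrayList<>();
        private final List<String> delivered = new ArrayList<>();

        BufferingProducer(int maxBatchBytes) {
            this.maxBatchBytes = maxBatchBytes;
        }

        void send(String record) {
            buffer.add(record);
        }

        /** Returns true only if the whole buffered batch was accepted. */
        boolean flush() {
            int bytes = 0;
            for (String r : buffer) {
                bytes += r.getBytes(StandardCharsets.UTF_8).length;
            }
            if (bytes > maxBatchBytes) {
                // Models a MESSAGE_TOO_LARGE rejection: nothing is delivered.
                return false;
            }
            delivered.addAll(buffer);
            buffer.clear();
            return true;
        }

        List<String> delivered() {
            return delivered;
        }
    }

    /** Returns the offset that may be committed after processing the input. */
    static long processAndCommit(List<String> input, long committedOffset,
                                 BufferingProducer producer) {
        for (String record : input) {
            producer.send(record);
        }
        // Advance the offset only when the flush is known to have succeeded.
        boolean flushed = producer.flush();
        return flushed ? committedOffset + input.size() : committedOffset;
    }

    public static void main(String[] args) {
        List<String> input = List.of("aaaa", "bbbb"); // 8 bytes in total

        long afterFailure = processAndCommit(input, 0L, new BufferingProducer(4));
        System.out.println("too large: committed offset = " + afterFailure); // 0

        long afterSuccess = processAndCommit(input, 0L, new BufferingProducer(1024));
        System.out.println("fits: committed offset = " + afterSuccess); // 2
    }
}
```

In this model the behavior reported above would correspond to returning {{committedOffset + input.size()}} regardless of the flush result.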

 

*Attached*
 * configs for stream, producer, consumer
 * sample project used to replicate the issue

  was:
*Description*

It appears there is a scenario where Kafka Streams running with 
{{processing.guarantee=at_least_once}} does {*}not uphold its delivery 
guarantees{*}, resulting in *message loss.*

 

*Reproduction Details*

We run a simple Kafka Streams topology like the following:

 

 
{code:java}
props[StreamsConfig.APPLICATION_ID_CONFIG] = "poc-at-least-once"
props[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = 
Serdes.String().javaClass.name
props[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = 
Serdes.String().javaClass.name
props[StreamsConfig.PROCESSING_GUARANTEE_CONFIG] = StreamsConfig.AT_LEAST_ONCE
// Large producer batch size to induce MESSAGE_TOO_LARGE
props[ProducerConfig.LINGER_MS_CONFIG] = "300000"
props[ProducerConfig.BATCH_SIZE_CONFIG] = "33554432"
/*
 * A custom ProductionExceptionHandler is registered to demonstrate that it is
 * not triggered in this scenario. In fact, neither the ProductionExceptionHandler
 * nor the StreamsUncaughtExceptionHandler is invoked during this failure.
 */
props[StreamsConfig.PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG] =
    "poc.MyProductionExceptionHandler"

val stream = streamsBuilder.stream<String, String>("input.topic")
stream.peek { key, value -> println("$key:$value") }
      .to("output.topic")
 {code}
 

*What we observe:*
 * Records from {{input.topic}} are consumed and buffered at producer side

 * After some time (likely based on {{commit.interval.ms}}), the *consumer 
offset is committed*
 * A producer *flush* is triggered
 * Sending the records to the Kafka broker fails with {{MESSAGE_TOO_LARGE}}
 * As a result, the application {*}commits offsets without actually producing 
the records{*}, which leads to *silent message loss*

 

*Steps to Reproduce*
 # Generate ~50,000 records (sized similarly to the sample project) in 
{{input.topic}}

 # Start the topology with the configuration above

 # Wait for all messages to be consumed (but not yet flushed to the output 
topic)

 # Observe:
 ** Offsets are committed
 ** Output topic receives no messages
 ** Log shows repeated {{MESSAGE_TOO_LARGE}} error:

{code:java}
11:50:30.695 [kafka-producer-network-thread | 
kstreams-poc-v1-37858c2e-7584-4489-8081-0111f710c431-StreamThread-1-producer] 
WARN  o.a.k.c.producer.internals.Sender - [Producer 
clientId=kstreams-poc-v1-37858c2e-7584-4489-8081-0111f710c431-StreamThread-1-producer]
 Got error produce response in correlation id 255 on topic-partition 
output.topic-0, splitting and retrying (2147483647 attempts left). Error: 
MESSAGE_TOO_LARGE
{code}
 

*Expected Behavior*

In {{at_least_once}} mode, Kafka Streams should *not commit offsets* unless 
records are {*}successfully produced{*}. 

 

*Attached*
 * configs for stream, producer, consumer
 * sample project used to replicate the issue


> at_least_once mode in Kafka Streams silently drops messages when the producer 
> fails with MESSAGE_TOO_LARGE, violating delivery guarantees
> -----------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-19479
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19479
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 4.0.0
>         Environment: Micronaut 4.5.4
> Java 21
> Kotlin 1.9.3
> Kafka clients/streams:
> Apache Kafka 3.7.0, 4.0.0
> Confluent: 7.9.2-ccs, 8.0.0-ccs
> Kafka running in Docker (local test environment)
>            Reporter: Mihai Lucian
>            Assignee: Mihai Lucian
>            Priority: Major
>         Attachments: poc-kafka-streams-al-least-once-proj.zip, 
> stream-configs.txt
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
