[ https://issues.apache.org/jira/browse/KAFKA-19479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Mihai Lucian updated KAFKA-19479:
---------------------------------
Description:
*Description*
It appears there is a scenario where Kafka Streams running with {{processing.guarantee=at_least_once}} does *not uphold its delivery guarantees*, resulting in *message loss*.

*Reproduction Details*
We run a simple Kafka Streams topology like the following:
{code:java}
props[StreamsConfig.APPLICATION_ID_CONFIG] = "poc-at-least-once"
props[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = Serdes.String().javaClass.name
props[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = Serdes.String().javaClass.name
props[StreamsConfig.PROCESSING_GUARANTEE_CONFIG] = StreamsConfig.AT_LEAST_ONCE

// Large linger and producer batch size to induce MESSAGE_TOO_LARGE
props[ProducerConfig.LINGER_MS_CONFIG] = "300000"
props[ProducerConfig.BATCH_SIZE_CONFIG] = "33554432"

/**
 * A custom ProductionExceptionHandler is registered to demonstrate that it is not triggered in this scenario.
 * In fact, neither the ProductionExceptionHandler nor the StreamsUncaughtExceptionHandler is invoked during this failure
 * (a minimal sketch of such a handler is included below for reference).
 */
props[StreamsConfig.PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG] = "poc.MyProductionExceptionHandler"

val stream = streamsBuilder.stream<String, String>("input.topic")
stream.peek { key, value -> println("$key:$value") }
    .to("output.topic")
{code}
*What we observe:*
* Records from {{input.topic}} are consumed and buffered on the producer side
* After some time (likely driven by {{commit.interval.ms}}), the *consumer offsets are committed*
* Sending the records to the Kafka broker fails with {{MESSAGE_TOO_LARGE}}
* As a result, the application *commits offsets without actually producing the records*, which leads to *silent message loss*

*Steps to Reproduce*
# Generate ~50,000 records (sized similarly to the sample project) in {{input.topic}}
# Start the topology with the configuration above
# Wait for all messages to be consumed (but not yet flushed to the output topic)
# Observe:
** Offsets are committed
** The output topic receives no messages
** The log shows the {{MESSAGE_TOO_LARGE}} error being retried repeatedly:
{code:java}
11:50:30.695 [kafka-producer-network-thread | kstreams-poc-v1-37858c2e-7584-4489-8081-0111f710c431-StreamThread-1-producer] WARN o.a.k.c.producer.internals.Sender - [Producer clientId=kstreams-poc-v1-37858c2e-7584-4489-8081-0111f710c431-StreamThread-1-producer] Got error produce response in correlation id 255 on topic-partition output.topic-0, splitting and retrying (2147483647 attempts left). Error: MESSAGE_TOO_LARGE
{code}

*Expected Behavior*
In {{at_least_once}} mode, Kafka Streams should *not commit offsets* unless records are *successfully produced*.
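The {{poc.MyProductionExceptionHandler}} referenced above is not included in this report; the following is only a minimal, illustrative sketch of what such a handler could look like, written against the 3.7 handler API (class name and logging behavior here are assumptions). It asks Streams to {{FAIL}}, yet it is never invoked in the scenario described:
{code:java}
package poc

import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.streams.errors.ProductionExceptionHandler
import org.apache.kafka.streams.errors.ProductionExceptionHandler.ProductionExceptionHandlerResponse

// Hypothetical sketch only; the real poc.MyProductionExceptionHandler in the PoC project may differ.
class MyProductionExceptionHandler : ProductionExceptionHandler {

    // Called by Kafka Streams when an async send fails with an unrecoverable error.
    // In this reproduction it is never called, even though records are never delivered.
    override fun handle(
        record: ProducerRecord<ByteArray, ByteArray>,
        exception: Exception
    ): ProductionExceptionHandlerResponse {
        println("Production error on ${record.topic()}: ${exception.message}")
        return ProductionExceptionHandlerResponse.FAIL
    }

    override fun configure(configs: MutableMap<String, *>) {
        // no configuration needed for this PoC
    }
}
{code}
For context on the reproduction settings: {{batch.size}} = 33554432 bytes is 32 MiB, far above the broker-side {{message.max.bytes}} default of roughly 1 MiB, and {{linger.ms}} = 300000 holds records for five minutes, so the producer builds batches the broker rejects with {{MESSAGE_TOO_LARGE}}. The producer then keeps splitting and retrying (2147483647 attempts left in the log) instead of surfacing a send error, which appears to be why neither handler ever fires while offsets are still committed.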
Environment: Micronaut 4.5.4
Java 21
Kotlin 1.9.3
Kafka clients/streams: Apache Kafka 3.7.0, 4.0.0
Confluent: 7.9.2-ccs, 8.0.0-ccs
Kafka running in Docker (local test environment)

> at_least_once mode in Kafka Streams silently drops messages when the producer
> fails with MESSAGE_TOO_LARGE, violating delivery guarantees
> -----------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-19479
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19479
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 4.0.0
>         Environment: Micronaut 4.5.4
> Java 21
> Kotlin 1.9.3
> Kafka clients/streams: Apache Kafka 3.7.0, 4.0.0
> Confluent: 7.9.2-ccs, 8.0.0-ccs
> Kafka running in Docker (local test environment)
>            Reporter: Mihai Lucian
>            Assignee: Mihai Lucian
>            Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)