[ https://issues.apache.org/jira/browse/KAFKA-19479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Mihai Lucian updated KAFKA-19479:
---------------------------------
    Summary: at_least_once mode in Kafka Streams silently drops messages when the producer fails with MESSAGE_TOO_LARGE, violating delivery guarantees  (was: at_least_once mode in Kafka Streams silently drops messages when the producer fails with MESSAGE_TOO_LARGE, violating delivery guarantees.)

> at_least_once mode in Kafka Streams silently drops messages when the producer
> fails with MESSAGE_TOO_LARGE, violating delivery guarantees
> -----------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-19479
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19479
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 4.0.0
>            Reporter: Mihai Lucian
>            Assignee: Mihai Lucian
>            Priority: Major
>
> *Description*
> There is a scenario in which Kafka Streams running with
> {{processing.guarantee=at_least_once}} does {*}not uphold its delivery
> guarantees{*}, resulting in *silent message loss*.
>
> *Reproduction Details*
> We run a simple Kafka Streams topology like the following:
>
> {code:java}
> val props = Properties()
> props[StreamsConfig.APPLICATION_ID_CONFIG] = "poc-at-least-once"
> props[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9092"
> props[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = Serdes.String().javaClass.name
> props[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = Serdes.String().javaClass.name
> props[StreamsConfig.PROCESSING_GUARANTEE_CONFIG] = StreamsConfig.AT_LEAST_ONCE
> // Large producer linger and batch size to induce MESSAGE_TOO_LARGE
> props[ProducerConfig.LINGER_MS_CONFIG] = "300000"
> props[ProducerConfig.BATCH_SIZE_CONFIG] = "33554432"
> /**
>  * A custom ProductionExceptionHandler is registered to demonstrate that it is
>  * not triggered in this scenario. In fact, neither the ProductionExceptionHandler
>  * nor the StreamsUncaughtExceptionHandler is invoked during this failure.
>  */
> props[StreamsConfig.PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG] = "poc.MyProductionExceptionHandler"
>
> val streamsBuilder = StreamsBuilder()
> val stream = streamsBuilder.stream<String, String>("input.topic")
> stream.peek { key, value -> println("$key:$value") }
>     .to("output.topic")
>
> KafkaStreams(streamsBuilder.build(), props).start()
> {code}
> (A sketch of such a handler is included at the end of this report.)
>
> *What we observe:*
>  * Records from {{input.topic}} are consumed and buffered on the producer side
>  * After some time (likely driven by {{commit.interval.ms}}), the *consumer
> offsets are committed*
>  * Sending the buffered records to the Kafka broker fails with
> {{MESSAGE_TOO_LARGE}}
>  * As a result, the application {*}commits offsets without actually producing
> the records{*}, which leads to *silent message loss*
>
> *Steps to Reproduce*
>  # Generate ~50,000 records (sized similarly to the sample project) in
> {{input.topic}} (see the producer sketch at the end of this report)
>  # Start the topology with the configuration above
>  # Wait for all messages to be consumed (but not yet flushed to the output
> topic)
>  # Observe:
>  ** Offsets are committed
>  ** Output topic receives no messages (this can be verified with the offset
> check at the end of this report)
>  ** Log shows the repeated {{MESSAGE_TOO_LARGE}} error:
> {code:java}
> 11:50:30.695 [kafka-producer-network-thread |
> kstreams-poc-v1-37858c2e-7584-4489-8081-0111f710c431-StreamThread-1-producer]
> WARN o.a.k.c.producer.internals.Sender - [Producer
> clientId=kstreams-poc-v1-37858c2e-7584-4489-8081-0111f710c431-StreamThread-1-producer]
> Got error produce response in correlation id 255 on topic-partition
> output.topic-0, splitting and retrying (2147483647 attempts left). Error:
> MESSAGE_TOO_LARGE {code}
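>
> For reference, a minimal sketch of what the registered handler can look like
> (the actual class in the sample project may differ; the logging here is
> illustrative, and this uses the record/exception {{handle}} overload, which is
> deprecated but still present in 4.0):
> {code:java}
> package poc
>
> import org.apache.kafka.clients.producer.ProducerRecord
> import org.apache.kafka.streams.errors.ProductionExceptionHandler
> import org.apache.kafka.streams.errors.ProductionExceptionHandler.ProductionExceptionHandlerResponse
>
> class MyProductionExceptionHandler : ProductionExceptionHandler {
>     // Never reached in this scenario: the producer keeps splitting and
>     // retrying the oversized batch internally instead of surfacing the error.
>     override fun handle(
>         record: ProducerRecord<ByteArray, ByteArray>,
>         exception: Exception
>     ): ProductionExceptionHandlerResponse {
>         println("ProductionExceptionHandler invoked: ${exception.message}")
>         return ProductionExceptionHandlerResponse.FAIL
>     }
>
>     override fun configure(configs: MutableMap<String, *>) {}
> }
> {code}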
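>
> A minimal sketch of a generator for step 1 of the reproduction (record count
> matches the description; the per-record payload size and key format are
> illustrative, not the exact sample-project values):
> {code:java}
> import java.util.Properties
> import org.apache.kafka.clients.producer.KafkaProducer
> import org.apache.kafka.clients.producer.ProducerConfig
> import org.apache.kafka.clients.producer.ProducerRecord
> import org.apache.kafka.common.serialization.StringSerializer
>
> fun main() {
>     val props = Properties()
>     props[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9092"
>     props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
>     props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
>
>     KafkaProducer<String, String>(props).use { producer ->
>         // ~1 KiB per record; adjust to match the sample project's sizing
>         val payload = "x".repeat(1024)
>         repeat(50_000) { i ->
>             producer.send(ProducerRecord("input.topic", "key-$i", payload))
>         }
>         producer.flush()
>     }
> }
> {code}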
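>
> To confirm the loss independently of the application logs, one can compare the
> committed offsets of the Streams consumer group (the group id equals the
> {{application.id}}, here {{poc-at-least-once}}) with the end offsets of
> {{output.topic}}. A sketch using the admin client, assuming a single-partition
> output topic and a broker on {{localhost:9092}}:
> {code:java}
> import java.util.Properties
> import org.apache.kafka.clients.admin.Admin
> import org.apache.kafka.clients.admin.AdminClientConfig
> import org.apache.kafka.clients.admin.OffsetSpec
> import org.apache.kafka.common.TopicPartition
>
> fun main() {
>     val props = Properties()
>     props[AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9092"
>
>     Admin.create(props).use { admin ->
>         // Committed input offsets: these advance even though nothing was produced
>         val committed = admin.listConsumerGroupOffsets("poc-at-least-once")
>             .partitionsToOffsetAndMetadata().get()
>         committed.forEach { (tp, offset) -> println("committed $tp -> ${offset.offset()}") }
>
>         // End offset of the output topic: stays at 0 despite the committed input offsets
>         val outputTp = TopicPartition("output.topic", 0)
>         val endOffsets = admin.listOffsets(mapOf(outputTp to OffsetSpec.latest())).all().get()
>         println("output end offset: ${endOffsets.getValue(outputTp).offset()}")
>     }
> }
> {code}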