[ https://issues.apache.org/jira/browse/KAFKA-19479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18008811#comment-18008811 ]
Shashank edited comment on KAFKA-19479 at 7/21/25 9:27 PM:
-----------------------------------------------------------

Hi [~mjsax], I would like to start working on this issue. I have reviewed it and was able to reproduce the behaviour locally with the attached PoC. My initial plan is to create an integration test that reproduces the issue within Streams and fails accordingly. I don't yet have a fix in mind, but do you think it would be wise for me to start by writing the failing integration test and then opening a PR with it? That way, you can review it and confirm whether this is indeed a bug. Would opening such a PR affect any existing workflows, or is that approach okay? Once we align on that and confirm that this is a bug, I can proceed with the investigation and work on a possible solution.

> at_least_once mode in Kafka Streams silently drops messages when the producer fails with MESSAGE_TOO_LARGE, violating delivery guarantees
> -----------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-19479
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19479
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 4.0.0
>         Environment: Micronaut 4.5.4
> Java 21
> Kotlin 1.9.3
> Kafka clients/streams:
> Apache Kafka 3.7.0, 4.0.0
> Confluent: 7.9.2-ccs, 8.0.0-ccs
> Kafka running in Docker (local test environment)
>            Reporter: Mihai Lucian
>            Assignee: Shashank
>            Priority: Critical
>         Attachments: poc-kafka-streams-al-least-once-proj.zip, stream-configs.txt
>
>
> *Description*
> It appears there is a scenario where Kafka Streams running with {{processing.guarantee=at_least_once}} does *not uphold its delivery guarantees*, resulting in *message loss*.
>
> *Reproduction Details*
> We run a simple Kafka Streams topology like the following:
> {code:java}
> props[StreamsConfig.APPLICATION_ID_CONFIG] = "poc-at-least-once"
> props[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = Serdes.String().javaClass.name
> props[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = Serdes.String().javaClass.name
> props[StreamsConfig.PROCESSING_GUARANTEE_CONFIG] = StreamsConfig.AT_LEAST_ONCE
> // Large producer batch size to induce MESSAGE_TOO_LARGE
> props[ProducerConfig.LINGER_MS_CONFIG] = "300000"
> props[ProducerConfig.BATCH_SIZE_CONFIG] = "33554432"
> /**
>  * A custom ProductionExceptionHandler is registered to demonstrate that it is not triggered in this scenario.
>  * In fact, neither the ProductionExceptionHandler nor the StreamsUncaughtExceptionHandler is invoked during this failure.
>  */
> props[StreamsConfig.PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG] = "poc.MyProductionExceptionHandler"
> val stream = streamsBuilder.stream<String, String>("input.topic")
> stream.peek { key, value -> println("$key:$value") }
>     .to("output.topic")
> {code}
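The handler class wired in above is not included in the ticket itself; purely for illustration, a minimal sketch of what a {{poc.MyProductionExceptionHandler}} could look like, assuming the classic two-argument {{handle}} signature of {{org.apache.kafka.streams.errors.ProductionExceptionHandler}} and a body that only logs and returns FAIL:

{code:java}
package poc

import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.streams.errors.ProductionExceptionHandler
import org.apache.kafka.streams.errors.ProductionExceptionHandler.ProductionExceptionHandlerResponse

// Hypothetical handler matching the class name referenced in the config above:
// it logs the failed record and returns FAIL so that Streams would stop
// processing rather than drop the record.
class MyProductionExceptionHandler : ProductionExceptionHandler {

    override fun handle(
        record: ProducerRecord<ByteArray, ByteArray>,
        exception: Exception
    ): ProductionExceptionHandlerResponse {
        println("Failed to produce to ${record.topic()}: $exception")
        return ProductionExceptionHandlerResponse.FAIL
    }

    override fun configure(configs: MutableMap<String, *>?) {
        // no configuration needed for this sketch
    }
}
{code}

As the reporter notes, this handler is never invoked in the MESSAGE_TOO_LARGE scenario, and neither is the StreamsUncaughtExceptionHandler.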
> *What we observe:*
> * Records from {{input.topic}} are consumed and buffered on the producer side
> * After some time (likely driven by {{commit.interval.ms}}), the *consumer offset is committed*
> * A *flush* of the buffered producer records is triggered
> * Sending the records to the Kafka broker fails with {{MESSAGE_TOO_LARGE}}
> * As a result, the application *commits offsets without actually producing the records*, which leads to *silent message loss*
>
> *Steps to Reproduce*
> # Generate ~50,000 records (sized similarly to the sample project) in {{input.topic}} to induce {{MESSAGE_TOO_LARGE}}
> # Start the topology with the configuration above
> # Wait for all messages to be consumed
> # Observe:
> ** Offsets are committed
> ** Output topic receives no messages
> ** Log shows the repeated {{MESSAGE_TOO_LARGE}} error:
> {code:java}
> 11:50:30.695 [kafka-producer-network-thread | kstreams-poc-v1-37858c2e-7584-4489-8081-0111f710c431-StreamThread-1-producer] WARN o.a.k.c.producer.internals.Sender - [Producer clientId=kstreams-poc-v1-37858c2e-7584-4489-8081-0111f710c431-StreamThread-1-producer] Got error produce response in correlation id 255 on topic-partition output.topic-0, splitting and retrying (2147483647 attempts left). Error: MESSAGE_TOO_LARGE
> {code}
>
> *Reproduced* with:
> * kafka-client-3.7.0, kafka-streams-3.7.0
> * kafka-client-4.0.0, kafka-streams-4.0.0
> * kafka-client-7.9.2-ccs, kafka-streams-7.9.2-ccs
> * kafka-client-8.0.0-ccs, kafka-streams-8.0.0-ccs
>
> *Expected Behavior*
> In {{at_least_once}} mode, Kafka Streams should *not commit offsets* unless the corresponding records have been *successfully produced*.
>
> *Attached*
> * configs for stream, producer, consumer
> * sample project used to replicate the issue
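To double-check the symptom described above (committed input offsets alongside an empty {{output.topic}}), one option is a small Admin-client probe; a minimal sketch, assuming the {{application.id}} from the configuration above (Streams uses it as the consumer group id), a single-partition {{output.topic}}, and a broker on {{localhost:9092}}:

{code:java}
import java.util.Properties
import org.apache.kafka.clients.admin.Admin
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.admin.OffsetSpec
import org.apache.kafka.common.TopicPartition

fun main() {
    val props = Properties().apply {
        put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // assumed local broker
    }
    Admin.create(props).use { admin ->
        // Offsets the Streams app has committed on input.topic
        // (the consumer group id equals the application.id).
        val committed = admin
            .listConsumerGroupOffsets("poc-at-least-once")
            .partitionsToOffsetAndMetadata()
            .get()
        committed.forEach { (tp, meta) -> println("committed: $tp -> ${meta.offset()}") }

        // End offset of output.topic-0: it stays at 0 if nothing was ever produced,
        // even though the committed input offsets above have advanced.
        val outputTp = TopicPartition("output.topic", 0)
        val endOffset = admin
            .listOffsets(mapOf(outputTp to OffsetSpec.latest()))
            .partitionResult(outputTp)
            .get()
            .offset()
        println("end offset: $outputTp -> $endOffset")
    }
}
{code}

If the committed input offsets have advanced while the output end offset stays at 0, the records were acknowledged as consumed without ever being produced, which is the loss described in this ticket.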