[ https://issues.apache.org/jira/browse/KAFKA-19479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Mihai Lucian updated KAFKA-19479: --------------------------------- Description: *Description* It appears there is a scenario where Kafka Streams running with {{processing.guarantee=at_least_once}} does {*}not uphold its delivery guarantees{*}, resulting in *message loss.* *Reproduction Details* We run a simple Kafka Streams topology like the following: {code:java} {code} *props[StreamsConfig.APPLICATION_ID_CONFIG] = "poc-at-least-once" props[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = Serdes.String().javaClass.name props[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = Serdes.String().javaClass.name props[StreamsConfig.PROCESSING_GUARANTEE_CONFIG] = StreamsConfig.AT_LEAST_ONCE // Large producer batch size to induce MESSAGE_TOO_LARGE props[ProducerConfig.LINGER_MS_CONFIG] = "300000" props[ProducerConfig.BATCH_SIZE_CONFIG] = "33554432" // Custom handler registered (never triggered) props[StreamsConfig.PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG] = "poc.MyProductionExceptionHandler" val stream = streamsBuilder.stream<String, String>("input.topic") stream.peek \{ key, value -> println("$key:$value") } .to("output.topic")* was: *Description* It appears there is a scenario where Kafka Streams running with {{processing.guarantee=at_least_once}} does {*}not uphold its delivery guarantees{*}, resulting in *message loss.* *Reproduction Details* *We run a simple Kafka Streams topology like the following:* *kotlin```* props[StreamsConfig.APPLICATION_ID_CONFIG] = "poc-at-least-once" props[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = Serdes.String().javaClass.name props[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = Serdes.String().javaClass.name props[StreamsConfig.PROCESSING_GUARANTEE_CONFIG] = StreamsConfig.AT_LEAST_ONCE // Large producer batch size to induce MESSAGE_TOO_LARGE props[ProducerConfig.LINGER_MS_CONFIG] = "300000" props[ProducerConfig.BATCH_SIZE_CONFIG] = "33554432" // Custom handler registered (never triggered) props[StreamsConfig.PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG] = "poc.MyProductionExceptionHandler" val stream = streamsBuilder.stream<String, String>("input.topic") stream.peek \{ key, value -> println("$key:$value") } .to("output.topic") ``` > at_least_once mode in Kafka Streams silently drops messages when the producer > fails with MESSAGE_TOO_LARGE, violating delivery guarantees. > ------------------------------------------------------------------------------------------------------------------------------------------ > > Key: KAFKA-19479 > URL: https://issues.apache.org/jira/browse/KAFKA-19479 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 4.0.0 > Reporter: Mihai Lucian > Assignee: Mihai Lucian > Priority: Major > > *Description* > It appears there is a scenario where Kafka Streams running with > {{processing.guarantee=at_least_once}} does {*}not uphold its delivery > guarantees{*}, resulting in *message loss.* > > *Reproduction Details* > We run a simple Kafka Streams topology like the following: > > > {code:java} > {code} > *props[StreamsConfig.APPLICATION_ID_CONFIG] = "poc-at-least-once" > props[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = > Serdes.String().javaClass.name > props[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = > Serdes.String().javaClass.name > props[StreamsConfig.PROCESSING_GUARANTEE_CONFIG] = StreamsConfig.AT_LEAST_ONCE > // Large producer batch size to induce MESSAGE_TOO_LARGE > props[ProducerConfig.LINGER_MS_CONFIG] = "300000" > props[ProducerConfig.BATCH_SIZE_CONFIG] = "33554432" > // Custom handler registered (never triggered) > props[StreamsConfig.PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG] = > "poc.MyProductionExceptionHandler" > val stream = streamsBuilder.stream<String, String>("input.topic") > stream.peek \{ key, value -> println("$key:$value") } > .to("output.topic")* > > -- This message was sent by Atlassian Jira (v8.20.10#820010)