[
https://issues.apache.org/jira/browse/KAFKA-16700?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Matthias J. Sax resolved KAFKA-16700.
-------------------------------------
Resolution: Fixed
> Kafka Streams: possible message loss on KTable-KTable FK Left Join
> ------------------------------------------------------------------
>
> Key: KAFKA-16700
> URL: https://issues.apache.org/jira/browse/KAFKA-16700
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.7.0
> Environment: Apache Kafka 3.7.0 cluster in KRaft mode, 3 brokers and
> 3 controllers, on managed Kubernetes 1.28.7, operated by Strimzi Kafka
> Operators
> Reporter: Karsten Stöckmann
> Priority: Major
> Labels: dsl, joins, streams
>
> We are experiencing significant, yet intermittent, non-deterministic, and so
> far unexplained message loss on a Kafka Streams topology while performing a
> *KTable-KTable FK Left Join*.
> Assume the following snippet:
> {code:java}
> streamsBuilder
>     .table(
>         folderTopicName,
>         Consumed.with(
>             folderKeySerde,
>             folderSerde))
>     .leftJoin(
>         agencies, // KTable<AgencyId, AggregateAgency>
>         Folder::agencyIdValue,
>         AggregateFolder::new,
>         TableJoined.as("folder-to-agency"),
>         Materialized
>             .as("folder-to-agency-materialized")
>             .withKeySerde(folderKeySerde)
>             .withValueSerde(aggregateFolderSerde))
>     .leftJoin(
>         documents,
> {code}
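> For illustration only, here is a self-contained sketch of the same
> KTable-KTable FK Left Join pattern with plain String types; the topic names,
> foreign-key extractor, and joiner are hypothetical and not taken from our
> application, and the sketch only builds and prints the topology rather than
> running it against a broker:
> {code:java}
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.common.utils.Bytes;
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.kstream.Consumed;
> import org.apache.kafka.streams.kstream.KTable;
> import org.apache.kafka.streams.kstream.Materialized;
> import org.apache.kafka.streams.kstream.TableJoined;
> import org.apache.kafka.streams.state.KeyValueStore;
>
> public class FkLeftJoinSketch {
>
>     public static void main(String[] args) {
>         StreamsBuilder builder = new StreamsBuilder();
>
>         // Right-hand table, keyed by the foreign key (agency id).
>         KTable<String, String> agencies = builder.table(
>                 "agency-topic",
>                 Consumed.with(Serdes.String(), Serdes.String()));
>
>         // Left-hand table; its value carries the foreign key as "agencyId|payload".
>         KTable<String, String> folders = builder.table(
>                 "folder-topic",
>                 Consumed.with(Serdes.String(), Serdes.String()));
>
>         // FK left join: every left-hand record is expected to produce a result,
>         // with a null right-hand value when no agency matches.
>         KTable<String, String> joined = folders.leftJoin(
>                 agencies,
>                 folderValue -> folderValue.split("\\|", 2)[0], // foreign-key extractor
>                 (folder, agency) -> folder + " -> " + agency,  // agency may be null
>                 TableJoined.as("folder-to-agency"),
>                 Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(
>                                 "folder-to-agency-materialized")
>                         .withKeySerde(Serdes.String())
>                         .withValueSerde(Serdes.String()));
>
>         joined.toStream().to("folder-with-agency-topic");
>
>         // Print the topology, including the internal subscription topics
>         // created by the FK join.
>         System.out.println(builder.build().describe());
>     }
> }
> {code}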
> The setup is as follows:
> A Debezium Connector for PostgreSQL streams database changes into various
> Kafka topics. A series of Quarkus Kafka Streams applications then performs
> aggregation operations on those topics to create index documents that are
> later sent to an OpenSearch system.
> When the Kafka Streams infrastructure is first started against initially
> populated Kafka topics (i.e. a snapshot of all relevant table data has already
> been streamed to Kafka), the KTable-KTable FK Left Join shown above appears to
> lose messages on the first of a series of FK Left Joins; the right-hand
> {{KTable<AgencyId, AggregateAgency>}} is consumed from an aggregated topic fed
> by another Kafka Streams topology / application.
> On a (heavily reduced) test data set of 6828 messages in the
> {{folderTopicName}} topic, we observe the following per-topic message counts
> (a sketch of how such counts can be obtained follows the list):
> * {{folder-to-agency-subscription-registration}}: *6967* messages
> * {{folder-to-agency-subscription-response}}: *3048* messages
> * {{folder-to-agency-subscription-store-changelog}}: *6359* messages
> * {{folder-to-agency-materialized-changelog}}: *4644* messages
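> For reference, such per-topic counts can be approximated by summing end
> offsets minus beginning offsets per partition; under {{exactly_once_v2}} and
> log compaction these offsets are only an upper bound on the record count. A
> minimal Java sketch follows (the bootstrap server is a placeholder, and the
> abbreviated topic names above would need to be replaced by the full internal
> topic names, which carry the application id as a prefix):
> {code:java}
> import java.util.List;
> import java.util.Map;
> import java.util.Properties;
> import java.util.stream.Collectors;
>
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.TopicPartition;
> import org.apache.kafka.common.serialization.ByteArrayDeserializer;
>
> public class TopicMessageCounts {
>
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
>
>         List<String> topics = List.of(
>                 "folder-to-agency-subscription-registration",
>                 "folder-to-agency-subscription-response",
>                 "folder-to-agency-subscription-store-changelog",
>                 "folder-to-agency-materialized-changelog");
>
>         try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(
>                 props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
>             for (String topic : topics) {
>                 List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
>                         .map(p -> new TopicPartition(p.topic(), p.partition()))
>                         .collect(Collectors.toList());
>                 Map<TopicPartition, Long> begin = consumer.beginningOffsets(partitions);
>                 Map<TopicPartition, Long> end = consumer.endOffsets(partitions);
>                 long approxCount = partitions.stream()
>                         .mapToLong(tp -> end.get(tp) - begin.get(tp))
>                         .sum();
>                 // Offsets include transaction markers under EOS, so this is an upper bound.
>                 System.out.printf("%s: ~%d records%n", topic, approxCount);
>             }
>         }
>     }
> }
> {code}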
> Given the nature of a (FK) Left Join, I'd expect every message from the
> left-hand topic to produce an aggregate even if no matching message is found
> in the right-hand topic.
> The message loss varies unpredictably across tests and does not seem to be
> bound to specific keys or messages.
> It seems this can only be observed when the Streams infrastructure is first
> started to process the message 'backlog' that had been snapshotted by
> Debezium. A manual snapshot triggered later (i.e. with the Streams
> applications already running) does not seem to show this behaviour.
> Additionally, we have so far observed this kind of message loss only when
> running multiple replicas of the affected application; with only one replica,
> everything seems to work as expected. We've tried to leverage
> {{group.initial.rebalance.delay.ms}} in order to rule out possible rebalancing
> issues, but to no avail.
> Our Kafka configuration:
> {code:yaml}
> offsets.topic.replication.factor: 3
> transaction.state.log.replication.factor: 3
> transaction.state.log.min.isr: 2
> default.replication.factor: 3
> min.insync.replicas: 2
> message.max.bytes: "20971520"
> {code}
> Our Kafka Streams application configuration:
> {code:yaml}
> kafka-streams.num.stream.threads: 5
> kafka-streams.num.standby.replicas: 1
> kafka-streams.auto.offset.reset: earliest
> kafka-streams.cache.max.bytes.buffering: "20971520"
> kafka-streams.commit.interval.ms: 100
> kafka-streams.fetch.max.bytes: "10485760"
> kafka-streams.max.request.size: "10485760"
> kafka-streams.max.partition.fetch.bytes: "10485760"
> kafka-streams.metadata.max.age.ms: 300000
> kafka-streams.statestore.cache.max.bytes: "20971520"
> kafka-streams.topology.optimization: all
> kafka-streams.processing.guarantee: exactly_once_v2
> # Kafka Streams Intermediate Topics
> kafka-streams.topic.compression.type: lz4
> kafka-streams.topic.segment.ms: "43200000" # 12h
> kafka-streams.topic.max.compaction.lag.ms: "86400000" # 24h
> kafka-streams.topic.delete.retention.ms: "86400000" # 24h
> kafka-streams.producer.max.request.size: "20971520" # 20MiB
> kafka-streams.producer.transaction.timeout.ms: 100 # Should match commit.interval.ms, set close to 100ms for exactly_once_v2
> kafka-streams.consumer.group.instance.id: ${HOSTNAME}
> kafka-streams.consumer.heartbeat.interval.ms: 100
> kafka-streams.consumer.session.timeout.ms: 45000
> {code}
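> For readers less familiar with the Quarkus {{kafka-streams.*}} notation, the
> entries above map onto plain {{StreamsConfig}} properties roughly as in the
> following sketch (application id and bootstrap servers are placeholders, and
> only a representative subset of the settings is shown):
> {code:java}
> import java.util.Properties;
>
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.common.config.TopicConfig;
> import org.apache.kafka.streams.StreamsConfig;
>
> public class StreamsConfigSketch {
>
>     public static Properties streamsProperties() {
>         Properties props = new Properties();
>         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "folder-aggregator"); // placeholder
>         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
>         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 5);
>         props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
>         props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
>         // statestore.cache.max.bytes supersedes the deprecated cache.max.bytes.buffering.
>         props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 20_971_520L);
>         props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
>         props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
>
>         // Client- and topic-level overrides use the consumer./producer./topic. prefixes.
>         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
>         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 45_000);
>         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG), 100);
>         props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), 20_971_520);
>         props.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), 100);
>         props.put(StreamsConfig.topicPrefix(TopicConfig.COMPRESSION_TYPE_CONFIG), "lz4");
>         return props;
>     }
> }
> {code}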
> All input (and aggregate) topics feature 15 partitions and share this
> configuration:
> {code:yaml}
> cleanup.policy: compact
> compression.type: lz4
> segment.ms: "43200000" # 12h
> max.compaction.lag.ms: "86400000" # 24h
> delete.retention.ms: "86400000" # 24h
> {code}
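> For illustration, the same topic settings could also be applied
> programmatically via the Kafka Admin client; in the following sketch the topic
> name and bootstrap server are placeholders:
> {code:java}
> import java.util.List;
> import java.util.Map;
> import java.util.Properties;
>
> import org.apache.kafka.clients.admin.Admin;
> import org.apache.kafka.clients.admin.AdminClientConfig;
> import org.apache.kafka.clients.admin.NewTopic;
> import org.apache.kafka.common.config.TopicConfig;
>
> public class CreateInputTopicSketch {
>
>     public static void main(String[] args) throws Exception {
>         Properties props = new Properties();
>         props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
>
>         try (Admin admin = Admin.create(props)) {
>             // 15 partitions, replication factor 3, compacted, lz4, 12h segments, 24h lags.
>             NewTopic folderTopic = new NewTopic("folder-topic", 15, (short) 3) // hypothetical name
>                     .configs(Map.of(
>                             TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT,
>                             TopicConfig.COMPRESSION_TYPE_CONFIG, "lz4",
>                             TopicConfig.SEGMENT_MS_CONFIG, "43200000",
>                             TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG, "86400000",
>                             TopicConfig.DELETE_RETENTION_MS_CONFIG, "86400000"));
>             admin.createTopics(List.of(folderTopic)).all().get();
>         }
>     }
> }
> {code}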
> Logs show no indication of where or why this happens.
> The issue was discussed on the Kafka [mailing
> list|https://lists.apache.org/thread/l50pwmwhobt73db97n0r5v36mydo15rs] as
> well as on
> [StackOverflow|https://stackoverflow.com/questions/78210993/kafka-streams-topology-initially-dropping-messages-to-intermediate-topics],
> but neither thread led to a further explanation. In the end, it was suggested
> that I file a bug in the Kafka JIRA. I can't rule out that this is caused
> entirely by some setting in our Kafka environment, but there are other
> [indications|https://stackoverflow.com/questions/75886729/missing-records-in-kafka-streams-foreign-key-join]
> of similar message loss on FK Join operations. For the time being, I'd
> consider this a bug, perhaps emerging only under certain conditions.
> At the moment I have no test case to reproduce the issue locally.
> In case any additional information is needed, I'd be happy to provide it.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)