Karsten Stöckmann created KAFKA-16700:
-----------------------------------------

             Summary: Kafka Streams: possible message loss on KTable-KTable FK 
Left Join
                 Key: KAFKA-16700
                 URL: https://issues.apache.org/jira/browse/KAFKA-16700
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 3.7.0
         Environment: Apache Kafka 3.7.0 cluster in KRaft mode, 3 brokers and 3 
controllers, on managed Kubernetes 1.28.7, operated by Strimzi Kafka Operators
            Reporter: Karsten Stöckmann


We are experiencing significant, yet intermittent / non-deterministic / so far 
unexplained message loss on a Kafka Streams topology while performing a 
*KTable-KTable FK Left Join*.

Assume the following snippet:
{code:java}
streamsBuilder
    .table(
        folderTopicName,
        Consumed.with(
            folderKeySerde,
            folderSerde))
    .leftJoin(
        agencies, // KTable<AgencyId, AggregateAgency>
        Folder::agencyIdValue,
        AggregateFolder::new,
        TableJoined.as("folder-to-agency"),
        Materialized
            .as("folder-to-agency-materialized")
            .withKeySerde(folderKeySerde)
            .withValueSerde(aggregateFolderSerde))
    .leftJoin(
        documents, // further FK Left Joins of the series follow (omitted here)
{code}
The setup is as follows:

A Debezium Connector for PostgreSQL streams database changes into various Kafka 
topics. A series of Quarkus Kafka Streams applications then performs 
aggregation operations on those topics to create index documents that are 
later sent to an OpenSearch system.

When firing up the Kafka Streams infrastructure to work on initially populated 
Kafka topics (i.e. a snapshot of all relevant table data has already been 
streamed to Kafka), the KTable-KTable FK Left Join shown above seems to lose 
messages on the first of a series of FK Left Joins; the right-hand 
{{KTable<AgencyId, AggregateAgency>}} is consumed from an aggregated topic fed 
by another Kafka Streams topology / application.

On a (heavily reduced) test data set of 6828 messages in the 
{{folderTopicName}} Topic, we observe the following results:
 * {{folder-to-agency-subscription-registration}}: *6967* messages
 * {{folder-to-agency-subscription-response}}: *3048* messages
 * {{folder-to-agency-subscription-store-changelog}}: *6359* messages
 * {{folder-to-agency-materialized-changelog}}: *4644* messages
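
For reference, per-topic record counts like the ones above can be taken with a 
plain consumer. The following is only a rough, self-contained sketch (topic 
name, group id and bootstrap address are placeholders, not our actual tooling); 
{{read_committed}} is used because the topology runs with {{exactly_once_v2}}:
{code:java}
import java.time.Duration;
import java.util.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class TopicRecordCounter {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "topic-record-counter");    // placeholder
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Only count records of committed transactions (exactly_once_v2 writes transactionally).
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

        String topic = "folder-to-agency-subscription-response";
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(
                props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
            List<TopicPartition> partitions = new ArrayList<>();
            consumer.partitionsFor(topic)
                    .forEach(p -> partitions.add(new TopicPartition(topic, p.partition())));
            consumer.assign(partitions);
            consumer.seekToBeginning(partitions);
            // Capture the end offsets once, then consume until every partition has reached them.
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);

            long count = 0;
            while (partitions.stream().anyMatch(tp -> consumer.position(tp) < endOffsets.get(tp))) {
                count += consumer.poll(Duration.ofSeconds(1)).count();
            }
            System.out.println(topic + ": " + count + " records");
        }
    }
}
{code}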

Given the nature of a (FK) Left Join, I'd expect every message from the 
left-hand topic to produce an aggregate even if no matching message is 
found in the right-hand topic.
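
To illustrate that expectation in isolation, here is a minimal sketch (not our 
production topology; String serdes and made-up topic names) in which a 
{{TopologyTestDriver}} drives a KTable-KTable FK Left Join whose right-hand 
record is missing; the left-hand record should still produce a result with a 
{{null}} right-hand value:
{code:java}
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;

public class FkLeftJoinExpectation {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, String> folders = builder.table("folders",
                Consumed.with(Serdes.String(), Serdes.String()));
        KTable<String, String> agencies = builder.table("agencies",
                Consumed.with(Serdes.String(), Serdes.String()));

        folders
            .leftJoin(
                agencies,
                folderValue -> folderValue, // the value *is* the foreign key in this toy example
                (folder, agency) -> folder + "|" + agency,
                TableJoined.as("folder-to-agency"),
                Materialized.with(Serdes.String(), Serdes.String()))
            .toStream()
            .to("joined", Produced.with(Serdes.String(), Serdes.String()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "fk-left-join-expectation");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); // flush results immediately

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, String> folderInput = driver.createInputTopic(
                    "folders", Serdes.String().serializer(), Serdes.String().serializer());
            TestOutputTopic<String, String> joined = driver.createOutputTopic(
                    "joined", Serdes.String().deserializer(), Serdes.String().deserializer());

            // Nothing is ever written to "agencies", i.e. the join partner is missing.
            folderInput.pipeInput("folder-1", "agency-42");

            // Expected: one result per left-hand record, with a null right-hand side:
            // [KeyValue(folder-1, agency-42|null)]
            System.out.println(joined.readKeyValuesToList());
        }
    }
}
{code}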

The message loss varies unpredictably across test runs and does not seem to be 
bound to specific keys or messages.

So far, this can only be observed when initially firing up the Streams 
infrastructure to process the message 'backlog' that had been snapshotted by 
Debezium. A manual snapshot triggered later (i.e. with the Streams applications 
already running) does not seem to show this behaviour. Additionally, we have so 
far observed this kind of message loss only when running multiple replicas of 
the affected application; when carrying out the tests with only one replica, 
everything seems to work as expected. We tried to leverage 
{{group.initial.rebalance.delay.ms}} in order to rule out possible rebalancing 
issues, but to no avail.

Our Kafka configuration:
{code:yaml}
    offsets.topic.replication.factor: 3
    transaction.state.log.replication.factor: 3
    transaction.state.log.min.isr: 2
    default.replication.factor: 3
    min.insync.replicas: 2
    message.max.bytes: "20971520"
{code}
Our Kafka Streams application configuration:
{code:yaml}
    kafka-streams.num.stream.threads: 5
    kafka-streams.num.standby.replicas: 1
    kafka-streams.auto.offset.reset: earliest
    kafka-streams.cache.max.bytes.buffering: "20971520"
    kafka-streams.commit.interval.ms: 100
    kafka-streams.fetch.max.bytes: "10485760"
    kafka-streams.max.request.size: "10485760"
    kafka-streams.max.partition.fetch.bytes: "10485760"
    kafka-streams.metadata.max.age.ms: 300000
    kafka-streams.statestore.cache.max.bytes: "20971520"
    kafka-streams.topology.optimization: all
    kafka-streams.processing.guarantee: exactly_once_v2

    # Kafka Streams Intermediate Topics
    kafka-streams.topic.compression.type: lz4
    kafka-streams.topic.segment.ms: "43200000" # 12h
    kafka-streams.topic.max.compaction.lag.ms: "86400000" # 24h
    kafka-streams.topic.delete.retention.ms: "86400000" # 24h

    kafka-streams.producer.max.request.size: "20971520" # 20MiB
    kafka-streams.producer.transaction.timeout.ms: 100 # Should match commit.interval.ms, set close to 100ms for exactly_once_v2

    kafka-streams.consumer.group.instance.id: ${HOSTNAME}
    kafka-streams.consumer.heartbeat.interval.ms: 100
    kafka-streams.consumer.session.timeout.ms: 45000
{code}
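
For context: the {{kafka-streams.}} prefix is the Quarkus pass-through 
namespace, so the keys above (prefix stripped) are assumed to end up in the 
{{StreamsConfig}} handed to the application. Expressed as plain Java properties 
this would look roughly like the following sketch (only a subset shown; 
application id and bootstrap servers are placeholders):
{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsPropertiesSketch {
    static Properties streamsProperties() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "folder-aggregator"); // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");     // placeholder
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 5);
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
        props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 20971520L);
        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        // Client overrides use the producer./consumer. prefixes, as in the YAML above.
        props.put(StreamsConfig.producerPrefix("max.request.size"), 20971520);
        props.put(StreamsConfig.producerPrefix("transaction.timeout.ms"), 100);
        props.put(StreamsConfig.consumerPrefix("heartbeat.interval.ms"), 100);
        props.put(StreamsConfig.consumerPrefix("session.timeout.ms"), 45000);
        return props;
    }
}
{code}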
All input (and aggregate) topics feature 15 partitions and share this 
configuration:
{code:yaml}
cleanup.policy: compact          
compression.type: lz4
segment.ms: "43200000" # 12h
max.compaction.lag.ms: "86400000" # 24h
delete.retention.ms: "86400000" # 24h
{code}
Logs show no indication of where or why this happens.

The issue was discussed on the Kafka [mailing 
list|https://lists.apache.org/thread/l50pwmwhobt73db97n0r5v36mydo15rs] as well 
as on 
[StackOverflow|https://stackoverflow.com/questions/78210993/kafka-streams-topology-initially-dropping-messages-to-intermediate-topics],
 but neither thread led to a further explanation; in the end it was suggested 
that I file a bug in the Kafka JIRA. I can't rule out that this is entirely 
caused by some setting in our Kafka environment, but there are other 
[indications|https://stackoverflow.com/questions/75886729/missing-records-in-kafka-streams-foreign-key-join]
 of similar message loss on FK Join operations. For the time being, I'd 
consider this a bug, perhaps emerging only under certain conditions.

At the moment I have no test case that reproduces the issue locally.

In case any additional information is needed, I'd be happy to provide it.


