[ https://issues.apache.org/jira/browse/KAFKA-13769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17680858#comment-17680858 ]
Matthias J. Sax commented on KAFKA-13769:
-----------------------------------------

I just updated the fix version from 3.0.0 to 3.3.3 and 3.4.0. See https://issues.apache.org/jira/browse/KAFKA-14646 for details.

> KTable FK join can miss records if an upstream non-key-changing operation changes key serializer
> ------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13769
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13769
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Alex Sorokoumov
>            Assignee: Alex Sorokoumov
>            Priority: Major
>             Fix For: 3.4.0, 3.3.3
>
>
> Consider a topology in which the source KTable is followed by a
> {{transformValues}} operation [that changes the key schema|https://github.com/apache/kafka/blob/db724f23f38cdb6c668a10681ea2a03bb11611ad/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L452],
> which is in turn followed by a foreign-key join. The FK join might miss
> records in such a topology because they might be sent to the wrong partitions.
>
> Because {{transformValues}} does not change the key itself, no repartition
> happens after this operation. However, the KTable instance that calls
> {{doJoinOnForeignKey}} uses the new serde coming from {{transformValues}}
> rather than the original one. As a result, every node in the FK join topology
> except [SubscriptionResolverJoinProcessorSupplier|https://github.com/apache/kafka/blob/db724f23f38cdb6c668a10681ea2a03bb11611ad/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L1225-L1232]
> uses the "new" serde. {{SubscriptionResolverJoinProcessorSupplier}} uses the
> old one because it relies on
> [valueGetterSupplier|https://github.com/apache/kafka/blob/db724f23f38cdb6c668a10681ea2a03bb11611ad/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L1225],
> which in turn retrieves the records from the topic.
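The routing problem comes down to one fact: a record's partition is computed from its serialized key bytes, not from the logical key. The stdlib-only Java sketch below illustrates this under stated assumptions. It assumes placement follows Kafka's default key hashing (murmur2 over the serialized key, masked to a non-negative int, modulo the partition count), and it uses a hypothetical `serializeKey` helper that renders a one-column key as JSON, standing in for the {{Struct}} serialization mentioned in the issue; the real converter's byte layout may differ. `countMismatches` and the column names `id` / `user_id` are likewise illustrative, not Kafka code.

```java
import java.nio.charset.StandardCharsets;

// Illustrative sketch, not Kafka code: the same logical key serialized two ways
// can be routed to two different partitions.
class KeySerdePartitionDemo {

    // Port of the murmur2 hash assumed to be used for default key-based partitioning.
    static int murmur2(final byte[] data) {
        final int length = data.length;
        final int m = 0x5bd1e995;            // mixing constant
        final int r = 24;                    // shift constant
        int h = 0x9747b28c ^ length;         // seed XOR input length
        final int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            final int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the trailing 1-3 bytes (intentional fall-through).
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Partition = non-negative murmur2 of the key bytes, modulo the partition count.
    static int partitionFor(byte[] keyBytes, int numPartitions) {
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }

    // Hypothetical serializer: a one-column key rendered as {"column":value}.
    static byte[] serializeKey(String columnName, long value) {
        return ("{\"" + columnName + "\":" + value + "}").getBytes(StandardCharsets.UTF_8);
    }

    // Counts keys whose partition changes when the key column is renamed.
    static int countMismatches(int numPartitions, int keyCount) {
        int mismatches = 0;
        for (long id = 0; id < keyCount; id++) {
            int original = partitionFor(serializeKey("id", id), numPartitions);
            int renamed = partitionFor(serializeKey("user_id", id), numPartitions);
            if (original != renamed) {
                mismatches++;
            }
        }
        return mismatches;
    }

    public static void main(String[] args) {
        System.out.println("1 partition:  " + countMismatches(1, 100) + " of 100 keys moved");
        System.out.println("4 partitions: " + countMismatches(4, 100) + " of 100 keys moved");
    }
}
```

With a single partition every key lands on partition 0 no matter how it is serialized, which matches the first precondition in the issue. With several partitions, renaming the key column changes the bytes and therefore, for many keys, the target partition; a processor that derives placement with the other serde then looks in the wrong place.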
>
> A different serializer might serialize the same key to a different sequence
> of bytes, which sends the record to the wrong partition. To run into this
> issue, all of the following must hold:
> * the topic has more than one partition,
> * the KTable's key serializer is modified via a non-key-changing operation,
> * the new serializer serializes keys differently from the original one.
>
> In practice, this can happen if the key type is a {{Struct}}, because it
> serializes to a JSON string of {{columnName -> value}} pairs. If the
> {{transformValues}} operation renames columns to avoid name clashes with the
> joining table, such a join can produce incorrect results.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)