[ 
https://issues.apache.org/jira/browse/KAFKA-13769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17680858#comment-17680858
 ] 

Matthias J. Sax commented on KAFKA-13769:
-----------------------------------------

I just updated the fix version from 3.0.0 to 3.3.3 and 3.4.0. See 
https://issues.apache.org/jira/browse/KAFKA-14646 for details.

> KTable FK join can miss records if an upstream non-key-changing operation 
> changes key serializer
> ------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13769
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13769
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Alex Sorokoumov
>            Assignee: Alex Sorokoumov
>            Priority: Major
>             Fix For: 3.4.0, 3.3.3
>
>
> Consider a topology in which the source KTable is followed by a 
> {{transformValues}} operation [that changes the key 
> schema|https://github.com/apache/kafka/blob/db724f23f38cdb6c668a10681ea2a03bb11611ad/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L452]
>  and then by a foreign-key (FK) join. The FK join can miss records in such a 
> topology because they may be sent to the wrong partitions.
> Because {{transformValues}} does not change the key itself, no repartitioning 
> happens after this operation. However, the KTable instance that calls 
> {{doJoinOnForeignKey}} uses the new serde coming from {{transformValues}} 
> rather than the original one. As a result, all nodes in the FK join topology 
> except for 
> [SubscriptionResolverJoinProcessorSupplier|https://github.com/apache/kafka/blob/db724f23f38cdb6c668a10681ea2a03bb11611ad/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L1225-L1232]
>  use the "new" serde. {{SubscriptionResolverJoinProcessorSupplier}} uses the 
> old one because it relies on 
> [valueGetterSupplier|https://github.com/apache/kafka/blob/db724f23f38cdb6c668a10681ea2a03bb11611ad/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L1225],
>  which in turn retrieves the records from the topic.
> A different serializer may serialize the same key to a different sequence of 
> bytes, which causes records to be sent to the wrong partitions. For the 
> issue to occur, all of the following must hold:
> * the topic has more than one partition,
> * the KTable's serializer is modified via a non-key-changing operation,
> * the new serializer serializes keys differently.
> In practice, this can happen if the key type is a {{Struct}}, because it 
> serializes to a JSON string {{columnName -> value}}. If the 
> {{transformValues}} operation changes column names to avoid name clashes with 
> the joining table, such a join can lead to incorrect behavior.
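
The partition mismatch described above can be illustrated with a minimal, self-contained sketch. It does not use Kafka classes. The class name, {{serializeKey}}, {{partitionFor}}, the column names {{id}} and {{left_id}}, and the partition count are all hypothetical, and {{Arrays.hashCode}} is only a stand-in for the murmur2 hash that Kafka's default partitioner applies to the serialized key bytes. The point is only that the same logical key, serialized under two column names, yields different bytes and can therefore be routed to different partitions.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class SerdeMismatchSketch {

    // Hypothetical stand-in for a Struct-like serializer: renders the key as
    // a JSON-like "columnName -> value" string and encodes it as UTF-8.
    public static byte[] serializeKey(String columnName, int value) {
        String json = "{\"" + columnName + "\":" + value + "}";
        return json.getBytes(StandardCharsets.UTF_8);
    }

    // Simplified hash-based partitioning over the serialized key bytes.
    // Assumption: Arrays.hashCode replaces murmur2 purely for brevity.
    public static int partitionFor(byte[] keyBytes, int numPartitions) {
        return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 4; // the issue needs more than one partition

        // Same logical key (42), serialized before and after a column rename.
        byte[] original = serializeKey("id", 42);
        byte[] renamed = serializeKey("left_id", 42);

        System.out.println("same bytes: " + Arrays.equals(original, renamed));
        System.out.println("partition (original serde): "
                + partitionFor(original, numPartitions));
        System.out.println("partition (renamed serde):  "
                + partitionFor(renamed, numPartitions));
    }
}
```

Whether the two partitions actually differ depends on the hash and the partition count. With a single partition they never differ, which matches the first condition in the list above.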



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
