[ https://issues.apache.org/jira/browse/KAFKA-16394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ayoub Omari reassigned KAFKA-16394:
-----------------------------------

    Assignee: Ayoub Omari

> ForeignKey LEFT join propagates null value on foreignKey change
> ---------------------------------------------------------------
>
>                 Key: KAFKA-16394
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16394
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.7.0
>            Reporter: Ayoub Omari
>            Assignee: Ayoub Omari
>            Priority: Major
>         Attachments: ForeignJoinTest.scala, JsonSerde.scala
>
>
> We have two topics: _left-topic[String, LeftRecord]_ and 
> _right-topic[String, String]_,
> where _LeftRecord_ is:
> {code:scala}
>  case class LeftRecord(foreignKey: String, name: String){code}
> We perform a simple foreign-key LEFT join on left-topic's foreignKey field, which 
> returns the corresponding value from right-topic.
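As a reference point for the expected results below, here is a minimal plain-Scala sketch of foreign-key LEFT join semantics. It is an illustration under simplifying assumptions, not Kafka Streams code: the right topic is modeled as an immutable `Map` that is fully populated before any left-side update, and every left-side update yields exactly one join result (`null` for a tombstone or a missing match). The object name `ExpectedFkLeftJoin` is hypothetical.

```scala
// Plain-Scala model of the *expected* foreign-key LEFT join output.
// Assumption: not Kafka Streams code; the right topic is an immutable Map
// that is fully populated before any left-side update is processed.
final case class LeftRecord(foreignKey: String, name: String)

object ExpectedFkLeftJoin {
  // Emits exactly one (primaryKey, joinedValue) pair per left-side update.
  def run(
      right: Map[String, String],
      leftUpdates: Seq[(String, LeftRecord)]
  ): Seq[(String, String)] =
    leftUpdates.map[(String, String)] {
      // A left tombstone (null value) deletes the join result.
      case (pk, null) => (pk, null)
      // Left-join semantics: a missing right-side match also yields null.
      case (pk, record) => (pk, right.getOrElse(record.foreignKey, null))
    }
}
```

Under these assumptions, Scenario 1 yields `(pk1, 1), (pk1, 2)` and Scenario 2 yields `(pk1, 1), (pk1, null)`, matching the expected results.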
>  
> +*Scenario 1: Change foreignKey*+
> Input the following:
> {code:scala}
> rightTopic.pipeInput("fk1", "1")
> rightTopic.pipeInput("fk2", "2") 
> leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
> leftTopic.pipeInput("pk1", LeftRecord("fk2", "pk1"))
> {code}
>  
> *+Expected result+*
> {code:scala}
> KeyValue(pk1, 1)
> KeyValue(pk1, 2){code}
>  
> *+Actual result+*
> {code:scala}
> KeyValue(pk1, 1)
> KeyValue(pk1, null)
> KeyValue(pk1, 2){code}
>  
> A spurious null is propagated to the join result when the foreign key changes.
>  
> +*Scenario 2: Delete PrimaryKey*+
> Input
> {code:scala}
> rightTopic.pipeInput("fk1", "1")
> rightTopic.pipeInput("fk2", "2")
> leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
> leftTopic.pipeInput("pk1", null) {code}
>  
> *+Expected result+*
> {code:scala}
> KeyValue(pk1, 1)
> KeyValue(pk1, null) {code}
>  
> *+Actual result+*
> {code:scala}
> KeyValue(pk1, 1)
> KeyValue(pk1, null)
> KeyValue(pk1, null) {code}
> An additional null is propagated to the join result.
>  
> This bug does not occur in versions 3.6.0 and earlier.
>  
> I believe the issue comes from the following line,
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L134]
> where the deletion is propagated in both scenarios above.
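To make the suspected mechanism concrete, the sketch below is a hypothetical, simplified model (not the actual `SubscriptionSendProcessorSupplier` logic) in which dropping a primary key's old subscription, because its foreign key changed or the record was deleted, can optionally be forwarded downstream as an extra null result. The flag `propagateOldSubscriptionDelete` and the object `FkLeftJoinModel` are invented for illustration.

```scala
final case class LeftRecord(foreignKey: String, name: String)

// Hypothetical model of the suspected bug: when a primary key's old
// subscription (to its previous foreign key) is dropped, the drop itself may
// be forwarded downstream as a null join result. Illustration only; this is
// NOT Kafka's SubscriptionSendProcessorSupplier.
object FkLeftJoinModel {
  def run(
      right: Map[String, String],
      leftUpdates: Seq[(String, LeftRecord)],
      propagateOldSubscriptionDelete: Boolean
  ): Seq[(String, String)] = {
    var current = Map.empty[String, LeftRecord] // last left value per primary key
    val out = Seq.newBuilder[(String, String)]
    for ((pk, newValue) <- leftUpdates) {
      val oldValue = current.get(pk)
      // The old subscription goes away if the record is deleted or its FK changes.
      val oldSubscriptionDropped = oldValue.exists { old =>
        newValue == null || old.foreignKey != newValue.foreignKey
      }
      // Suspected bug: forwarding the drop as an extra null result.
      if (oldSubscriptionDropped && propagateOldSubscriptionDelete) out += ((pk, null))
      // The update's own result: null for a tombstone, else the right value (or null).
      if (newValue == null) {
        current -= pk
        out += ((pk, null))
      } else {
        current += (pk -> newValue)
        out += ((pk, right.getOrElse(newValue.foreignKey, null)))
      }
    }
    out.result()
  }
}
```

With the flag set to `true` the model reproduces both actual results above (`1, null, 2` and `1, null, null`); with it set to `false` it produces the expected ones. This only shows that the reported symptoms are consistent with the hypothesis; it does not prove where the fix belongs.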
>  
> Attaching the topology I used.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)