[
https://issues.apache.org/jira/browse/KAFKA-16394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Matthias J. Sax updated KAFKA-16394:
------------------------------------
Fix Version/s: 3.9.1
> ForeignKey LEFT join propagates null value on foreignKey change
> ---------------------------------------------------------------
>
> Key: KAFKA-16394
> URL: https://issues.apache.org/jira/browse/KAFKA-16394
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.7.0
> Reporter: Ayoub Omari
> Assignee: Ayoub Omari
> Priority: Major
> Fix For: 3.9.1
>
> Attachments: ForeignJoinTest.scala, JsonSerde.scala
>
>
> We have two topics: _left-topic[String, LeftRecord]_ and
> _right-topic[String, String]_,
> where _LeftRecord_ is:
> {code:scala}
> case class LeftRecord(foreignKey: String, name: String){code}
> We do a simple *LEFT* foreign-key join on left-topic's foreignKey field. The
> resulting join value is the value in right-topic.
>
> +*Scenario 1: Change foreignKey*+
> Input the following
> {code:scala}
> rightTopic.pipeInput("fk1", "1")
> rightTopic.pipeInput("fk2", "2")
> leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
> leftTopic.pipeInput("pk1", LeftRecord("fk2", "pk1"))
> {code}
>
> *+Expected result+*
> {code:scala}
> KeyValue(pk1, 1)
> KeyValue(pk1, 2){code}
>
> *+Actual result+*
> {code:scala}
> KeyValue(pk1, 1)
> KeyValue(pk1, null)
> KeyValue(pk1, 2){code}
>
> A spurious null is propagated to the join result when the foreign key changes.
>
> +*Scenario 2: Delete PrimaryKey*+
> Input
> {code:scala}
> rightTopic.pipeInput("fk1", "1")
> rightTopic.pipeInput("fk2", "2")
> leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
> leftTopic.pipeInput("pk1", null) {code}
>
> *+Expected result+*
> {code:scala}
> KeyValue(pk1, 1)
> KeyValue(pk1, null) {code}
>
> *+Actual result+*
> {code:scala}
> KeyValue(pk1, 1)
> KeyValue(pk1, null)
> KeyValue(pk1, null) {code}
> An additional null is propagated to the join result.
>
> This bug doesn't exist in versions 3.6.0 and earlier.
>
> I believe the issue comes from the line
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L134]
> where we propagate the deletion in the two scenarios above.
>
> Attaching the topology I used.
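
The expected results in the two scenarios above can be restated as a small reference model. This is only a sketch in plain Scala, not Kafka Streams code and not the attached topology: the object `ExpectedFkLeftJoin` and its `expected` function are hypothetical names, and the model assumes the right table is fully populated before any left input arrives, as in both scenarios. It emits exactly one result per left-side update, which is the behavior the report expects.

```scala
// Hypothetical reference model of the expected LEFT foreign-key join output.
// Plain Scala collections stand in for the two tables; no Kafka dependency.
final case class LeftRecord(foreignKey: String, name: String)

object ExpectedFkLeftJoin {

  /** Expected join output: exactly one result per left-side update.
    * `None` models a tombstone (null value) on the left topic.
    * Assumes `right` is complete before the left updates are applied.
    */
  def expected(
      right: Map[String, String],
      leftUpdates: Seq[(String, Option[LeftRecord])]
  ): Seq[(String, String)] =
    leftUpdates.map {
      // Left record present: look up the right value; null when there is no match.
      case (pk, Some(rec)) => (pk, right.get(rec.foreignKey).orNull)
      // Left record deleted: a single tombstone for the primary key.
      case (pk, None) => (pk, null: String)
    }

  def main(args: Array[String]): Unit = {
    val right = Map("fk1" -> "1", "fk2" -> "2")

    // Scenario 1: the foreign key of pk1 changes from fk1 to fk2.
    val scenario1 = expected(
      right,
      Seq("pk1" -> Some(LeftRecord("fk1", "pk1")), "pk1" -> Some(LeftRecord("fk2", "pk1")))
    )

    // Scenario 2: pk1 is deleted after being joined with fk1.
    val scenario2 = expected(
      right,
      Seq("pk1" -> Some(LeftRecord("fk1", "pk1")), "pk1" -> None)
    )

    println(scenario1)
    println(scenario2)
  }
}
```

Comparing the output of an actual topology against this model makes the extra record visible as a length mismatch: two expected results per scenario versus the three reported under "Actual result".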