[ https://issues.apache.org/jira/browse/KAFKA-16434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ayoub Omari updated KAFKA-16434:
--------------------------------
    Description:
We have two topics: _left-topic[String, LeftRecord]_ and _right-topic[String, String]_

where _LeftRecord_ is:
{code:scala}
case class LeftRecord(foreignKey: String, name: String){code}
We do a simple *INNER* foreign key join on left-topic's foreignKey field. The resulting join value is the value in right-topic. (Same topology example as in KAFKA-16407.)

*Scenario: Unset foreign key of a primary key*
{code:scala}
rightTopic.pipeInput("fk1", "1")

leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
leftTopic.pipeInput("pk1", LeftRecord(null, "pk1"))
{code}

*+Actual result+*
{code:java}
KeyValue(pk1, 3){code}

*+Expected result+*
{code:java}
KeyValue(pk1, 3)
KeyValue(pk1, null) // This unsets the join between pk1 and fk1{code}

However, in {+}other cases{+} where the join result should be unset (e.g. the primary key is deleted, or the foreign key changes to a non-existent FK), that record is {+}correctly emitted{+}.

Also, the importance of unsetting the join result is mentioned in the code:
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L161C21-L163C36]
{code:java}
//[...] Additionally, propagate null if no FK is found there,
// since we must "unset" any output set by the previous FK-join. This is true for both INNER and LEFT join. {code}

> ForeignKey INNER join does not unset join result when FK becomes null
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-16434
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16434
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.8.2, 3.7.0
>            Reporter: Ayoub Omari
>            Priority: Major
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)