[jira] [Updated] (KAFKA-16394) ForeignKey LEFT join propagates null value on foreignKey change
[ https://issues.apache.org/jira/browse/KAFKA-16394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ayoub Omari updated KAFKA-16394:

Description:

We have two topics : _left-topic[String, LeftRecord]_ and _right-topic[String, String]_

where _LeftRecord_ :
{code:scala}
case class LeftRecord(foreignKey: String, name: String){code}
we do a simple LEFT foreign key join on left-topic's foreignKey field. The resulting join value is the value in right-topic.

+*Scenario1: change foreignKey*+

Input the following
{code:scala}
rightTopic.pipeInput("fk1", "1")
rightTopic.pipeInput("fk2", "2")
leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
leftTopic.pipeInput("pk1", LeftRecord("fk2", "pk1"))
{code}

*+Expected result+*
{code:scala}
KeyValue(pk1, 1)
KeyValue(pk1, 2){code}

*+Actual result+*
{code:scala}
KeyValue(pk1, 1)
KeyValue(pk1, null)
KeyValue(pk1, 2){code}

A null is propagated to the join result when the foreign key changes

+*Scenario 2: Delete PrimaryKey*+

Input
{code:scala}
rightTopic.pipeInput("fk1", "1")
rightTopic.pipeInput("fk2", "2")
leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
leftTopic.pipeInput("pk1", null)
{code}

*+Expected result+*
{code:scala}
KeyValue(pk1, 1)
KeyValue(pk1, null)
{code}

*+Actual result+*
{code:java}
KeyValue(pk1, 1)
KeyValue(pk1, null)
KeyValue(pk1, null)
{code}

An additional null is propagated to the join result.

This bug doesn't exist on versions 3.6.0 and below.

I believe the issue comes from the line [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L134] where we propagate the deletion in the two scenarios above

Attaching the topology I used.
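Editorial sketch (not part of the original report): the expected results in both scenarios follow from a simple in-memory model of LEFT foreign-key join semantics, where each left-side update emits exactly one join result. The code below is not Kafka Streams code and does not touch the attached topology; the names LeftFkJoinModel, pipeLeft, pipeRight and scenario are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of LEFT foreign-key join semantics.
// It is NOT Kafka Streams code; it only reproduces the expected outputs.
final class LeftFkJoinModel {

    // Mirrors LeftRecord(foreignKey, name) from the report.
    static final class LeftRecord {
        final String foreignKey;
        final String name;

        LeftRecord(String foreignKey, String name) {
            this.foreignKey = foreignKey;
            this.name = name;
        }
    }

    private final Map<String, String> right = new LinkedHashMap<>();
    private final Map<String, LeftRecord> left = new LinkedHashMap<>();
    private final List<String> output = new ArrayList<>();

    // Upsert or delete on the right table, then re-emit the join result
    // for every left row that currently references this foreign key.
    void pipeRight(String fk, String value) {
        if (value == null) {
            right.remove(fk);
        } else {
            right.put(fk, value);
        }
        for (Map.Entry<String, LeftRecord> e : left.entrySet()) {
            if (e.getValue().foreignKey.equals(fk)) {
                emit(e.getKey(), value);
            }
        }
    }

    // Upsert or delete on the left table; each call emits at most one result.
    void pipeLeft(String pk, LeftRecord value) {
        if (value == null) {
            // Delete: a single tombstone, and only if the key existed.
            if (left.remove(pk) != null) {
                emit(pk, null);
            }
            return;
        }
        left.put(pk, value);
        // LEFT join: emit even when no right row matches (join value is null).
        emit(pk, right.get(value.foreignKey));
    }

    private void emit(String pk, String joined) {
        output.add("KeyValue(" + pk + ", " + joined + ")");
    }

    // Replays the inputs from the report; the flag selects Scenario 2.
    static List<String> scenario(boolean deletePrimaryKey) {
        LeftFkJoinModel m = new LeftFkJoinModel();
        m.pipeRight("fk1", "1");
        m.pipeRight("fk2", "2");
        m.pipeLeft("pk1", new LeftRecord("fk1", "pk1"));
        m.pipeLeft("pk1", deletePrimaryKey ? null : new LeftRecord("fk2", "pk1"));
        return m.output;
    }

    public static void main(String[] args) {
        System.out.println(scenario(false)); // Scenario 1
        System.out.println(scenario(true));  // Scenario 2
    }
}
```

Under this model a foreign-key change produces only the new join value, and a left-side delete produces a single tombstone, which is the behavior the report expects; the extra null in the actual results has no counterpart here.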
was:

We have two topics : _left-topic[String, LeftRecord]_ and _right-topic[String, String]_

where _LeftRecord_ :
{code:scala}
case class LeftRecord(foreignKey: String, name: String){code}
we do a simple foreign key join on left-topic's foreignKey field which returns the value in right-topic.

+*Scenario1: change foreignKey*+

Input the following
{code:scala}
rightTopic.pipeInput("fk1", "1")
rightTopic.pipeInput("fk2", "2")
leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
leftTopic.pipeInput("pk1", LeftRecord("fk2", "pk1"))
{code}

*+Expected result+*
{code:scala}
KeyValue(pk1, 1)
KeyValue(pk1, 2){code}

*+Actual result+*
{code:scala}
KeyValue(pk1, 1)
KeyValue(pk1, null)
KeyValue(pk1, 2){code}

A null is propagated to the join result when the foreign key changes

+*Scenario 2: Delete PrimaryKey*+

Input
{code:scala}
rightTopic.pipeInput("fk1", "1")
rightTopic.pipeInput("fk2", "2")
leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
leftTopic.pipeInput("pk1", null)
{code}

*+Expected result+*
{code:scala}
KeyValue(pk1, 1)
KeyValue(pk1, null)
{code}

*+Actual result+*
{code:java}
KeyValue(pk1, 1)
KeyValue(pk1, null)
KeyValue(pk1, null)
{code}

An additional null is propagated to the join result.

This bug doesn't exist on versions 3.6.0 and below.

I believe the issue comes from the line [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L134] where we propagate the deletion in the two scenarios above

Attaching the topology I used.
> ForeignKey LEFT join propagates null value on foreignKey change
> ---
>
> Key: KAFKA-16394
> URL: https://issues.apache.org/jira/browse/KAFKA-16394
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.7.0
> Reporter: Ayoub Omari
> Assignee: Ayoub Omari
> Priority: Major
> Attachments: ForeignJoinTest.scala, JsonSerde.scala
>
>
> We have two topics : _left-topic[String, LeftRecord]_ and
> _right-topic[String, String]_
> where _LeftRecord_ :
> {code:scala}
> case class LeftRecord(foreignKey: String, name: String){code}
> we do a simple LEFT foreign key join on left-topic's foreignKey field. The
> resulting join value is the value in right-topic.
>
> +*Scenario1: change foreignKey*+
> Input the following
> {code:scala}
> rightTopic.pipeInput("fk1", "1")
> rightTopic.pipeInput("fk2", "2")
> leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
> leftTopic.pipeInput("pk1", LeftRecord("fk2", "pk1"))
> {code}
>
> *+Expected result+*
> {code:scala}
> KeyValue(pk1, 1)
> KeyValue(pk1, 2){code}
>
> *+Actual result+*
> {code:scala}
> KeyValue(pk1, 1)
> KeyValue(pk1, null)
> KeyValue(pk1, 2){code}
>
> A null is propagated to the join result when the foreign key changes
>
> +*Scenario 2: Delete PrimaryKey*+
> Input
> {code:scala}
> rightTopic.pipeInput("fk1", "1")
> rightTopic.pipeInput("fk2", "2")
> leftTopic.pipeInput("pk1", LeftRecord("f
[jira] [Updated] (KAFKA-16394) ForeignKey LEFT join propagates null value on foreignKey change
[ https://issues.apache.org/jira/browse/KAFKA-16394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ayoub Omari updated KAFKA-16394:

Description:

We have two topics : _left-topic[String, LeftRecord]_ and _right-topic[String, String]_

where _LeftRecord_ :
{code:scala}
case class LeftRecord(foreignKey: String, name: String){code}
we do a simple *LEFT* foreign key join on left-topic's foreignKey field. The resulting join value is the value in right-topic.

+*Scenario1: change foreignKey*+

Input the following
{code:scala}
rightTopic.pipeInput("fk1", "1")
rightTopic.pipeInput("fk2", "2")
leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
leftTopic.pipeInput("pk1", LeftRecord("fk2", "pk1"))
{code}

*+Expected result+*
{code:scala}
KeyValue(pk1, 1)
KeyValue(pk1, 2){code}

*+Actual result+*
{code:scala}
KeyValue(pk1, 1)
KeyValue(pk1, null)
KeyValue(pk1, 2){code}

A null is propagated to the join result when the foreign key changes

+*Scenario 2: Delete PrimaryKey*+

Input
{code:scala}
rightTopic.pipeInput("fk1", "1")
rightTopic.pipeInput("fk2", "2")
leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
leftTopic.pipeInput("pk1", null)
{code}

*+Expected result+*
{code:scala}
KeyValue(pk1, 1)
KeyValue(pk1, null)
{code}

*+Actual result+*
{code:java}
KeyValue(pk1, 1)
KeyValue(pk1, null)
KeyValue(pk1, null)
{code}

An additional null is propagated to the join result.

This bug doesn't exist on versions 3.6.0 and below.

I believe the issue comes from the line [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L134] where we propagate the deletion in the two scenarios above

Attaching the topology I used.
was:

We have two topics : _left-topic[String, LeftRecord]_ and _right-topic[String, String]_

where _LeftRecord_ :
{code:scala}
case class LeftRecord(foreignKey: String, name: String){code}
we do a simple LEFT foreign key join on left-topic's foreignKey field. The resulting join value is the value in right-topic.

+*Scenario1: change foreignKey*+

Input the following
{code:scala}
rightTopic.pipeInput("fk1", "1")
rightTopic.pipeInput("fk2", "2")
leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
leftTopic.pipeInput("pk1", LeftRecord("fk2", "pk1"))
{code}

*+Expected result+*
{code:scala}
KeyValue(pk1, 1)
KeyValue(pk1, 2){code}

*+Actual result+*
{code:scala}
KeyValue(pk1, 1)
KeyValue(pk1, null)
KeyValue(pk1, 2){code}

A null is propagated to the join result when the foreign key changes

+*Scenario 2: Delete PrimaryKey*+

Input
{code:scala}
rightTopic.pipeInput("fk1", "1")
rightTopic.pipeInput("fk2", "2")
leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
leftTopic.pipeInput("pk1", null)
{code}

*+Expected result+*
{code:scala}
KeyValue(pk1, 1)
KeyValue(pk1, null)
{code}

*+Actual result+*
{code:java}
KeyValue(pk1, 1)
KeyValue(pk1, null)
KeyValue(pk1, null)
{code}

An additional null is propagated to the join result.

This bug doesn't exist on versions 3.6.0 and below.

I believe the issue comes from the line [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L134] where we propagate the deletion in the two scenarios above

Attaching the topology I used.
> ForeignKey LEFT join propagates null value on foreignKey change
> ---
>
> Key: KAFKA-16394
> URL: https://issues.apache.org/jira/browse/KAFKA-16394
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.7.0
> Reporter: Ayoub Omari
> Assignee: Ayoub Omari
> Priority: Major
> Attachments: ForeignJoinTest.scala, JsonSerde.scala
>
>
> We have two topics : _left-topic[String, LeftRecord]_ and
> _right-topic[String, String]_
> where _LeftRecord_ :
> {code:scala}
> case class LeftRecord(foreignKey: String, name: String){code}
> we do a simple *LEFT* foreign key join on left-topic's foreignKey field. The
> resulting join value is the value in right-topic.
>
> +*Scenario1: change foreignKey*+
> Input the following
> {code:scala}
> rightTopic.pipeInput("fk1", "1")
> rightTopic.pipeInput("fk2", "2")
> leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
> leftTopic.pipeInput("pk1", LeftRecord("fk2", "pk1"))
> {code}
>
> *+Expected result+*
> {code:scala}
> KeyValue(pk1, 1)
> KeyValue(pk1, 2){code}
>
> *+Actual result+*
> {code:scala}
> KeyValue(pk1, 1)
> KeyValue(pk1, null)
> KeyValue(pk1, 2){code}
>
> A null is propagated to the join result when the foreign key changes
>
> +*Scenario 2: Delete PrimaryKey*+
> Input
> {code:scala}
> rightTopic.pipeInput("fk1", "1")
> rightTopic.pipeInput("fk2", "2")
> leftTopic.pipeIn