[ https://issues.apache.org/jira/browse/KAFKA-16434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ayoub Omari updated KAFKA-16434:
--------------------------------
    Description: 
We have two topics: _left-topic[String, LeftRecord]_ and _right-topic[String, String]_

where _LeftRecord_ is:
{code:scala}
case class LeftRecord(foreignKey: String, name: String){code}
We do a simple *INNER* foreign-key join on the foreignKey field of left-topic. The resulting join value is the value in right-topic (same topology example as in KAFKA-16407).

 

*Scenario: Unset foreign key of a primary key*
{code:scala}
// Right side: fk1 -> "1"
rightTopic.pipeInput("fk1", "1")

// pk1 first references fk1, then its foreign key is set to null
leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
leftTopic.pipeInput("pk1", LeftRecord(null, "pk1"))
{code}
 

*+Actual result+*
{code:java}
KeyValue(pk1, 1) {code}
 

*+Expected result+*
{code:java}
KeyValue(pk1, 1)
KeyValue(pk1, null) // This unsets the join between pk1 and fk1{code}
 

However, in {+}other cases{+} where the join result should be unset (e.g. the primary key is deleted, or the foreign key changes to a non-existent FK), the null record is {+}correctly emitted{+}.
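
To make the expected semantics concrete, here is a minimal in-memory sketch of how an INNER foreign-key join would be expected to behave in the three cases above. It is not Kafka Streams code: the class {{FkJoinModel}} and its methods {{pipeRight}}, {{pipeLeft}} and {{deleteLeft}} are hypothetical names used only for illustration, and the model assumes a null is emitted only when a previous join result exists for the primary key.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified model of the EXPECTED INNER FK-join output; hypothetical, not the Kafka Streams implementation. */
class FkJoinModel {
    // Left table: primary key -> foreign key (a null value means "no foreign key").
    private final Map<String, String> leftFk = new LinkedHashMap<>();
    // Right table: foreign key -> value.
    private final Map<String, String> right = new LinkedHashMap<>();
    // Join result currently set per primary key; absent means nothing is set.
    private final Map<String, String> current = new LinkedHashMap<>();
    // Emitted records, formatted like the issue description.
    final List<String> output = new ArrayList<>();

    /** Upserts a (non-null) right-side value and re-joins every left record that references it. */
    void pipeRight(String fk, String value) {
        right.put(fk, value);
        for (Map.Entry<String, String> e : leftFk.entrySet()) {
            if (fk.equals(e.getValue())) {
                emit(e.getKey(), value);
            }
        }
    }

    /** Upserts a left record; fk may be null, which must unset any earlier join result. */
    void pipeLeft(String pk, String fk) {
        leftFk.put(pk, fk);
        emit(pk, fk == null ? null : right.get(fk));
    }

    /** Deletes a left record (tombstone on the primary key). */
    void deleteLeft(String pk) {
        leftFk.remove(pk);
        emit(pk, null);
    }

    private void emit(String pk, String joined) {
        if (joined != null) {
            current.put(pk, joined);
            output.add("KeyValue(" + pk + ", " + joined + ")");
        } else if (current.remove(pk) != null) {
            // Assumption of this sketch: emit null only if there is a result to unset.
            output.add("KeyValue(" + pk + ", null)");
        }
    }

    public static void main(String[] args) {
        FkJoinModel model = new FkJoinModel();
        model.pipeRight("fk1", "1");
        model.pipeLeft("pk1", "fk1");
        model.pipeLeft("pk1", null); // the scenario from this issue
        model.output.forEach(System.out::println);
    }
}
```

In this model, the scenario yields two records for pk1, the second one carrying a null value. The same null record is produced when {{deleteLeft("pk1")}} is called or when the foreign key changes to one that has no entry on the right side.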

 

Also, the importance of unsetting the join result is mentioned in the code: 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L161C21-L163C36]

 
{code:java}
//[...] Additionally, propagate null if no FK is found there,
// since we must "unset" any output set by the previous FK-join. This is true
// for both INNER and LEFT join. {code}
 

 


> ForeignKey INNER join does not unset join result when FK becomes null
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-16434
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16434
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.8.2, 3.7.0
>            Reporter: Ayoub Omari
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
