[ 
https://issues.apache.org/jira/browse/KAFKA-16407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17831468#comment-17831468
 ] 

Matthias J. Sax commented on KAFKA-16407:
-----------------------------------------

Did 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-962%3A+Relax+non-null+key+requirement+in+Kafka+Streams]
 not fix this?

> ForeignKey INNER join ignores FK change when its previous value is null
> -----------------------------------------------------------------------
>
>                 Key: KAFKA-16407
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16407
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.8.2, 3.0.2, 3.1.2, 3.2.3, 3.3.2, 3.4.1, 3.5.2, 3.7.0, 
> 3.6.1
>            Reporter: Ayoub Omari
>            Assignee: Ayoub Omari
>            Priority: Major
>         Attachments: InnerFKJoinTest.scala, JsonSerde.scala
>
>
> We have two topics : _left-topic[String, LeftRecord]_ and 
> _right-topic[String, String]_
> where _LeftRecord_ :
> {code:scala}
>  case class LeftRecord(foreignKey: String, name: String){code}
> we do a simple *INNER* foreign key join on left-topic's foreignKey field. The 
> resulting join value is the value in right-topic.
>  
> *Scenario: Primary key pk1 gets mapped to a new FK after having a null FK*
> {code:scala}
> rightTopic.pipeInput("fk", "1")
> leftTopic.pipeInput("pk1", LeftRecord(null, "pk1"))
> leftTopic.pipeInput("pk1", LeftRecord("fk", "pk1")) {code}
>  
> *+Expected result+*
> {code:scala}
> KeyValue(pk1, 1){code}
>  
> *+Actual result+*
> {code:scala}
> # No output !
> # Logs:
> 20:14:29,723 WARN  
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier
>   - Skipping record due to null foreign key. value=[LeftRecord(null,pk1)] 
> topic=[left-topic] partition=[0] offset=[0]
> 20:14:29,728 WARN  
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier
>   - Skipping record due to null foreign key. value=[LeftRecord(null,pk1)] 
> topic=[left-topic] partition=[0] offset=[1]
> {code}
>  
> After looking into the code, I believe this is the line behind the issue : 
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L147



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to