This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.9 by this push:
     new a78491af799 KAFKA-16407: Fix foreign key INNER join on change of FK 
from/to a null value (#19303)
a78491af799 is described below

commit a78491af79998d0c860860314cac3ed54b79f663
Author: Ayoub Omari <ayouboma...@outlook.fr>
AuthorDate: Sun Apr 6 05:13:31 2025 +0200

    KAFKA-16407: Fix foreign key INNER join on change of FK from/to a null 
value (#19303)
    
    Fixes both KAFKA-16407 and KAFKA-16434.
    
    Summary of existing issues:
    
    - We are ignoring new left record when its previous FK value is null
    - We do not unset foreign key join result when FK becomes null
    
    Reviewers: Matthias J. Sax <matth...@confluent.io>
---
 .../SubscriptionSendProcessorSupplier.java         | 29 +++++-----
 .../SubscriptionSendProcessorSupplierTest.java     | 64 +++++++++++++++++++++-
 2 files changed, 74 insertions(+), 19 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
index cd39315cc79..efae7ba0b29 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
@@ -144,27 +144,24 @@ public class SubscriptionSendProcessorSupplier<K, KO, V> 
implements ProcessorSup
         private void defaultJoinInstructions(final Record<K, Change<V>> 
record) {
             if (record.value().oldValue != null) {
                 final KO oldForeignKey = record.value().oldValue == null ? 
null : foreignKeyExtractor.apply(record.value().oldValue);
-                if (oldForeignKey == null) {
+                final KO newForeignKey = record.value().newValue == null ? 
null : foreignKeyExtractor.apply(record.value().newValue);
+
+                if (oldForeignKey == null && newForeignKey == null) {
                     logSkippedRecordDueToNullForeignKey();
-                    return;
-                }
-                if (record.value().newValue != null) {
-                    final KO newForeignKey = record.value().newValue == null ? 
null : foreignKeyExtractor.apply(record.value().newValue);
-                    if (newForeignKey == null) {
-                        logSkippedRecordDueToNullForeignKey();
-                        return;
-                    }
-                    if (!Arrays.equals(serialize(newForeignKey), 
serialize(oldForeignKey))) {
-                        //Different Foreign Key - delete the old key value and 
propagate the new one.
-                        //Delete it from the oldKey's state store
-                        forward(record, oldForeignKey, 
DELETE_KEY_NO_PROPAGATE);
-                    }
+                } else if (oldForeignKey == null) {
+                    forward(record, newForeignKey, 
PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE);
+                } else if (newForeignKey == null) {
+                    forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
+                } else if (!Arrays.equals(serialize(newForeignKey), 
serialize(oldForeignKey))) {
+                    //Different Foreign Key - delete the old key value and 
propagate the new one.
+                    //Delete it from the oldKey's state store
+                    forward(record, oldForeignKey, DELETE_KEY_NO_PROPAGATE);
                     //Add to the newKey's state store. Additionally, propagate 
null if no FK is found there,
                     //since we must "unset" any output set by the previous 
FK-join. This is true for both INNER
                     //and LEFT join.
                     forward(record, newForeignKey, 
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
-                } else {
-                    forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
+                } else { // unchanged FK
+                    forward(record, newForeignKey, 
PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE);
                 }
             } else if (record.value().newValue != null) {
                 final KO newForeignKey = 
foreignKeyExtractor.apply(record.value().newValue);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java
index 18c0ed9a0e7..255fb093a60 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java
@@ -295,10 +295,57 @@ public class SubscriptionSendProcessorSupplierTest {
         innerJoinProcessor.process(new Record<>(pk, new 
Change<>(leftRecordValue, leftRecordValue), 0));
 
         assertThat(context.forwarded(), empty());
+    }
 
-        // test dropped-records sensors
-        assertEquals(1.0, getDroppedRecordsTotalMetric(context));
-        assertNotEquals(0.0, getDroppedRecordsRateMetric(context));
+    @Test
+    public void innerJoinShouldPropagateChangeFromNullFKToNonNullFK() {
+        final MockInternalNewProcessorContext<String, 
SubscriptionWrapper<String>> context = new MockInternalNewProcessorContext<>();
+        innerJoinProcessor.init(context);
+        context.setRecordMetadata("topic", 0, 0);
+
+        final LeftValue leftRecordValue = new LeftValue(fk1);
+
+        innerJoinProcessor.process(new Record<>(pk, new 
Change<>(leftRecordValue, new LeftValue(null)), 0));
+
+        assertThat(context.forwarded().size(), is(1));
+        assertThat(
+            context.forwarded().get(0).record(),
+            is(new Record<>(fk1, new 
SubscriptionWrapper<>(hash(leftRecordValue), 
PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, pk, 0), 0))
+        );
+    }
+
+    @Test
+    public void innerJoinShouldDeleteAndPropagateChangeFromNonNullFKToNullFK() 
{
+        final MockInternalNewProcessorContext<String, 
SubscriptionWrapper<String>> context = new MockInternalNewProcessorContext<>();
+        innerJoinProcessor.init(context);
+        context.setRecordMetadata("topic", 0, 0);
+
+        final LeftValue leftRecordValue = new LeftValue(null);
+
+        innerJoinProcessor.process(new Record<>(pk, new 
Change<>(leftRecordValue, new LeftValue(fk1)), 0));
+
+        assertThat(context.forwarded().size(), is(1));
+        assertThat(
+            context.forwarded().get(0).record(),
+            is(new Record<>(fk1, new 
SubscriptionWrapper<>(hash(leftRecordValue), DELETE_KEY_AND_PROPAGATE, pk, 0), 
0))
+        );
+    }
+
+    @Test
+    public void 
innerJoinShouldPropagateUnchangedFKOnlyIfFKExistsInRightTable() {
+        final MockInternalNewProcessorContext<String, 
SubscriptionWrapper<String>> context = new MockInternalNewProcessorContext<>();
+        innerJoinProcessor.init(context);
+        context.setRecordMetadata("topic", 0, 0);
+
+        final LeftValue leftRecordValue = new LeftValue(fk1);
+
+        innerJoinProcessor.process(new Record<>(pk, new 
Change<>(leftRecordValue, leftRecordValue), 0));
+
+        assertThat(context.forwarded().size(), is(1));
+        assertThat(
+            context.forwarded().get(0).record(),
+            is(new Record<>(fk1, new 
SubscriptionWrapper<>(hash(leftRecordValue), 
PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, pk, 0), 0))
+        );
     }
 
     @Test
@@ -316,6 +363,17 @@ public class SubscriptionSendProcessorSupplierTest {
         );
     }
 
+    @Test
+    public void 
innerJoinShouldNotPropagateDeletionOfPrimaryKeyWhenPreviousFKIsNull() {
+        final MockInternalNewProcessorContext<String, 
SubscriptionWrapper<String>> context = new MockInternalNewProcessorContext<>();
+        innerJoinProcessor.init(context);
+        context.setRecordMetadata("topic", 0, 0);
+
+        innerJoinProcessor.process(new Record<>(pk, new Change<>(null, new 
LeftValue(null)), 0));
+
+        assertThat(context.forwarded(), empty());
+    }
+
     @Test
     public void innerJoinShouldPropagateNothingWhenOldAndNewLeftValueIsNull() {
         final MockInternalNewProcessorContext<String, 
SubscriptionWrapper<String>> context = new MockInternalNewProcessorContext<>();

Reply via email to