This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.9 by this push:
     new d37f8840358 KAFKA-18713: Fix FK Left-Join result race condition 
(#19005)
d37f8840358 is described below

commit d37f88403582c61c4cc3e10f35887841d12a8212
Author: nilmadhab mondal <[email protected]>
AuthorDate: Fri Apr 4 01:22:47 2025 +0200

    KAFKA-18713: Fix FK Left-Join result race condition (#19005)
    
    When a row in a FK-join left table is updated, we should send a "delete
    subscription with no response" for the old FK to the right hand side, to
    avoid getting two responses from the right hand side. Only the "new
    subscription" for the new FK should request a response. If two responses
    are requested, there is a race condition for which both responses could
    be processed in the wrong order, leading to an incorrect join result.
    
    This PR fixes the "delete subscription" case accordingly, to no request
    a response.
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../SubscriptionSendProcessorSupplier.java         |   2 +-
 .../KTableKTableForeignKeyJoinIntegrationTest.java | 135 +++++++++++++++++----
 .../SubscriptionSendProcessorSupplierTest.java     |   4 +-
 3 files changed, 112 insertions(+), 29 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
index eb864f3910b..cd39315cc79 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
@@ -132,7 +132,7 @@ public class SubscriptionSendProcessorSupplier<K, KO, V> 
implements ProcessorSup
                 final KO oldForeignKey = 
foreignKeyExtractor.apply(record.value().oldValue);
                 final KO newForeignKey = record.value().newValue == null ? 
null : foreignKeyExtractor.apply(record.value().newValue);
                 if (oldForeignKey != null && 
!Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
-                    forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
+                    forward(record, oldForeignKey, DELETE_KEY_NO_PROPAGATE);
                 }
                 forward(record, newForeignKey, 
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
             } else if (record.value().newValue != null) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
index b4ac10fdbc6..e3f502562a0 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
@@ -21,6 +21,7 @@ import 
org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TestInputTopic;
@@ -59,10 +60,13 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptyList;
 import static java.util.Collections.emptyMap;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static org.hamcrest.CoreMatchers.hasItem;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 
@@ -182,13 +186,13 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
             right.pipeInput("rhs3", "rhsValue3", baseTimestamp + 2); // this 
unreferenced FK won't show up in any results
 
             assertThat(
-                outputTopic.readKeyValuesToMap(),
-                is(emptyMap())
+                outputTopic.readKeyValuesToList(),
+                is(emptyList())
             );
             if (rejoin) {
                 assertThat(
-                    rejoinOutputTopic.readKeyValuesToMap(),
-                    is(emptyMap())
+                    rejoinOutputTopic.readKeyValuesToList(),
+                    is(emptyList())
                 );
             }
             if (materialized) {
@@ -202,27 +206,27 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
             left.pipeInput("lhs2", "lhsValue2|rhs2", baseTimestamp + 4);
 
             {
-                final Map<String, String> expected = mkMap(
-                    mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
-                    mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)")
+                final List<KeyValue<String, String>> expected = Arrays.asList(
+                    KeyValue.pair("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
+                    KeyValue.pair("lhs2", "(lhsValue2|rhs2,rhsValue2)")
                 );
                 assertThat(
-                    outputTopic.readKeyValuesToMap(),
+                    outputTopic.readKeyValuesToList(),
                     is(expected)
                 );
                 if (rejoin) {
                     assertThat(
-                        rejoinOutputTopic.readKeyValuesToMap(),
-                        is(mkMap(
-                            mkEntry("lhs1", 
"rejoin((lhsValue1|rhs1,rhsValue1),lhsValue1|rhs1)"),
-                            mkEntry("lhs2", 
"rejoin((lhsValue2|rhs2,rhsValue2),lhsValue2|rhs2)")
+                        rejoinOutputTopic.readKeyValuesToList(),
+                        is(asList(
+                            KeyValue.pair("lhs1", 
"rejoin((lhsValue1|rhs1,rhsValue1),lhsValue1|rhs1)"),
+                            KeyValue.pair("lhs2", 
"rejoin((lhsValue2|rhs2,rhsValue2),lhsValue2|rhs2)")
                         ))
                     );
                 }
                 if (materialized) {
                     assertThat(
                         asMap(store),
-                        is(expected)
+                        is(expected.stream().collect(Collectors.toMap(kv -> 
kv.key, kv -> kv.value)))
                     );
                 }
             }
@@ -231,16 +235,16 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
             left.pipeInput("lhs3", "lhsValue3|rhs1", baseTimestamp + 5);
             {
                 assertThat(
-                    outputTopic.readKeyValuesToMap(),
-                    is(mkMap(
-                        mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)")
+                    outputTopic.readKeyValuesToList(),
+                    is(asList(
+                        new KeyValue<>("lhs3", "(lhsValue3|rhs1,rhsValue1)")
                     ))
                 );
                 if (rejoin) {
                     assertThat(
-                        rejoinOutputTopic.readKeyValuesToMap(),
-                        is(mkMap(
-                            mkEntry("lhs3", 
"rejoin((lhsValue3|rhs1,rhsValue1),lhsValue3|rhs1)")
+                        rejoinOutputTopic.readKeyValuesToList(),
+                        is(asList(
+                            new KeyValue<>("lhs3", 
"rejoin((lhsValue3|rhs1,rhsValue1),lhsValue3|rhs1)")
                         ))
                     );
                 }
@@ -255,21 +259,21 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
                     );
                 }
             }
+
             // Now delete one LHS entity such that one delete is propagated 
down to the output.
 
             left.pipeInput("lhs1", (String) null, baseTimestamp + 6);
             assertThat(
-                outputTopic.readKeyValuesToMap(),
-                is(mkMap(
-                    mkEntry("lhs1", null)
+                outputTopic.readKeyValuesToList(),
+                is(asList(
+                    new KeyValue<>("lhs1", null)
                 ))
             );
             if (rejoin) {
                 assertThat(
-                    rejoinOutputTopic.readKeyValuesToMap(),
-                    is(mkMap(
-                        mkEntry("lhs1", null)
-                    ))
+                    rejoinOutputTopic.readKeyValuesToList(),
+                    hasItem(
+                        KeyValue.pair("lhs1", null))
                 );
             }
             if (materialized) {
@@ -284,6 +288,79 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
         }
     }
 
+    @ParameterizedTest
+    @MethodSource("testCases")
+    public void doJoinFromLeftThenUpdateFkThenRevertBack(final boolean 
leftJoin,
+                                                         final String 
optimization,
+                                                         final boolean 
materialized,
+                                                         final boolean rejoin,
+                                                         final boolean 
leftVersioned,
+                                                         final boolean 
rightVersioned) {
+        final Properties streamsConfig = getStreamsProperties(optimization);
+        final Topology topology = getTopology(streamsConfig, materialized ? 
"store" : null, leftJoin, rejoin, leftVersioned, rightVersioned);
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(topology, streamsConfig)) {
+            final TestInputTopic<String, String> right = 
driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new 
StringSerializer());
+            final TestInputTopic<String, String> left = 
driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new 
StringSerializer());
+            final TestOutputTopic<String, String> outputTopic = 
driver.createOutputTopic(OUTPUT, new StringDeserializer(), new 
StringDeserializer());
+            final TestOutputTopic<String, String> rejoinOutputTopic = rejoin ? 
driver.createOutputTopic(REJOIN_OUTPUT, new StringDeserializer(), new 
StringDeserializer()) : null;
+            final KeyValueStore<String, String> store = 
driver.getKeyValueStore("store");
+
+            // Pre-populate the RHS records. This test is all about what 
happens when we add/remove LHS records
+            right.pipeInput("rhs1", "rhsValue1", baseTimestamp);
+            right.pipeInput("rhs2", "rhsValue2", baseTimestamp + 1);
+
+            assertThat(
+                outputTopic.readKeyValuesToList(),
+                is(emptyList())
+            );
+            if (rejoin) {
+                assertThat(
+                    rejoinOutputTopic.readKeyValuesToList(),
+                    is(emptyList())
+                );
+            }
+            if (materialized) {
+                assertThat(
+                    asMap(store),
+                    is(emptyMap())
+                );
+            }
+
+            left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp + 3);
+
+            {
+                final List<KeyValue<String, String>> expected = asList(
+                    KeyValue.pair("lhs1", "(lhsValue1|rhs1,rhsValue1)")
+                );
+                assertThat(
+                    outputTopic.readKeyValuesToList(),
+                    is(expected)
+                );
+            }
+
+            // Add another reference to an existing FK
+            left.pipeInput("lhs1", "lhsValue1|rhs2", baseTimestamp + 5);
+            {
+                assertThat(
+                    outputTopic.readKeyValuesToList(),
+                    is(asList(
+                        new KeyValue<>("lhs1", "(lhsValue1|rhs2,rhsValue2)")
+                    ))
+                );
+            }
+
+            // Now revert back the foreign key to earlier reference
+
+            left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp + 6);
+            assertThat(
+                outputTopic.readKeyValuesToList(),
+                is(asList(
+                    new KeyValue<>("lhs1", "(lhsValue1|rhs1,rhsValue1)")
+                ))
+            );
+        }
+    }
+
     @ParameterizedTest
     @MethodSource("testCases")
     public void doJoinFromRightThenDeleteRightEntity(final boolean leftJoin,
@@ -792,6 +869,12 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
         return result;
     }
 
+    protected static List<KeyValue<String, String>> makeList(final 
KeyValueStore<String, ValueAndTimestamp<String>> store) {
+        final List<KeyValue<String, String>> result = new LinkedList<>();
+        store.all().forEachRemaining(ele -> result.add(new KeyValue<>(ele.key, 
ele.value.value())));
+        return result;
+    }
+
     protected static Topology getTopology(final Properties streamsConfig,
                                           final String queryableStoreName,
                                           final boolean leftJoin,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java
index def6d9b36d3..18c0ed9a0e7 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java
@@ -149,7 +149,7 @@ public class SubscriptionSendProcessorSupplierTest {
         assertThat(context.forwarded().size(), greaterThan(0));
         assertThat(
             context.forwarded().get(0).record(),
-            is(new Record<>(fk1, new 
SubscriptionWrapper<>(hash(leftRecordValue), DELETE_KEY_AND_PROPAGATE, pk, 0), 
0))
+            is(new Record<>(fk1, new 
SubscriptionWrapper<>(hash(leftRecordValue), DELETE_KEY_NO_PROPAGATE, pk, 0), 
0))
         );
     }
 
@@ -198,7 +198,7 @@ public class SubscriptionSendProcessorSupplierTest {
         assertThat(context.forwarded().size(), greaterThan(0));
         assertThat(
             context.forwarded().get(0).record(),
-            is(new Record<>(fk1, new SubscriptionWrapper<>(null, 
DELETE_KEY_AND_PROPAGATE, pk, 0), 0))
+            is(new Record<>(fk1, new SubscriptionWrapper<>(null, 
DELETE_KEY_NO_PROPAGATE, pk, 0), 0))
         );
     }
 

Reply via email to