This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 2c48809fad3 KAFKA-18713: Fix FK Left-Join result race condition 
(#19005)
2c48809fad3 is described below

commit 2c48809fad31e6773bdca1291fcecced45214d2a
Author: nilmadhab mondal <[email protected]>
AuthorDate: Fri Apr 4 01:22:47 2025 +0200

    KAFKA-18713: Fix FK Left-Join result race condition (#19005)
    
    When a row in the left table of an FK join is updated, we should send a "delete
    subscription with no response" for the old FK to the right-hand side, to
    avoid getting two responses from the right-hand side. Only the "new
    subscription" for the new FK should request a response. If two responses
    are requested, there is a race condition for which both responses could
    be processed in the wrong order, leading to an incorrect join result.
    
    This PR fixes the "delete subscription" case accordingly, so that it does not
    request a response.
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../KTableKTableForeignKeyJoinIntegrationTest.java | 135 +++++++++++++++++----
 .../SubscriptionSendProcessorSupplier.java         |   2 +-
 .../SubscriptionSendProcessorSupplierTest.java     |   8 +-
 3 files changed, 114 insertions(+), 31 deletions(-)

diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
index b4ac10fdbc6..b73da0f8a35 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
@@ -21,6 +21,7 @@ import 
org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TestInputTopic;
@@ -59,10 +60,13 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptyList;
 import static java.util.Collections.emptyMap;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static org.hamcrest.CoreMatchers.hasItem;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 
@@ -182,13 +186,13 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
             right.pipeInput("rhs3", "rhsValue3", baseTimestamp + 2); // this 
unreferenced FK won't show up in any results
 
             assertThat(
-                outputTopic.readKeyValuesToMap(),
-                is(emptyMap())
+                outputTopic.readKeyValuesToList(),
+                is(emptyList())
             );
             if (rejoin) {
                 assertThat(
-                    rejoinOutputTopic.readKeyValuesToMap(),
-                    is(emptyMap())
+                    rejoinOutputTopic.readKeyValuesToList(),
+                    is(emptyList())
                 );
             }
             if (materialized) {
@@ -202,27 +206,27 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
             left.pipeInput("lhs2", "lhsValue2|rhs2", baseTimestamp + 4);
 
             {
-                final Map<String, String> expected = mkMap(
-                    mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
-                    mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)")
+                final List<KeyValue<String, String>> expected = Arrays.asList(
+                    KeyValue.pair("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
+                    KeyValue.pair("lhs2", "(lhsValue2|rhs2,rhsValue2)")
                 );
                 assertThat(
-                    outputTopic.readKeyValuesToMap(),
+                    outputTopic.readKeyValuesToList(),
                     is(expected)
                 );
                 if (rejoin) {
                     assertThat(
-                        rejoinOutputTopic.readKeyValuesToMap(),
-                        is(mkMap(
-                            mkEntry("lhs1", 
"rejoin((lhsValue1|rhs1,rhsValue1),lhsValue1|rhs1)"),
-                            mkEntry("lhs2", 
"rejoin((lhsValue2|rhs2,rhsValue2),lhsValue2|rhs2)")
+                        rejoinOutputTopic.readKeyValuesToList(),
+                        is(asList(
+                            KeyValue.pair("lhs1", 
"rejoin((lhsValue1|rhs1,rhsValue1),lhsValue1|rhs1)"),
+                            KeyValue.pair("lhs2", 
"rejoin((lhsValue2|rhs2,rhsValue2),lhsValue2|rhs2)")
                         ))
                     );
                 }
                 if (materialized) {
                     assertThat(
                         asMap(store),
-                        is(expected)
+                        is(expected.stream().collect(Collectors.toMap(kv -> 
kv.key, kv -> kv.value)))
                     );
                 }
             }
@@ -231,16 +235,16 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
             left.pipeInput("lhs3", "lhsValue3|rhs1", baseTimestamp + 5);
             {
                 assertThat(
-                    outputTopic.readKeyValuesToMap(),
-                    is(mkMap(
-                        mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)")
+                    outputTopic.readKeyValuesToList(),
+                    is(List.of(
+                        new KeyValue<>("lhs3", "(lhsValue3|rhs1,rhsValue1)")
                     ))
                 );
                 if (rejoin) {
                     assertThat(
-                        rejoinOutputTopic.readKeyValuesToMap(),
-                        is(mkMap(
-                            mkEntry("lhs3", 
"rejoin((lhsValue3|rhs1,rhsValue1),lhsValue3|rhs1)")
+                        rejoinOutputTopic.readKeyValuesToList(),
+                        is(List.of(
+                            new KeyValue<>("lhs3", 
"rejoin((lhsValue3|rhs1,rhsValue1),lhsValue3|rhs1)")
                         ))
                     );
                 }
@@ -255,21 +259,21 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
                     );
                 }
             }
+
             // Now delete one LHS entity such that one delete is propagated 
down to the output.
 
             left.pipeInput("lhs1", (String) null, baseTimestamp + 6);
             assertThat(
-                outputTopic.readKeyValuesToMap(),
-                is(mkMap(
-                    mkEntry("lhs1", null)
+                outputTopic.readKeyValuesToList(),
+                is(List.of(
+                    new KeyValue<>("lhs1", null)
                 ))
             );
             if (rejoin) {
                 assertThat(
-                    rejoinOutputTopic.readKeyValuesToMap(),
-                    is(mkMap(
-                        mkEntry("lhs1", null)
-                    ))
+                    rejoinOutputTopic.readKeyValuesToList(),
+                    hasItem(
+                        KeyValue.pair("lhs1", null))
                 );
             }
             if (materialized) {
@@ -284,6 +288,79 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
         }
     }
 
+    @ParameterizedTest
+    @MethodSource("testCases")
+    public void doJoinFromLeftThenUpdateFkThenRevertBack(final boolean 
leftJoin,
+                                                         final String 
optimization,
+                                                         final boolean 
materialized,
+                                                         final boolean rejoin,
+                                                         final boolean 
leftVersioned,
+                                                         final boolean 
rightVersioned) {
+        final Properties streamsConfig = getStreamsProperties(optimization);
+        final Topology topology = getTopology(streamsConfig, materialized ? 
"store" : null, leftJoin, rejoin, leftVersioned, rightVersioned);
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(topology, streamsConfig)) {
+            final TestInputTopic<String, String> right = 
driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new 
StringSerializer());
+            final TestInputTopic<String, String> left = 
driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new 
StringSerializer());
+            final TestOutputTopic<String, String> outputTopic = 
driver.createOutputTopic(OUTPUT, new StringDeserializer(), new 
StringDeserializer());
+            final TestOutputTopic<String, String> rejoinOutputTopic = rejoin ? 
driver.createOutputTopic(REJOIN_OUTPUT, new StringDeserializer(), new 
StringDeserializer()) : null;
+            final KeyValueStore<String, String> store = 
driver.getKeyValueStore("store");
+
+            // Pre-populate the RHS records. This test changes an LHS record's FK 
(rhs1 -> rhs2) and then reverts it (rhs2 -> rhs1), checking the emitted join results
+            right.pipeInput("rhs1", "rhsValue1", baseTimestamp);
+            right.pipeInput("rhs2", "rhsValue2", baseTimestamp + 1);
+
+            assertThat(
+                outputTopic.readKeyValuesToList(),
+                is(emptyList())
+            );
+            if (rejoin) {
+                assertThat(
+                    rejoinOutputTopic.readKeyValuesToList(),
+                    is(emptyList())
+                );
+            }
+            if (materialized) {
+                assertThat(
+                    asMap(store),
+                    is(emptyMap())
+                );
+            }
+
+            left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp + 3);
+
+            {
+                final List<KeyValue<String, String>> expected = asList(
+                    KeyValue.pair("lhs1", "(lhsValue1|rhs1,rhsValue1)")
+                );
+                assertThat(
+                    outputTopic.readKeyValuesToList(),
+                    is(expected)
+                );
+            }
+
+            // Change lhs1's FK from rhs1 to rhs2 (another existing RHS key)
+            left.pipeInput("lhs1", "lhsValue1|rhs2", baseTimestamp + 5);
+            {
+                assertThat(
+                    outputTopic.readKeyValuesToList(),
+                    is(List.of(
+                        new KeyValue<>("lhs1", "(lhsValue1|rhs2,rhsValue2)")
+                    ))
+                );
+            }
+
+            // Now revert lhs1's FK back to rhs1: exactly one result (joined with rhs1) is expected, since the delete subscription for the old FK must not request a response
+
+            left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp + 6);
+            assertThat(
+                outputTopic.readKeyValuesToList(),
+                is(List.of(
+                    new KeyValue<>("lhs1", "(lhsValue1|rhs1,rhsValue1)")
+                ))
+            );
+        }
+    }
+
     @ParameterizedTest
     @MethodSource("testCases")
     public void doJoinFromRightThenDeleteRightEntity(final boolean leftJoin,
@@ -792,6 +869,12 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
         return result;
     }
 
+    protected static List<KeyValue<String, String>> makeList(final 
KeyValueStore<String, ValueAndTimestamp<String>> store) {
+        final List<KeyValue<String, String>> result = new LinkedList<>(); // NOTE(review): helper is not called anywhere in the visible diff; confirm it is needed
+        store.all().forEachRemaining(ele -> result.add(new KeyValue<>(ele.key, 
ele.value.value()))); // NOTE(review): the iterator returned by store.all() is never closed here; consider try-with-resources
+        return result;
+    }
+
     protected static Topology getTopology(final Properties streamsConfig,
                                           final String queryableStoreName,
                                           final boolean leftJoin,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
index 10199c242b4..9e6f1833be8 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
@@ -131,7 +131,7 @@ public class SubscriptionSendProcessorSupplier<K, KO, V> 
implements ProcessorSup
                 final KO oldForeignKey = 
foreignKeyExtractor.extract(record.key(), record.value().oldValue);
                 final KO newForeignKey = record.value().newValue == null ? 
null : foreignKeyExtractor.extract(record.key(), record.value().newValue);
                 if (oldForeignKey != null && 
!Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
-                    forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
+                    forward(record, oldForeignKey, DELETE_KEY_NO_PROPAGATE);
                 }
                 forward(record, newForeignKey, 
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
             } else if (record.value().newValue != null) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java
index 87366bd5334..197f79462fb 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java
@@ -149,7 +149,7 @@ public class SubscriptionSendProcessorSupplierTest {
         assertThat(context.forwarded().size(), greaterThan(0));
         assertThat(
             context.forwarded().get(0).record(),
-            is(new Record<>(fk1, new 
SubscriptionWrapper<>(hash(leftRecordValue), DELETE_KEY_AND_PROPAGATE, pk, 0), 
0))
+            is(new Record<>(fk1, new 
SubscriptionWrapper<>(hash(leftRecordValue), DELETE_KEY_NO_PROPAGATE, pk, 0), 
0))
         );
     }
 
@@ -198,7 +198,7 @@ public class SubscriptionSendProcessorSupplierTest {
         assertThat(context.forwarded().size(), greaterThan(0));
         assertThat(
             context.forwarded().get(0).record(),
-            is(new Record<>(fk1, new SubscriptionWrapper<>(null, 
DELETE_KEY_AND_PROPAGATE, pk, 0), 0))
+            is(new Record<>(fk1, new SubscriptionWrapper<>(null, 
DELETE_KEY_NO_PROPAGATE, pk, 0), 0))
         );
     }
 
@@ -438,7 +438,7 @@ public class SubscriptionSendProcessorSupplierTest {
         assertThat(context.forwarded().size(), greaterThan(0));
         assertThat(
             context.forwarded().get(0).record(),
-            is(new Record<>(compositeKey, new 
SubscriptionWrapper<>(hash(leftRecordValue), DELETE_KEY_AND_PROPAGATE, pk, 0), 
0))
+            is(new Record<>(compositeKey, new 
SubscriptionWrapper<>(hash(leftRecordValue), DELETE_KEY_NO_PROPAGATE, pk, 0), 
0))
         );
     }
 
@@ -491,7 +491,7 @@ public class SubscriptionSendProcessorSupplierTest {
         assertThat(context.forwarded().size(), greaterThan(0));
         assertThat(
             context.forwarded().get(0).record(),
-            is(new Record<>(compositeKey, new SubscriptionWrapper<>(null, 
DELETE_KEY_AND_PROPAGATE, pk, 0), 0))
+            is(new Record<>(compositeKey, new SubscriptionWrapper<>(null, 
DELETE_KEY_NO_PROPAGATE, pk, 0), 0))
         );
     }
 

Reply via email to