This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.9 by this push:
new d37f8840358 KAFKA-18713: Fix FK Left-Join result race condition
(#19005)
d37f8840358 is described below
commit d37f88403582c61c4cc3e10f35887841d12a8212
Author: nilmadhab mondal <[email protected]>
AuthorDate: Fri Apr 4 01:22:47 2025 +0200
KAFKA-18713: Fix FK Left-Join result race condition (#19005)
When a row in a FK-join left table is updated, we should send a "delete
subscription with no response" for the old FK to the right hand side, to
avoid getting two responses from the right hand side. Only the "new
subscription" for the new FK should request a response. If two responses
are requested, there is a race condition in which both responses could
be processed in the wrong order, leading to an incorrect join result.
This PR fixes the "delete subscription" case accordingly, to not request
a response.
Reviewers: Matthias J. Sax <[email protected]>
---
.../SubscriptionSendProcessorSupplier.java | 2 +-
.../KTableKTableForeignKeyJoinIntegrationTest.java | 135 +++++++++++++++++----
.../SubscriptionSendProcessorSupplierTest.java | 4 +-
3 files changed, 112 insertions(+), 29 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
index eb864f3910b..cd39315cc79 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
@@ -132,7 +132,7 @@ public class SubscriptionSendProcessorSupplier<K, KO, V>
implements ProcessorSup
final KO oldForeignKey =
foreignKeyExtractor.apply(record.value().oldValue);
final KO newForeignKey = record.value().newValue == null ?
null : foreignKeyExtractor.apply(record.value().newValue);
if (oldForeignKey != null &&
!Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
- forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
+ forward(record, oldForeignKey, DELETE_KEY_NO_PROPAGATE);
}
forward(record, newForeignKey,
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
} else if (record.value().newValue != null) {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
index b4ac10fdbc6..e3f502562a0 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
@@ -21,6 +21,7 @@ import
org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
@@ -59,10 +60,13 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -182,13 +186,13 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
right.pipeInput("rhs3", "rhsValue3", baseTimestamp + 2); // this
unreferenced FK won't show up in any results
assertThat(
- outputTopic.readKeyValuesToMap(),
- is(emptyMap())
+ outputTopic.readKeyValuesToList(),
+ is(emptyList())
);
if (rejoin) {
assertThat(
- rejoinOutputTopic.readKeyValuesToMap(),
- is(emptyMap())
+ rejoinOutputTopic.readKeyValuesToList(),
+ is(emptyList())
);
}
if (materialized) {
@@ -202,27 +206,27 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
left.pipeInput("lhs2", "lhsValue2|rhs2", baseTimestamp + 4);
{
- final Map<String, String> expected = mkMap(
- mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
- mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)")
+ final List<KeyValue<String, String>> expected = Arrays.asList(
+ KeyValue.pair("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
+ KeyValue.pair("lhs2", "(lhsValue2|rhs2,rhsValue2)")
);
assertThat(
- outputTopic.readKeyValuesToMap(),
+ outputTopic.readKeyValuesToList(),
is(expected)
);
if (rejoin) {
assertThat(
- rejoinOutputTopic.readKeyValuesToMap(),
- is(mkMap(
- mkEntry("lhs1",
"rejoin((lhsValue1|rhs1,rhsValue1),lhsValue1|rhs1)"),
- mkEntry("lhs2",
"rejoin((lhsValue2|rhs2,rhsValue2),lhsValue2|rhs2)")
+ rejoinOutputTopic.readKeyValuesToList(),
+ is(asList(
+ KeyValue.pair("lhs1",
"rejoin((lhsValue1|rhs1,rhsValue1),lhsValue1|rhs1)"),
+ KeyValue.pair("lhs2",
"rejoin((lhsValue2|rhs2,rhsValue2),lhsValue2|rhs2)")
))
);
}
if (materialized) {
assertThat(
asMap(store),
- is(expected)
+ is(expected.stream().collect(Collectors.toMap(kv ->
kv.key, kv -> kv.value)))
);
}
}
@@ -231,16 +235,16 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
left.pipeInput("lhs3", "lhsValue3|rhs1", baseTimestamp + 5);
{
assertThat(
- outputTopic.readKeyValuesToMap(),
- is(mkMap(
- mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)")
+ outputTopic.readKeyValuesToList(),
+ is(asList(
+ new KeyValue<>("lhs3", "(lhsValue3|rhs1,rhsValue1)")
))
);
if (rejoin) {
assertThat(
- rejoinOutputTopic.readKeyValuesToMap(),
- is(mkMap(
- mkEntry("lhs3",
"rejoin((lhsValue3|rhs1,rhsValue1),lhsValue3|rhs1)")
+ rejoinOutputTopic.readKeyValuesToList(),
+ is(asList(
+ new KeyValue<>("lhs3",
"rejoin((lhsValue3|rhs1,rhsValue1),lhsValue3|rhs1)")
))
);
}
@@ -255,21 +259,21 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
);
}
}
+
// Now delete one LHS entity such that one delete is propagated
down to the output.
left.pipeInput("lhs1", (String) null, baseTimestamp + 6);
assertThat(
- outputTopic.readKeyValuesToMap(),
- is(mkMap(
- mkEntry("lhs1", null)
+ outputTopic.readKeyValuesToList(),
+ is(asList(
+ new KeyValue<>("lhs1", null)
))
);
if (rejoin) {
assertThat(
- rejoinOutputTopic.readKeyValuesToMap(),
- is(mkMap(
- mkEntry("lhs1", null)
- ))
+ rejoinOutputTopic.readKeyValuesToList(),
+ hasItem(
+ KeyValue.pair("lhs1", null))
);
}
if (materialized) {
@@ -284,6 +288,79 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
}
}
+ @ParameterizedTest
+ @MethodSource("testCases")
+ public void doJoinFromLeftThenUpdateFkThenRevertBack(final boolean
leftJoin,
+ final String
optimization,
+ final boolean
materialized,
+ final boolean rejoin,
+ final boolean
leftVersioned,
+ final boolean
rightVersioned) {
+ final Properties streamsConfig = getStreamsProperties(optimization);
+ final Topology topology = getTopology(streamsConfig, materialized ?
"store" : null, leftJoin, rejoin, leftVersioned, rightVersioned);
+ try (final TopologyTestDriver driver = new
TopologyTestDriver(topology, streamsConfig)) {
+ final TestInputTopic<String, String> right =
driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new
StringSerializer());
+ final TestInputTopic<String, String> left =
driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new
StringSerializer());
+ final TestOutputTopic<String, String> outputTopic =
driver.createOutputTopic(OUTPUT, new StringDeserializer(), new
StringDeserializer());
+ final TestOutputTopic<String, String> rejoinOutputTopic = rejoin ?
driver.createOutputTopic(REJOIN_OUTPUT, new StringDeserializer(), new
StringDeserializer()) : null;
+ final KeyValueStore<String, String> store =
driver.getKeyValueStore("store");
+
+ // Pre-populate the RHS records. This test is all about what
happens when we add/remove LHS records
+ right.pipeInput("rhs1", "rhsValue1", baseTimestamp);
+ right.pipeInput("rhs2", "rhsValue2", baseTimestamp + 1);
+
+ assertThat(
+ outputTopic.readKeyValuesToList(),
+ is(emptyList())
+ );
+ if (rejoin) {
+ assertThat(
+ rejoinOutputTopic.readKeyValuesToList(),
+ is(emptyList())
+ );
+ }
+ if (materialized) {
+ assertThat(
+ asMap(store),
+ is(emptyMap())
+ );
+ }
+
+ left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp + 3);
+
+ {
+ final List<KeyValue<String, String>> expected = asList(
+ KeyValue.pair("lhs1", "(lhsValue1|rhs1,rhsValue1)")
+ );
+ assertThat(
+ outputTopic.readKeyValuesToList(),
+ is(expected)
+ );
+ }
+
+ // Add another reference to an existing FK
+ left.pipeInput("lhs1", "lhsValue1|rhs2", baseTimestamp + 5);
+ {
+ assertThat(
+ outputTopic.readKeyValuesToList(),
+ is(asList(
+ new KeyValue<>("lhs1", "(lhsValue1|rhs2,rhsValue2)")
+ ))
+ );
+ }
+
+ // Now revert back the foreign key to earlier reference
+
+ left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp + 6);
+ assertThat(
+ outputTopic.readKeyValuesToList(),
+ is(asList(
+ new KeyValue<>("lhs1", "(lhsValue1|rhs1,rhsValue1)")
+ ))
+ );
+ }
+ }
+
@ParameterizedTest
@MethodSource("testCases")
public void doJoinFromRightThenDeleteRightEntity(final boolean leftJoin,
@@ -792,6 +869,12 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
return result;
}
+ protected static List<KeyValue<String, String>> makeList(final
KeyValueStore<String, ValueAndTimestamp<String>> store) {
+ final List<KeyValue<String, String>> result = new LinkedList<>();
+ store.all().forEachRemaining(ele -> result.add(new KeyValue<>(ele.key,
ele.value.value())));
+ return result;
+ }
+
protected static Topology getTopology(final Properties streamsConfig,
final String queryableStoreName,
final boolean leftJoin,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java
index def6d9b36d3..18c0ed9a0e7 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java
@@ -149,7 +149,7 @@ public class SubscriptionSendProcessorSupplierTest {
assertThat(context.forwarded().size(), greaterThan(0));
assertThat(
context.forwarded().get(0).record(),
- is(new Record<>(fk1, new
SubscriptionWrapper<>(hash(leftRecordValue), DELETE_KEY_AND_PROPAGATE, pk, 0),
0))
+ is(new Record<>(fk1, new
SubscriptionWrapper<>(hash(leftRecordValue), DELETE_KEY_NO_PROPAGATE, pk, 0),
0))
);
}
@@ -198,7 +198,7 @@ public class SubscriptionSendProcessorSupplierTest {
assertThat(context.forwarded().size(), greaterThan(0));
assertThat(
context.forwarded().get(0).record(),
- is(new Record<>(fk1, new SubscriptionWrapper<>(null,
DELETE_KEY_AND_PROPAGATE, pk, 0), 0))
+ is(new Record<>(fk1, new SubscriptionWrapper<>(null,
DELETE_KEY_NO_PROPAGATE, pk, 0), 0))
);
}