This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new 71c9d83b20e KAFKA-16407: Fix foreign key INNER join on change of FK
from/to a null value (#19303)
71c9d83b20e is described below
commit 71c9d83b20eba87d539ce94a82165345c5228469
Author: Ayoub Omari <[email protected]>
AuthorDate: Sun Apr 6 05:13:31 2025 +0200
KAFKA-16407: Fix foreign key INNER join on change of FK from/to a null
value (#19303)
Fixes both KAFKA-16407 and KAFKA-16434.
Summary of existing issues:
- An update to a left record is ignored when its previous FK value is null, even if the new FK value is non-null.
- The foreign-key join result is not unset when the FK changes from a non-null value to null.
Reviewers: Matthias J. Sax <[email protected]>
---
.../SubscriptionSendProcessorSupplier.java | 31 +++++------
.../SubscriptionSendProcessorSupplierTest.java | 64 +++++++++++++++++++++-
2 files changed, 75 insertions(+), 20 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
index 9e6f1833be8..0911d26b6ff 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
@@ -142,28 +142,25 @@ public class SubscriptionSendProcessorSupplier<K, KO, V>
implements ProcessorSup
private void defaultJoinInstructions(final Record<K, Change<V>>
record) {
if (record.value().oldValue != null) {
- final KO oldForeignKey = record.value().oldValue == null ?
null : foreignKeyExtractor.extract(record.key(), record.value().oldValue);
- if (oldForeignKey == null) {
+ final KO oldForeignKey =
foreignKeyExtractor.extract(record.key(), record.value().oldValue);
+ final KO newForeignKey = record.value().newValue == null ?
null : foreignKeyExtractor.extract(record.key(), record.value().newValue);
+
+ if (oldForeignKey == null && newForeignKey == null) {
logSkippedRecordDueToNullForeignKey();
- return;
- }
- if (record.value().newValue != null) {
- final KO newForeignKey = record.value().newValue == null ?
null : foreignKeyExtractor.extract(record.key(), record.value().newValue);
- if (newForeignKey == null) {
- logSkippedRecordDueToNullForeignKey();
- return;
- }
- if (!Arrays.equals(serialize(newForeignKey),
serialize(oldForeignKey))) {
- //Different Foreign Key - delete the old key value and
propagate the new one.
- //Delete it from the oldKey's state store
- forward(record, oldForeignKey,
DELETE_KEY_NO_PROPAGATE);
- }
+ } else if (oldForeignKey == null) {
+ forward(record, newForeignKey,
PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE);
+ } else if (newForeignKey == null) {
+ forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
+ } else if (!Arrays.equals(serialize(newForeignKey),
serialize(oldForeignKey))) {
+ //Different Foreign Key - delete the old key value and
propagate the new one.
+ //Delete it from the oldKey's state store
+ forward(record, oldForeignKey, DELETE_KEY_NO_PROPAGATE);
//Add to the newKey's state store. Additionally, propagate
null if no FK is found there,
//since we must "unset" any output set by the previous
FK-join. This is true for both INNER
//and LEFT join.
forward(record, newForeignKey,
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
- } else {
- forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
+ } else { // unchanged FK
+ forward(record, newForeignKey,
PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE);
}
} else if (record.value().newValue != null) {
final KO newForeignKey =
foreignKeyExtractor.extract(record.key(), record.value().newValue);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java
index 197f79462fb..51691f014b9 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java
@@ -295,10 +295,57 @@ public class SubscriptionSendProcessorSupplierTest {
innerJoinProcessor.process(new Record<>(pk, new
Change<>(leftRecordValue, leftRecordValue), 0));
assertThat(context.forwarded(), empty());
+ }
- // test dropped-records sensors
- assertEquals(1.0, getDroppedRecordsTotalMetric(context));
- assertNotEquals(0.0, getDroppedRecordsRateMetric(context));
+ @Test
+ public void innerJoinShouldPropagateChangeFromNullFKToNonNullFK() {
+ final MockInternalProcessorContext<String,
SubscriptionWrapper<String>> context = new MockInternalProcessorContext<>();
+ innerJoinProcessor.init(context);
+ context.setRecordMetadata("topic", 0, 0);
+
+ final LeftValue leftRecordValue = new LeftValue(fk1);
+
+ innerJoinProcessor.process(new Record<>(pk, new
Change<>(leftRecordValue, new LeftValue(null)), 0));
+
+ assertThat(context.forwarded().size(), is(1));
+ assertThat(
+ context.forwarded().get(0).record(),
+ is(new Record<>(fk1, new
SubscriptionWrapper<>(hash(leftRecordValue),
PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, pk, 0), 0))
+ );
+ }
+
+ @Test
+ public void innerJoinShouldDeleteAndPropagateChangeFromNonNullFKToNullFK()
{
+ final MockInternalProcessorContext<String,
SubscriptionWrapper<String>> context = new MockInternalProcessorContext<>();
+ innerJoinProcessor.init(context);
+ context.setRecordMetadata("topic", 0, 0);
+
+ final LeftValue leftRecordValue = new LeftValue(null);
+
+ innerJoinProcessor.process(new Record<>(pk, new
Change<>(leftRecordValue, new LeftValue(fk1)), 0));
+
+ assertThat(context.forwarded().size(), is(1));
+ assertThat(
+ context.forwarded().get(0).record(),
+ is(new Record<>(fk1, new
SubscriptionWrapper<>(hash(leftRecordValue), DELETE_KEY_AND_PROPAGATE, pk, 0),
0))
+ );
+ }
+
+ @Test
+ public void
innerJoinShouldPropagateUnchangedFKOnlyIfFKExistsInRightTable() {
+ final MockInternalProcessorContext<String,
SubscriptionWrapper<String>> context = new MockInternalProcessorContext<>();
+ innerJoinProcessor.init(context);
+ context.setRecordMetadata("topic", 0, 0);
+
+ final LeftValue leftRecordValue = new LeftValue(fk1);
+
+ innerJoinProcessor.process(new Record<>(pk, new
Change<>(leftRecordValue, leftRecordValue), 0));
+
+ assertThat(context.forwarded().size(), is(1));
+ assertThat(
+ context.forwarded().get(0).record(),
+ is(new Record<>(fk1, new
SubscriptionWrapper<>(hash(leftRecordValue),
PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, pk, 0), 0))
+ );
}
@Test
@@ -316,6 +363,17 @@ public class SubscriptionSendProcessorSupplierTest {
);
}
+ @Test
+ public void
innerJoinShouldNotPropagateDeletionOfPrimaryKeyWhenPreviousFKIsNull() {
+ final MockInternalProcessorContext<String,
SubscriptionWrapper<String>> context = new MockInternalProcessorContext<>();
+ innerJoinProcessor.init(context);
+ context.setRecordMetadata("topic", 0, 0);
+
+ innerJoinProcessor.process(new Record<>(pk, new Change<>(null, new
LeftValue(null)), 0));
+
+ assertThat(context.forwarded(), empty());
+ }
+
@Test
public void innerJoinShouldPropagateNothingWhenOldAndNewLeftValueIsNull() {
final MockInternalProcessorContext<String,
SubscriptionWrapper<String>> context = new MockInternalProcessorContext<>();