This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new 62c6697ac98 KAFKA-19208: KStream-GlobalKTable join should not drop left-null-key record (#19580)
62c6697ac98 is described below
commit 62c6697ac98dd361b7fd669b2b0776c00cc619f2
Author: Matthias J. Sax <[email protected]>
AuthorDate: Thu May 15 18:34:07 2025 -0700
KAFKA-19208: KStream-GlobalKTable join should not drop left-null-key record (#19580)
Reviewers: Lucas Brutschy <[email protected]>
---
.../internals/KStreamKTableJoinProcessor.java | 2 +-
.../internals/KStreamGlobalKTableJoinTest.java | 112 ++++++++++++++---
.../internals/KStreamGlobalKTableLeftJoinTest.java | 140 ++++++++++++++++-----
.../kstream/internals/KStreamKTableJoinTest.java | 71 ++++++++---
.../internals/KStreamKTableLeftJoinTest.java | 62 +++++++++
5 files changed, 322 insertions(+), 65 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
index e81877c99e7..17f1ceb22ff 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
@@ -150,7 +150,7 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, VOut> extends ContextualProcess
// furthermore, on left/outer joins 'null' in ValueJoiner#apply()
indicates a missing record --
// thus, to be consistent and to avoid ambiguous null semantics, null
values are ignored
final K2 mappedKey = keyMapper.apply(record.key(), record.value());
- if (leftJoin && record.key() == null && record.value() != null) {
+ if (leftJoin && mappedKey == null && record.value() != null) {
return false;
}
if (mappedKey == null || record.value() == null) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
index cd9126f03e5..6085bdacf68 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
@@ -46,6 +47,10 @@ import java.util.Optional;
import java.util.Properties;
import java.util.Set;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class KStreamGlobalKTableJoinTest {
@@ -89,7 +94,7 @@ public class KStreamGlobalKTableJoinTest {
}
keyMapper = (key, value) -> {
final String[] tokens = value.split(",");
- // Value is comma delimited. If second token is present, it's the
key to the global ktable.
+ // Value is comma-delimited. If second token is present, it's the
key to the global ktable.
// If not present, use null to indicate no match
return tokens.length > 1 ? tokens[1] : null;
};
@@ -169,8 +174,10 @@ public class KStreamGlobalKTableJoinTest {
// push all four items to the primary stream. this should produce two
items.
pushToStream(4, "X", true, false);
- processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0,
"X0,FKey0+Y0", 2),
- new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 3));
+ processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(0, "X0,FKey0+Y0", 2),
+ new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 3)
+ );
// push all items to the globalTable. this should not produce any item
@@ -180,10 +187,12 @@ public class KStreamGlobalKTableJoinTest {
// push all four items to the primary stream. this should produce four
items.
pushToStream(4, "X", true, false);
- processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0,
"X0,FKey0+YY0", 6),
- new KeyValueTimestamp<>(1, "X1,FKey1+YY1", 7),
- new KeyValueTimestamp<>(2, "X2,FKey2+YY2", 8),
- new KeyValueTimestamp<>(3, "X3,FKey3+YY3", 9));
+ processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(0, "X0,FKey0+YY0", 6),
+ new KeyValueTimestamp<>(1, "X1,FKey1+YY1", 7),
+ new KeyValueTimestamp<>(2, "X2,FKey2+YY2", 8),
+ new KeyValueTimestamp<>(3, "X3,FKey3+YY3", 9)
+ );
// push all items to the globalTable. this should not produce any item
@@ -202,8 +211,10 @@ public class KStreamGlobalKTableJoinTest {
// push all four items to the primary stream. this should produce two
items.
pushToStream(4, "X", true, false);
- processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0,
"X0,FKey0+Y0", 0),
- new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 1));
+ processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(0, "X0,FKey0+Y0", 0),
+ new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 1)
+ );
}
@@ -218,10 +229,12 @@ public class KStreamGlobalKTableJoinTest {
// push all four items to the primary stream. this should produce four
items.
pushToStream(4, "X", true, false);
- processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0,
"X0,FKey0+Y0", 0),
- new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 1),
- new KeyValueTimestamp<>(2, "X2,FKey2+Y2", 2),
- new KeyValueTimestamp<>(3, "X3,FKey3+Y3", 3));
+ processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(0, "X0,FKey0+Y0", 0),
+ new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 1),
+ new KeyValueTimestamp<>(2, "X2,FKey2+Y2", 2),
+ new KeyValueTimestamp<>(3, "X3,FKey3+Y3", 3)
+ );
// push two items with null to the globalTable as deletes. this should
not produce any item.
@@ -231,8 +244,10 @@ public class KStreamGlobalKTableJoinTest {
// push all four items to the primary stream. this should produce two
items.
pushToStream(4, "XX", true, false);
- processor.checkAndClearProcessResult(new KeyValueTimestamp<>(2,
"XX2,FKey2+Y2", 6),
- new KeyValueTimestamp<>(3, "XX3,FKey3+Y3", 7));
+ processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(2, "XX2,FKey2+Y2", 6),
+ new KeyValueTimestamp<>(3, "XX3,FKey3+Y3", 7)
+ );
}
@Test
@@ -248,10 +263,54 @@ public class KStreamGlobalKTableJoinTest {
pushToStream(4, "XXX", false, false);
processor.checkAndClearProcessResult(EMPTY);
+
+ assertThat(
+ driver.metrics().get(
+ new MetricName(
+ "dropped-records-total",
+ "stream-task-metrics",
+ "",
+ mkMap(
+ mkEntry("thread-id",
Thread.currentThread().getName()),
+ mkEntry("task-id", "0_0")
+ )
+ ))
+ .metricValue(),
+ is(4.0)
+ );
+ }
+
+ @Test
+ public void shouldNotJoinOnNullKeyMapperValuesWithNullKeys() {
+ // push four items to the globalTable. this should not produce any item.
+
+ pushToGlobalTable(4, "Y");
+ processor.checkAndClearProcessResult(EMPTY);
+
+ // push all four items to the primary stream with no foreign key,
resulting in null keyMapper values.
+ // the first stream record also has a null key; as this is an inner join, no item should be produced.
+
+ pushToStream(4, "XXX", false, true);
+ processor.checkAndClearProcessResult(EMPTY);
+
+ assertThat(
+ driver.metrics().get(
+ new MetricName(
+ "dropped-records-total",
+ "stream-task-metrics",
+ "",
+ mkMap(
+ mkEntry("thread-id",
Thread.currentThread().getName()),
+ mkEntry("task-id", "0_0")
+ )
+ ))
+ .metricValue(),
+ is(4.0) // every one of the four stream records was skipped and counted as dropped
+ );
}
@Test
- public void shouldJoinOnNullKeyWithNonNullKeyMapperValues() {
+ public void shouldJoinOnNullKey() {
// push two items to the globalTable. this should not produce any item.
pushToGlobalTable(2, "Y");
@@ -260,7 +319,24 @@ public class KStreamGlobalKTableJoinTest {
// push all four items to the primary stream. this should produce two
items.
pushToStream(4, "X", true, true);
- processor.checkAndClearProcessResult(new KeyValueTimestamp<>(null,
"X0,FKey0+Y0", 0),
- new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 1));
+ processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(null, "X0,FKey0+Y0", 0),
+ new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 1)
+ );
+
+ assertThat(
+ driver.metrics().get(
+ new MetricName(
+ "dropped-records-total",
+ "stream-task-metrics",
+ "",
+ mkMap(
+ mkEntry("thread-id",
Thread.currentThread().getName()),
+ mkEntry("task-id", "0_0")
+ )
+ ))
+ .metricValue(),
+ is(0.0)
+ );
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
index b0a44c36e27..6f49917a012 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
@@ -46,6 +47,10 @@ import java.util.Optional;
import java.util.Properties;
import java.util.Set;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class KStreamGlobalKTableLeftJoinTest {
@@ -89,7 +94,7 @@ public class KStreamGlobalKTableLeftJoinTest {
}
keyMapper = (key, value) -> {
final String[] tokens = value.split(",");
- // Value is comma delimited. If second token is present, it's the
key to the global ktable.
+ // Value is comma-delimited. If second token is present, it's the
key to the global ktable.
// If not present, use null to indicate no match
return tokens.length > 1 ? tokens[1] : null;
};
@@ -150,8 +155,10 @@ public class KStreamGlobalKTableLeftJoinTest {
// push two items to the primary stream. the globalTable is empty
pushToStream(2, "X", true, false);
- processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0,
"X0,FKey0+null", 0),
- new KeyValueTimestamp<>(1, "X1,FKey1+null", 1));
+ processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(0, "X0,FKey0+null", 0),
+ new KeyValueTimestamp<>(1, "X1,FKey1+null", 1)
+ );
}
@Test
@@ -160,8 +167,10 @@ public class KStreamGlobalKTableLeftJoinTest {
// push two items to the primary stream. the globalTable is empty
pushToStream(2, "X", true, false);
- processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0,
"X0,FKey0+null", 0),
- new KeyValueTimestamp<>(1, "X1,FKey1+null", 1));
+ processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(0, "X0,FKey0+null", 0),
+ new KeyValueTimestamp<>(1, "X1,FKey1+null", 1)
+ );
// push two items to the globalTable. this should not produce any item.
@@ -171,10 +180,12 @@ public class KStreamGlobalKTableLeftJoinTest {
// push all four items to the primary stream. this should produce four
items.
pushToStream(4, "X", true, false);
- processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0,
"X0,FKey0+Y0", 2),
- new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 3),
- new KeyValueTimestamp<>(2, "X2,FKey2+null", 4),
- new KeyValueTimestamp<>(3, "X3,FKey3+null", 5));
+ processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(0, "X0,FKey0+Y0", 2),
+ new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 3),
+ new KeyValueTimestamp<>(2, "X2,FKey2+null", 4),
+ new KeyValueTimestamp<>(3, "X3,FKey3+null", 5)
+ );
// push all items to the globalTable. this should not produce any item
@@ -184,10 +195,12 @@ public class KStreamGlobalKTableLeftJoinTest {
// push all four items to the primary stream. this should produce four
items.
pushToStream(4, "X", true, false);
- processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0,
"X0,FKey0+YY0", 6),
- new KeyValueTimestamp<>(1, "X1,FKey1+YY1", 7),
- new KeyValueTimestamp<>(2, "X2,FKey2+YY2", 8),
- new KeyValueTimestamp<>(3, "X3,FKey3+YY3", 9));
+ processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(0, "X0,FKey0+YY0", 6),
+ new KeyValueTimestamp<>(1, "X1,FKey1+YY1", 7),
+ new KeyValueTimestamp<>(2, "X2,FKey2+YY2", 8),
+ new KeyValueTimestamp<>(3, "X3,FKey3+YY3", 9)
+ );
// push all items to the globalTable. this should not produce any item
@@ -206,10 +219,12 @@ public class KStreamGlobalKTableLeftJoinTest {
// push all four items to the primary stream. this should produce four
items.
pushToStream(4, "X", true, false);
- processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0,
"X0,FKey0+Y0", 0),
- new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 1),
- new KeyValueTimestamp<>(2, "X2,FKey2+null", 2),
- new KeyValueTimestamp<>(3, "X3,FKey3+null", 3));
+ processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(0, "X0,FKey0+Y0", 0),
+ new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 1),
+ new KeyValueTimestamp<>(2, "X2,FKey2+null", 2),
+ new KeyValueTimestamp<>(3, "X3,FKey3+null", 3)
+ );
}
@@ -224,10 +239,12 @@ public class KStreamGlobalKTableLeftJoinTest {
// push all four items to the primary stream. this should produce four
items.
pushToStream(4, "X", true, false);
- processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0,
"X0,FKey0+Y0", 0),
- new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 1),
- new KeyValueTimestamp<>(2, "X2,FKey2+Y2", 2),
- new KeyValueTimestamp<>(3, "X3,FKey3+Y3", 3));
+ processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(0, "X0,FKey0+Y0", 0),
+ new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 1),
+ new KeyValueTimestamp<>(2, "X2,FKey2+Y2", 2),
+ new KeyValueTimestamp<>(3, "X3,FKey3+Y3", 3)
+ );
// push two items with null to the globalTable as deletes. this should
not produce any item.
@@ -236,15 +253,17 @@ public class KStreamGlobalKTableLeftJoinTest {
// push all four items to the primary stream. this should produce four
items.
- pushToStream(4, "XX", true, false);
- processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0,
"XX0,FKey0+null", 4),
- new KeyValueTimestamp<>(1, "XX1,FKey1+null", 5),
- new KeyValueTimestamp<>(2, "XX2,FKey2+Y2", 6),
- new KeyValueTimestamp<>(3, "XX3,FKey3+Y3", 7));
+ pushToStream(4, "X", true, false);
+ processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(0, "X0,FKey0+null", 4),
+ new KeyValueTimestamp<>(1, "X1,FKey1+null", 5),
+ new KeyValueTimestamp<>(2, "X2,FKey2+Y2", 6),
+ new KeyValueTimestamp<>(3, "X3,FKey3+Y3", 7)
+ );
}
@Test
- public void shouldNotJoinOnNullKeyMapperValues() {
+ public void shouldJoinOnNullKeyMapperValues() {
// push all items to the globalTable. this should not produce any item
@@ -255,11 +274,66 @@ public class KStreamGlobalKTableLeftJoinTest {
// this should not produce any item.
pushToStream(4, "XXX", false, false);
+ processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(0, "XXX0+null", 0),
+ new KeyValueTimestamp<>(1, "XXX1+null", 1),
+ new KeyValueTimestamp<>(2, "XXX2+null", 2),
+ new KeyValueTimestamp<>(3, "XXX3+null", 3)
+ );
+
+ assertThat(
+ driver.metrics().get(
+ new MetricName(
+ "dropped-records-total",
+ "stream-task-metrics",
+ "",
+ mkMap(
+ mkEntry("thread-id",
Thread.currentThread().getName()),
+ mkEntry("task-id", "0_0")
+ )
+ ))
+ .metricValue(),
+ is(0.0)
+ );
+ }
+
+ @Test
+ public void shouldJoinOnNullKeyMapperValuesWithNullKeys() {
+
+ // push all items to the globalTable. this should not produce any item
+
+ pushToGlobalTable(4, "Y");
processor.checkAndClearProcessResult(EMPTY);
+
+ // push all four items to the primary stream with no foreign key,
resulting in null keyMapper values.
+ // as this is a left join, every record (including the first one, which has a null key) should produce a result joined with null.
+
+ pushToStream(4, "XXX", false, true);
+ processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(null, "XXX0+null", 0),
+ new KeyValueTimestamp<>(1, "XXX1+null", 1),
+ new KeyValueTimestamp<>(2, "XXX2+null", 2),
+ new KeyValueTimestamp<>(3, "XXX3+null", 3)
+ );
+
+ assertThat(
+ driver.metrics().get(
+ new MetricName(
+ "dropped-records-total",
+ "stream-task-metrics",
+ "",
+ mkMap(
+ mkEntry("thread-id",
Thread.currentThread().getName()),
+ mkEntry("task-id", "0_0")
+ )
+ ))
+ .metricValue(),
+ is(0.0) // no record may be dropped by the left join
+ );
}
@Test
- public void shouldJoinOnNullKeyWithNonNullKeyMapperValues() {
+ public void shouldJoinOnNullKey() {
// push four items to the globalTable. this should not produce any
item.
pushToGlobalTable(4, "Y");
@@ -268,9 +342,11 @@ public class KStreamGlobalKTableLeftJoinTest {
// push all four items to the primary stream. this should produce four
items.
pushToStream(4, "X", true, true);
- processor.checkAndClearProcessResult(new KeyValueTimestamp<>(null,
"X0,FKey0+Y0", 0),
- new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 1),
- new KeyValueTimestamp<>(2, "X2,FKey2+Y2", 2),
- new KeyValueTimestamp<>(3, "X3,FKey3+Y3", 3));
+ processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(null, "X0,FKey0+Y0", 0),
+ new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 1),
+ new KeyValueTimestamp<>(2, "X2,FKey2+Y2", 2),
+ new KeyValueTimestamp<>(3, "X3,FKey3+Y3", 3)
+ );
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
index c4552715982..1419fd716c2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
@@ -52,6 +53,8 @@ import java.util.Properties;
import java.util.Random;
import java.util.Set;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
@@ -354,8 +357,10 @@ public class KStreamKTableJoinTest {
// push all four items to the primary stream. this should produce two
items.
pushToStream(4, "X");
- processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0,
"X0+Y0", 0),
- new KeyValueTimestamp<>(1, "X1+Y1", 1));
+ processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(0, "X0+Y0", 0),
+ new KeyValueTimestamp<>(1, "X1+Y1", 1)
+ );
// push all items to the table. this should not produce any item
pushToTable(4, "YY");
@@ -363,10 +368,12 @@ public class KStreamKTableJoinTest {
// push all four items to the primary stream. this should produce four
items.
pushToStream(4, "X");
- processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0,
"X0+YY0", 0),
- new KeyValueTimestamp<>(1, "X1+YY1", 1),
- new KeyValueTimestamp<>(2, "X2+YY2", 2),
- new KeyValueTimestamp<>(3, "X3+YY3", 3));
+ processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(0, "X0+YY0", 0),
+ new KeyValueTimestamp<>(1, "X1+YY1", 1),
+ new KeyValueTimestamp<>(2, "X2+YY2", 2),
+ new KeyValueTimestamp<>(3, "X3+YY3", 3)
+ );
// push all items to the table. this should not produce any item
pushToTable(4, "YYY");
@@ -381,8 +388,10 @@ public class KStreamKTableJoinTest {
// push all four items to the primary stream. this should produce two
items.
pushToStream(4, "X");
- processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0,
"X0+Y0", 0),
- new KeyValueTimestamp<>(1, "X1+Y1", 1));
+ processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(0, "X0+Y0", 0),
+ new KeyValueTimestamp<>(1, "X1+Y1", 1)
+ );
}
@Test
@@ -393,10 +402,12 @@ public class KStreamKTableJoinTest {
// push all four items to the primary stream. this should produce four
items.
pushToStream(4, "X");
- processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0,
"X0+Y0", 0),
- new KeyValueTimestamp<>(1, "X1+Y1", 1),
- new KeyValueTimestamp<>(2, "X2+Y2", 2),
- new KeyValueTimestamp<>(3, "X3+Y3", 3));
+ processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(0, "X0+Y0", 0),
+ new KeyValueTimestamp<>(1, "X1+Y1", 1),
+ new KeyValueTimestamp<>(2, "X2+Y2", 2),
+ new KeyValueTimestamp<>(3, "X3+Y3", 3)
+ );
// push two items with null to the table as deletes. this should not
produce any item.
pushNullValueToTable();
@@ -404,8 +415,10 @@ public class KStreamKTableJoinTest {
// push all four items to the primary stream. this should produce two
items.
pushToStream(4, "XX");
- processor.checkAndClearProcessResult(new KeyValueTimestamp<>(2,
"XX2+Y2", 2),
- new KeyValueTimestamp<>(3, "XX3+Y3", 3));
+ processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(2, "XX2+Y2", 2),
+ new KeyValueTimestamp<>(3, "XX3+Y3", 3)
+ );
}
@Test
@@ -420,6 +433,21 @@ public class KStreamKTableJoinTest {
hasItem("Skipping record due to null join key or value.
topic=[streamTopic] partition=[0] "
+ "offset=[0]"));
}
+
+ assertThat(
+ driver.metrics().get(
+ new MetricName(
+ "dropped-records-total",
+ "stream-task-metrics",
+ "",
+ mkMap(
+ mkEntry("thread-id", Thread.currentThread().getName()),
+ mkEntry("task-id", "0_0")
+ )
+ ))
+ .metricValue(),
+ is(1.0)
+ );
}
@Test
@@ -435,6 +463,21 @@ public class KStreamKTableJoinTest {
+ "offset=[0]")
);
}
+
+ assertThat(
+ driver.metrics().get(
+ new MetricName(
+ "dropped-records-total",
+ "stream-task-metrics",
+ "",
+ mkMap(
+ mkEntry("thread-id",
Thread.currentThread().getName()),
+ mkEntry("task-id", "0_0")
+ )
+ ))
+ .metricValue(),
+ is(1.0)
+ );
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
index 42ff6aa8d92..25eafd3043b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -16,9 +16,11 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
@@ -45,7 +47,10 @@ import java.util.Properties;
import java.util.Random;
import java.util.Set;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class KStreamKTableLeftJoinTest {
private static final KeyValueTimestamp[] EMPTY = new KeyValueTimestamp[0];
@@ -199,4 +204,61 @@ public class KStreamKTableLeftJoinTest {
new KeyValueTimestamp<>(3, "XX3+Y3", 3));
}
+ @Test
+ public void shouldNotDropLeftNullKey() {
+ // push a single item to the table. this should not produce any item.
+ pushToTable(1, "Y");
+ processor.checkAndClearProcessResult(EMPTY);
+
+ try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(KStreamKTableJoin.class)) {
+ final TestInputTopic<Integer, String> inputTopic =
+ driver.createInputTopic(streamTopic, new IntegerSerializer(),
new StringSerializer());
+ inputTopic.pipeInput(null, "A", 0); // null key, non-null value: the left join must forward it, not skip it
+
+ processor.checkAndClearProcessResult(new KeyValueTimestamp<>(null,
"A+null", 0));
+
+ assertTrue(appender.getMessages().isEmpty()); // no "Skipping record" warning may be logged
+ }
+
+ assertEquals(
+ 0.0, // nothing was dropped
+ driver.metrics().get(
+ new MetricName(
+ "dropped-records-total",
+ "stream-task-metrics",
+ "",
+ mkMap(
+ mkEntry("thread-id", Thread.currentThread().getName()),
+ mkEntry("task-id", "0_0")
+ )
+ ))
+ .metricValue()
+ );
+ }
+
+ @Test
+ public void shouldLogAndMeterWhenSkippingNullLeftValue() {
+ try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(KStreamKTableJoin.class)) {
+ final TestInputTopic<Integer, String> inputTopic =
+ driver.createInputTopic(streamTopic, new IntegerSerializer(),
new StringSerializer());
+ inputTopic.pipeInput(1, null); // null stream value: skipped even though this is a left join
+
+ assertTrue(appender.getMessages().contains("Skipping record due to
null join key or value. topic=[streamTopic] partition=[0] offset=[0]"));
+ }
+
+ assertEquals(
+ 1.0, // exactly one record (the null-value one) was dropped
+ driver.metrics().get(
+ new MetricName(
+ "dropped-records-total",
+ "stream-task-metrics",
+ "",
+ mkMap(
+ mkEntry("thread-id", Thread.currentThread().getName()),
+ mkEntry("task-id", "0_0")
+ )
+ ))
+ .metricValue()
+ );
+ }
}