This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new bbf48ad6770 KAFKA-19208: KStream-GlobalKTable join should not drop left-null-key record (#19580)
bbf48ad6770 is described below

commit bbf48ad6770cc21acd58531c70d452d24a7c8af7
Author: Matthias J. Sax <[email protected]>
AuthorDate: Thu May 15 18:34:07 2025 -0700

    KAFKA-19208: KStream-GlobalKTable join should not drop left-null-key record (#19580)
    
    Reviewers: Lucas Brutschy <[email protected]>
---
 .../internals/KStreamKTableJoinProcessor.java      |   6 +-
 .../internals/KStreamGlobalKTableJoinTest.java     | 112 ++++++++++++++---
 .../internals/KStreamGlobalKTableLeftJoinTest.java | 140 ++++++++++++++++-----
 .../kstream/internals/KStreamKTableJoinTest.java   |  71 ++++++++---
 .../internals/KStreamKTableLeftJoinTest.java       |  62 +++++++++
 5 files changed, 324 insertions(+), 67 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
index 19231486c6b..648ce565379 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
@@ -126,13 +126,13 @@ class KStreamKTableJoinProcessor<StreamKey, StreamValue, 
TableKey, TableValue, V
 
     private void doJoin(final Record<StreamKey, StreamValue> record) {
         final TableKey mappedKey = keyMapper.apply(record.key(), 
record.value());
-        final TableValue value2 = getValue2(record, mappedKey);
+        final TableValue value2 = getTableValue(record, mappedKey);
         if (leftJoin || value2 != null) {
             
internalProcessorContext.forward(record.withValue(joiner.apply(record.key(), 
record.value(), value2)));
         }
     }
 
-    private TableValue getValue2(final Record<StreamKey, StreamValue> record, 
final TableKey mappedKey) {
+    private TableValue getTableValue(final Record<StreamKey, StreamValue> 
record, final TableKey mappedKey) {
         if (mappedKey == null) return null;
         final ValueAndTimestamp<TableValue> valueAndTimestamp = 
valueGetter.isVersioned()
             ? valueGetter.get(mappedKey, record.timestamp())
@@ -150,7 +150,7 @@ class KStreamKTableJoinProcessor<StreamKey, StreamValue, 
TableKey, TableValue, V
         // furthermore, on left/outer joins 'null' in ValueJoiner#apply() 
indicates a missing record --
         // thus, to be consistent and to avoid ambiguous null semantics, null 
values are ignored
         final TableKey mappedKey = keyMapper.apply(record.key(), 
record.value());
-        if (leftJoin && record.key() == null && record.value() != null) {
+        if (leftJoin && mappedKey == null && record.value() != null) {
             return false;
         }
         if (mappedKey == null || record.value() == null) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
index cd9126f03e5..6085bdacf68 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -46,6 +47,10 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class KStreamGlobalKTableJoinTest {
@@ -89,7 +94,7 @@ public class KStreamGlobalKTableJoinTest {
         }
         keyMapper = (key, value) -> {
             final String[] tokens = value.split(",");
-            // Value is comma delimited. If second token is present, it's the 
key to the global ktable.
+            // Value is comma-delimited. If second token is present, it's the 
key to the global ktable.
             // If not present, use null to indicate no match
             return tokens.length > 1 ? tokens[1] : null;
         };
@@ -169,8 +174,10 @@ public class KStreamGlobalKTableJoinTest {
         // push all four items to the primary stream. this should produce two 
items.
 
         pushToStream(4, "X", true, false);
-        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, 
"X0,FKey0+Y0", 2),
-                new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 3));
+        processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(0, "X0,FKey0+Y0", 2),
+            new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 3)
+        );
 
         // push all items to the globalTable. this should not produce any item
 
@@ -180,10 +187,12 @@ public class KStreamGlobalKTableJoinTest {
         // push all four items to the primary stream. this should produce four 
items.
 
         pushToStream(4, "X", true, false);
-        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, 
"X0,FKey0+YY0", 6),
-                new KeyValueTimestamp<>(1, "X1,FKey1+YY1", 7),
-                new KeyValueTimestamp<>(2, "X2,FKey2+YY2", 8),
-                new KeyValueTimestamp<>(3, "X3,FKey3+YY3", 9));
+        processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(0, "X0,FKey0+YY0", 6),
+            new KeyValueTimestamp<>(1, "X1,FKey1+YY1", 7),
+            new KeyValueTimestamp<>(2, "X2,FKey2+YY2", 8),
+            new KeyValueTimestamp<>(3, "X3,FKey3+YY3", 9)
+        );
 
         // push all items to the globalTable. this should not produce any item
 
@@ -202,8 +211,10 @@ public class KStreamGlobalKTableJoinTest {
         // push all four items to the primary stream. this should produce two 
items.
 
         pushToStream(4, "X", true, false);
-        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, 
"X0,FKey0+Y0", 0),
-                new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 1));
+        processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(0, "X0,FKey0+Y0", 0),
+            new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 1)
+        );
 
     }
 
@@ -218,10 +229,12 @@ public class KStreamGlobalKTableJoinTest {
         // push all four items to the primary stream. this should produce four 
items.
 
         pushToStream(4, "X", true, false);
-        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, 
"X0,FKey0+Y0", 0),
-                new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 1),
-                new KeyValueTimestamp<>(2, "X2,FKey2+Y2", 2),
-                new KeyValueTimestamp<>(3, "X3,FKey3+Y3", 3));
+        processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(0, "X0,FKey0+Y0", 0),
+            new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 1),
+            new KeyValueTimestamp<>(2, "X2,FKey2+Y2", 2),
+            new KeyValueTimestamp<>(3, "X3,FKey3+Y3", 3)
+        );
 
         // push two items with null to the globalTable as deletes. this should 
not produce any item.
 
@@ -231,8 +244,10 @@ public class KStreamGlobalKTableJoinTest {
         // push all four items to the primary stream. this should produce two 
items.
 
         pushToStream(4, "XX", true, false);
-        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(2, 
"XX2,FKey2+Y2", 6),
-                new KeyValueTimestamp<>(3, "XX3,FKey3+Y3", 7));
+        processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(2, "XX2,FKey2+Y2", 6),
+            new KeyValueTimestamp<>(3, "XX3,FKey3+Y3", 7)
+        );
     }
 
     @Test
@@ -248,10 +263,54 @@ public class KStreamGlobalKTableJoinTest {
 
         pushToStream(4, "XXX", false, false);
         processor.checkAndClearProcessResult(EMPTY);
+
+        assertThat(
+            driver.metrics().get(
+                    new MetricName(
+                        "dropped-records-total",
+                        "stream-task-metrics",
+                        "",
+                        mkMap(
+                            mkEntry("thread-id", 
Thread.currentThread().getName()),
+                            mkEntry("task-id", "0_0")
+                        )
+                    ))
+                .metricValue(),
+            is(4.0)
+        );
+    }
+
+    @Test
+    public void shouldNotJoinOnNullKeyMapperValuesWithNullKeys() {
+        // push all items to the globalTable. this should not produce any item
+
+        pushToGlobalTable(4, "Y");
+        processor.checkAndClearProcessResult(EMPTY);
+
+        // push all four items to the primary stream with no foreign key, 
resulting in null keyMapper values.
+        // this should not produce any item.
+
+        pushToStream(4, "XXX", false, true);
+        processor.checkAndClearProcessResult(EMPTY);
+
+        assertThat(
+            driver.metrics().get(
+                    new MetricName(
+                        "dropped-records-total",
+                        "stream-task-metrics",
+                        "",
+                        mkMap(
+                            mkEntry("thread-id", 
Thread.currentThread().getName()),
+                            mkEntry("task-id", "0_0")
+                        )
+                    ))
+                .metricValue(),
+            is(4.0)
+        );
     }
 
     @Test
-    public void shouldJoinOnNullKeyWithNonNullKeyMapperValues() {
+    public void shouldJoinOnNullKey() {
         // push two items to the globalTable. this should not produce any item.
 
         pushToGlobalTable(2, "Y");
@@ -260,7 +319,24 @@ public class KStreamGlobalKTableJoinTest {
         // push all four items to the primary stream. this should produce two 
items.
 
         pushToStream(4, "X", true, true);
-        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(null, 
"X0,FKey0+Y0", 0),
-                new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 1));
+        processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(null, "X0,FKey0+Y0", 0),
+            new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 1)
+        );
+
+        assertThat(
+            driver.metrics().get(
+                    new MetricName(
+                        "dropped-records-total",
+                        "stream-task-metrics",
+                        "",
+                        mkMap(
+                            mkEntry("thread-id", 
Thread.currentThread().getName()),
+                            mkEntry("task-id", "0_0")
+                        )
+                    ))
+                .metricValue(),
+            is(0.0)
+        );
     }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
index b0a44c36e27..6f49917a012 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -46,6 +47,10 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class KStreamGlobalKTableLeftJoinTest {
@@ -89,7 +94,7 @@ public class KStreamGlobalKTableLeftJoinTest {
         }
         keyMapper = (key, value) -> {
             final String[] tokens = value.split(",");
-            // Value is comma delimited. If second token is present, it's the 
key to the global ktable.
+            // Value is comma-delimited. If second token is present, it's the 
key to the global ktable.
             // If not present, use null to indicate no match
             return tokens.length > 1 ? tokens[1] : null;
         };
@@ -150,8 +155,10 @@ public class KStreamGlobalKTableLeftJoinTest {
         // push two items to the primary stream. the globalTable is empty
 
         pushToStream(2, "X", true, false);
-        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, 
"X0,FKey0+null", 0),
-                new KeyValueTimestamp<>(1, "X1,FKey1+null", 1));
+        processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(0, "X0,FKey0+null", 0),
+            new KeyValueTimestamp<>(1, "X1,FKey1+null", 1)
+        );
     }
 
     @Test
@@ -160,8 +167,10 @@ public class KStreamGlobalKTableLeftJoinTest {
         // push two items to the primary stream. the globalTable is empty
 
         pushToStream(2, "X", true, false);
-        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, 
"X0,FKey0+null", 0),
-                new KeyValueTimestamp<>(1, "X1,FKey1+null", 1));
+        processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(0, "X0,FKey0+null", 0),
+            new KeyValueTimestamp<>(1, "X1,FKey1+null", 1)
+        );
 
         // push two items to the globalTable. this should not produce any item.
 
@@ -171,10 +180,12 @@ public class KStreamGlobalKTableLeftJoinTest {
         // push all four items to the primary stream. this should produce four 
items.
 
         pushToStream(4, "X", true, false);
-        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, 
"X0,FKey0+Y0", 2),
-                new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 3),
-                new KeyValueTimestamp<>(2, "X2,FKey2+null", 4),
-                new KeyValueTimestamp<>(3, "X3,FKey3+null", 5));
+        processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(0, "X0,FKey0+Y0", 2),
+            new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 3),
+            new KeyValueTimestamp<>(2, "X2,FKey2+null", 4),
+            new KeyValueTimestamp<>(3, "X3,FKey3+null", 5)
+        );
 
         // push all items to the globalTable. this should not produce any item
 
@@ -184,10 +195,12 @@ public class KStreamGlobalKTableLeftJoinTest {
         // push all four items to the primary stream. this should produce four 
items.
 
         pushToStream(4, "X", true, false);
-        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, 
"X0,FKey0+YY0", 6),
-                new KeyValueTimestamp<>(1, "X1,FKey1+YY1", 7),
-                new KeyValueTimestamp<>(2, "X2,FKey2+YY2", 8),
-                new KeyValueTimestamp<>(3, "X3,FKey3+YY3", 9));
+        processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(0, "X0,FKey0+YY0", 6),
+            new KeyValueTimestamp<>(1, "X1,FKey1+YY1", 7),
+            new KeyValueTimestamp<>(2, "X2,FKey2+YY2", 8),
+            new KeyValueTimestamp<>(3, "X3,FKey3+YY3", 9)
+        );
 
         // push all items to the globalTable. this should not produce any item
 
@@ -206,10 +219,12 @@ public class KStreamGlobalKTableLeftJoinTest {
         // push all four items to the primary stream. this should produce four 
items.
 
         pushToStream(4, "X", true, false);
-        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, 
"X0,FKey0+Y0", 0),
-                new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 1),
-                new KeyValueTimestamp<>(2, "X2,FKey2+null", 2),
-                new KeyValueTimestamp<>(3, "X3,FKey3+null", 3));
+        processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(0, "X0,FKey0+Y0", 0),
+            new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 1),
+            new KeyValueTimestamp<>(2, "X2,FKey2+null", 2),
+            new KeyValueTimestamp<>(3, "X3,FKey3+null", 3)
+        );
 
     }
 
@@ -224,10 +239,12 @@ public class KStreamGlobalKTableLeftJoinTest {
         // push all four items to the primary stream. this should produce four 
items.
 
         pushToStream(4, "X", true, false);
-        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, 
"X0,FKey0+Y0", 0),
-                new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 1),
-                new KeyValueTimestamp<>(2, "X2,FKey2+Y2", 2),
-                new KeyValueTimestamp<>(3, "X3,FKey3+Y3", 3));
+        processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(0, "X0,FKey0+Y0", 0),
+            new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 1),
+            new KeyValueTimestamp<>(2, "X2,FKey2+Y2", 2),
+            new KeyValueTimestamp<>(3, "X3,FKey3+Y3", 3)
+        );
 
         // push two items with null to the globalTable as deletes. this should 
not produce any item.
 
@@ -236,15 +253,17 @@ public class KStreamGlobalKTableLeftJoinTest {
 
         // push all four items to the primary stream. this should produce four 
items.
 
-        pushToStream(4, "XX", true, false);
-        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, 
"XX0,FKey0+null", 4),
-                new KeyValueTimestamp<>(1, "XX1,FKey1+null", 5),
-                new KeyValueTimestamp<>(2, "XX2,FKey2+Y2", 6),
-                new KeyValueTimestamp<>(3, "XX3,FKey3+Y3", 7));
+        pushToStream(4, "X", true, false);
+        processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(0, "X0,FKey0+null", 4),
+            new KeyValueTimestamp<>(1, "X1,FKey1+null", 5),
+            new KeyValueTimestamp<>(2, "X2,FKey2+Y2", 6),
+            new KeyValueTimestamp<>(3, "X3,FKey3+Y3", 7)
+        );
     }
 
     @Test
-    public void shouldNotJoinOnNullKeyMapperValues() {
+    public void shouldJoinOnNullKeyMapperValues() {
 
         // push all items to the globalTable. this should not produce any item
 
@@ -255,11 +274,66 @@ public class KStreamGlobalKTableLeftJoinTest {
         // this should not produce any item.
 
         pushToStream(4, "XXX", false, false);
+        processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(0, "XXX0+null", 0),
+            new KeyValueTimestamp<>(1, "XXX1+null", 1),
+            new KeyValueTimestamp<>(2, "XXX2+null", 2),
+            new KeyValueTimestamp<>(3, "XXX3+null", 3)
+        );
+
+        assertThat(
+            driver.metrics().get(
+                    new MetricName(
+                        "dropped-records-total",
+                        "stream-task-metrics",
+                        "",
+                        mkMap(
+                            mkEntry("thread-id", 
Thread.currentThread().getName()),
+                            mkEntry("task-id", "0_0")
+                        )
+                    ))
+                .metricValue(),
+            is(0.0)
+        );
+    }
+
+    @Test
+    public void shouldJoinOnNullKeyMapperValuesWithNullKeys() {
+
+        // push all items to the globalTable. this should not produce any item
+
+        pushToGlobalTable(4, "Y");
         processor.checkAndClearProcessResult(EMPTY);
+
+        // push all four items to the primary stream with no foreign key, resulting in null keyMapper values.
+        // this should produce four items, each joined with a null table value.
+
+        pushToStream(4, "XXX", false, true);
+        processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(null, "XXX0+null", 0),
+            new KeyValueTimestamp<>(1, "XXX1+null", 1),
+            new KeyValueTimestamp<>(2, "XXX2+null", 2),
+            new KeyValueTimestamp<>(3, "XXX3+null", 3)
+        );
+
+        assertThat(
+            driver.metrics().get(
+                    new MetricName(
+                        "dropped-records-total",
+                        "stream-task-metrics",
+                        "",
+                        mkMap(
+                            mkEntry("thread-id", 
Thread.currentThread().getName()),
+                            mkEntry("task-id", "0_0")
+                        )
+                    ))
+                .metricValue(),
+            is(0.0)
+        );
     }
 
     @Test
-    public void shouldJoinOnNullKeyWithNonNullKeyMapperValues() {
+    public void shouldJoinOnNullKey() {
         // push four items to the globalTable. this should not produce any 
item.
 
         pushToGlobalTable(4, "Y");
@@ -268,9 +342,11 @@ public class KStreamGlobalKTableLeftJoinTest {
         // push all four items to the primary stream. this should produce four 
items.
 
         pushToStream(4, "X", true, true);
-        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(null, 
"X0,FKey0+Y0", 0),
-                new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 1),
-                new KeyValueTimestamp<>(2, "X2,FKey2+Y2", 2),
-                new KeyValueTimestamp<>(3, "X3,FKey3+Y3", 3));
+        processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(null, "X0,FKey0+Y0", 0),
+            new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 1),
+            new KeyValueTimestamp<>(2, "X2,FKey2+Y2", 2),
+            new KeyValueTimestamp<>(3, "X3,FKey3+Y3", 3)
+        );
     }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
index c4552715982..1419fd716c2 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -52,6 +53,8 @@ import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.hamcrest.CoreMatchers.hasItem;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
@@ -354,8 +357,10 @@ public class KStreamKTableJoinTest {
 
         // push all four items to the primary stream. this should produce two 
items.
         pushToStream(4, "X");
-        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, 
"X0+Y0", 0),
-                new KeyValueTimestamp<>(1, "X1+Y1", 1));
+        processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(0, "X0+Y0", 0),
+            new KeyValueTimestamp<>(1, "X1+Y1", 1)
+        );
 
         // push all items to the table. this should not produce any item
         pushToTable(4, "YY");
@@ -363,10 +368,12 @@ public class KStreamKTableJoinTest {
 
         // push all four items to the primary stream. this should produce four 
items.
         pushToStream(4, "X");
-        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, 
"X0+YY0", 0),
-                new KeyValueTimestamp<>(1, "X1+YY1", 1),
-                new KeyValueTimestamp<>(2, "X2+YY2", 2),
-                new KeyValueTimestamp<>(3, "X3+YY3", 3));
+        processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(0, "X0+YY0", 0),
+            new KeyValueTimestamp<>(1, "X1+YY1", 1),
+            new KeyValueTimestamp<>(2, "X2+YY2", 2),
+            new KeyValueTimestamp<>(3, "X3+YY3", 3)
+        );
 
         // push all items to the table. this should not produce any item
         pushToTable(4, "YYY");
@@ -381,8 +388,10 @@ public class KStreamKTableJoinTest {
 
         // push all four items to the primary stream. this should produce two 
items.
         pushToStream(4, "X");
-        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, 
"X0+Y0", 0),
-                new KeyValueTimestamp<>(1, "X1+Y1", 1));
+        processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(0, "X0+Y0", 0),
+            new KeyValueTimestamp<>(1, "X1+Y1", 1)
+        );
     }
 
     @Test
@@ -393,10 +402,12 @@ public class KStreamKTableJoinTest {
 
         // push all four items to the primary stream. this should produce four 
items.
         pushToStream(4, "X");
-        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, 
"X0+Y0", 0),
-                new KeyValueTimestamp<>(1, "X1+Y1", 1),
-                new KeyValueTimestamp<>(2, "X2+Y2", 2),
-                new KeyValueTimestamp<>(3, "X3+Y3", 3));
+        processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(0, "X0+Y0", 0),
+            new KeyValueTimestamp<>(1, "X1+Y1", 1),
+            new KeyValueTimestamp<>(2, "X2+Y2", 2),
+            new KeyValueTimestamp<>(3, "X3+Y3", 3)
+        );
 
         // push two items with null to the table as deletes. this should not 
produce any item.
         pushNullValueToTable();
@@ -404,8 +415,10 @@ public class KStreamKTableJoinTest {
 
         // push all four items to the primary stream. this should produce two 
items.
         pushToStream(4, "XX");
-        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(2, 
"XX2+Y2", 2),
-                new KeyValueTimestamp<>(3, "XX3+Y3", 3));
+        processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(2, "XX2+Y2", 2),
+            new KeyValueTimestamp<>(3, "XX3+Y3", 3)
+        );
     }
 
     @Test
@@ -420,6 +433,21 @@ public class KStreamKTableJoinTest {
                 hasItem("Skipping record due to null join key or value. 
topic=[streamTopic] partition=[0] "
                     + "offset=[0]"));
         }
+
+        assertThat(
+            driver.metrics().get(
+                new MetricName(
+                    "dropped-records-total",
+                    "stream-task-metrics",
+                    "",
+                    mkMap(
+                        mkEntry("thread-id", Thread.currentThread().getName()),
+                        mkEntry("task-id", "0_0")
+                    )
+                ))
+                .metricValue(),
+            is(1.0)
+        );
     }
 
     @Test
@@ -435,6 +463,21 @@ public class KStreamKTableJoinTest {
                     + "offset=[0]")
             );
         }
+
+        assertThat(
+            driver.metrics().get(
+                    new MetricName(
+                        "dropped-records-total",
+                        "stream-task-metrics",
+                        "",
+                        mkMap(
+                            mkEntry("thread-id", 
Thread.currentThread().getName()),
+                            mkEntry("task-id", "0_0")
+                        )
+                    ))
+                .metricValue(),
+            is(1.0)
+        );
     }
 
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
index 42ff6aa8d92..25eafd3043b 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -16,9 +16,11 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.TestInputTopic;
@@ -45,7 +47,10 @@ import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class KStreamKTableLeftJoinTest {
     private static final KeyValueTimestamp[] EMPTY = new KeyValueTimestamp[0];
@@ -199,4 +204,61 @@ public class KStreamKTableLeftJoinTest {
                 new KeyValueTimestamp<>(3, "XX3+Y3", 3));
     }
 
+    @Test
+    public void shouldNotDropLeftNullKey() {
+        // push one item to the table. this should not produce any item.
+        pushToTable(1, "Y");
+        processor.checkAndClearProcessResult(EMPTY);
+
+        try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(KStreamKTableJoin.class)) {
+            final TestInputTopic<Integer, String> inputTopic =
+                driver.createInputTopic(streamTopic, new IntegerSerializer(), 
new StringSerializer());
+            inputTopic.pipeInput(null, "A", 0);
+
+            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(null, 
"A+null", 0));
+
+            assertTrue(appender.getMessages().isEmpty());
+        }
+
+        assertEquals(
+            0.0,
+            driver.metrics().get(
+                new MetricName(
+                    "dropped-records-total",
+                    "stream-task-metrics",
+                    "",
+                    mkMap(
+                        mkEntry("thread-id", Thread.currentThread().getName()),
+                        mkEntry("task-id", "0_0")
+                    )
+                ))
+                .metricValue()
+        );
+    }
+
+    @Test
+    public void shouldLogAndMeterWhenSkippingNullLeftValue() {
+        try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(KStreamKTableJoin.class)) {
+            final TestInputTopic<Integer, String> inputTopic =
+                driver.createInputTopic(streamTopic, new IntegerSerializer(), 
new StringSerializer());
+            inputTopic.pipeInput(1, null);
+
+            assertTrue(appender.getMessages().contains("Skipping record due to 
null join key or value. topic=[streamTopic] partition=[0] offset=[0]"));
+        }
+
+        assertEquals(
+            1.0,
+            driver.metrics().get(
+                new MetricName(
+                    "dropped-records-total",
+                    "stream-task-metrics",
+                    "",
+                    mkMap(
+                        mkEntry("thread-id", Thread.currentThread().getName()),
+                        mkEntry("task-id", "0_0")
+                    )
+                ))
+                .metricValue()
+        );
+    }
 }

Reply via email to