This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b974ccd8d2c KAFKA-19096: Added checks for the number of dropped 
records to foreign key join tests. (#21111)
b974ccd8d2c is described below

commit b974ccd8d2c0371246aa227952e86f6055557e4c
Author: Nikita Shupletsov <[email protected]>
AuthorDate: Sat Feb 21 15:37:25 2026 -0800

    KAFKA-19096: Added checks for the number of dropped records to foreign key 
join tests. (#21111)
    
    Follow up to https://github.com/apache/kafka/pull/20605 to improve test
    coverage.
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../ForeignTableJoinProcessorSupplierTests.java    |  15 ++-
 .../ResponseJoinProcessorSupplierTest.java         |  12 +++
 .../SubscriptionSendProcessorSupplierTest.java     | 101 +++++++++++++++++++++
 3 files changed, 127 insertions(+), 1 deletion(-)

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTests.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTests.java
index 722aada294d..4b53c1b29be 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTests.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTests.java
@@ -48,6 +48,7 @@ import static 
org.apache.kafka.streams.kstream.internals.foreignkeyjoin.Response
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class ForeignTableJoinProcessorSupplierTests {
 
@@ -113,6 +114,9 @@ public class ForeignTableJoinProcessorSupplierTests {
             context.forwarded().get(1).record(),
             is(new Record<>(pk2, new SubscriptionResponseWrapper<>(hash, 
"new_value", null), 0))
         );
+
+        // test dropped-records sensors
+        assertEquals(0.0, getDroppedRecordsTotalMetric(context));
     }
 
     @Test
@@ -125,6 +129,9 @@ public class ForeignTableJoinProcessorSupplierTests {
         processor.process(record);
 
         assertThat(context.forwarded(), empty());
+
+        // test dropped-records sensors
+        assertEquals(0.0, getDroppedRecordsTotalMetric(context));
     }
 
     @Test
@@ -145,6 +152,9 @@ public class ForeignTableJoinProcessorSupplierTests {
             context.forwarded().get(1).record(),
             is(new Record<>(pk2, new SubscriptionResponseWrapper<>(hash, null, 
null), 0))
         );
+
+        // test dropped-records sensors
+        assertEquals(0.0, getDroppedRecordsTotalMetric(context));
     }
 
     @Test
@@ -162,6 +172,9 @@ public class ForeignTableJoinProcessorSupplierTests {
             context.forwarded().get(0).record(),
             is(new Record<>(pk2, new SubscriptionResponseWrapper<>(hash, 
"new_value", null), 0))
         );
+
+        // test dropped-records sensors
+        assertEquals(0.0, getDroppedRecordsTotalMetric(context));
     }
 
     @Test
@@ -175,7 +188,7 @@ public class ForeignTableJoinProcessorSupplierTests {
         assertThat(context.forwarded(), empty());
 
         // test dropped-records sensors
-        Assertions.assertEquals(1.0, getDroppedRecordsTotalMetric(context));
+        assertEquals(1.0, getDroppedRecordsTotalMetric(context));
         Assertions.assertNotEquals(0.0, getDroppedRecordsRateMetric(context));
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplierTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplierTest.java
index d5052247d2e..3e6e6c74b2a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplierTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplierTest.java
@@ -164,6 +164,9 @@ public class ResponseJoinProcessorSupplierTest {
         final List<MockProcessorContext.CapturedForward<? extends String, ? 
extends String>> forwarded = context.forwarded();
         assertThat(forwarded.size(), is(1));
         assertThat(forwarded.get(0).record(), is(new Record<>("lhs1", 
"(lhsValue,rhsValue)", 0)));
+
+        // test dropped-records sensors
+        assertEquals(0.0, getDroppedRecordsTotalMetric(context));
     }
 
     @Test
@@ -190,6 +193,9 @@ public class ResponseJoinProcessorSupplierTest {
         final List<MockProcessorContext.CapturedForward<? extends String, ? 
extends String>> forwarded = context.forwarded();
         assertThat(forwarded.size(), is(1));
         assertThat(forwarded.get(0).record(), is(new Record<>("lhs1", null, 
0)));
+
+        // test dropped-records sensors
+        assertEquals(0.0, getDroppedRecordsTotalMetric(context));
     }
 
     @Test
@@ -216,6 +222,9 @@ public class ResponseJoinProcessorSupplierTest {
         final List<MockProcessorContext.CapturedForward<? extends String, ? 
extends String>> forwarded = context.forwarded();
         assertThat(forwarded.size(), is(1));
         assertThat(forwarded.get(0).record(), is(new Record<>("lhs1", 
"(lhsValue,null)", 0)));
+
+        // test dropped-records sensors
+        assertEquals(0.0, getDroppedRecordsTotalMetric(context));
     }
 
     @Test
@@ -242,6 +251,9 @@ public class ResponseJoinProcessorSupplierTest {
         final List<MockProcessorContext.CapturedForward<? extends String, ? 
extends String>> forwarded = context.forwarded();
         assertThat(forwarded.size(), is(1));
         assertThat(forwarded.get(0).record(), is(new Record<>("lhs1", null, 
0)));
+
+        // test dropped-records sensors
+        assertEquals(0.0, getDroppedRecordsTotalMetric(context));
     }
 
     static Object getDroppedRecordsTotalMetric(final 
InternalProcessorContext<String, ?> context) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java
index d49df2f5cfd..0b9ced940ab 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java
@@ -83,6 +83,9 @@ public class SubscriptionSendProcessorSupplierTest {
             context.forwarded().get(0).record(),
             is(new Record<>(fk1, new 
SubscriptionWrapper<>(hash(leftRecordValue), 
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0), 0))
         );
+
+        // test dropped-records sensors
+        assertEquals(0.0, getDroppedRecordsTotalMetric(context));
     }
 
     @Test
@@ -100,6 +103,9 @@ public class SubscriptionSendProcessorSupplierTest {
             context.forwarded().get(0).record(),
             is(new Record<>(null, new 
SubscriptionWrapper<>(hash(leftRecordValue), 
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0), 0))
         );
+
+        // test dropped-records sensors
+        assertEquals(0.0, getDroppedRecordsTotalMetric(context));
     }
 
     @Test
@@ -117,6 +123,9 @@ public class SubscriptionSendProcessorSupplierTest {
             context.forwarded().get(1).record(),
             is(new Record<>(fk2, new 
SubscriptionWrapper<>(hash(leftRecordValue), 
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0), 0))
         );
+
+        // test dropped-records sensors
+        assertEquals(0.0, getDroppedRecordsTotalMetric(context));
     }
 
     @Test
@@ -134,6 +143,9 @@ public class SubscriptionSendProcessorSupplierTest {
             context.forwarded().get(0).record(),
             is(new Record<>(fk1, new 
SubscriptionWrapper<>(hash(leftRecordValue), 
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0), 0))
         );
+
+        // test dropped-records sensors
+        assertEquals(0.0, getDroppedRecordsTotalMetric(context));
     }
 
     @Test
@@ -151,6 +163,9 @@ public class SubscriptionSendProcessorSupplierTest {
             context.forwarded().get(0).record(),
             is(new Record<>(fk1, new 
SubscriptionWrapper<>(hash(leftRecordValue), DELETE_KEY_NO_PROPAGATE, pk, 0), 
0))
         );
+
+        // test dropped-records sensors
+        assertEquals(0.0, getDroppedRecordsTotalMetric(context));
     }
 
     @Test
@@ -168,6 +183,9 @@ public class SubscriptionSendProcessorSupplierTest {
             context.forwarded().get(0).record(),
             is(new Record<>(fk1, new 
SubscriptionWrapper<>(hash(leftRecordValue), 
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0), 0))
         );
+
+        // test dropped-records sensors
+        assertEquals(0.0, getDroppedRecordsTotalMetric(context));
     }
 
     @Test
@@ -185,6 +203,9 @@ public class SubscriptionSendProcessorSupplierTest {
             context.forwarded().get(0).record(),
             is(new Record<>(null, new 
SubscriptionWrapper<>(hash(leftRecordValue), 
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0), 0))
         );
+
+        // test dropped-records sensors
+        assertEquals(0.0, getDroppedRecordsTotalMetric(context));
     }
 
     @Test
@@ -200,6 +221,9 @@ public class SubscriptionSendProcessorSupplierTest {
             context.forwarded().get(0).record(),
             is(new Record<>(fk1, new SubscriptionWrapper<>(null, 
DELETE_KEY_NO_PROPAGATE, pk, 0), 0))
         );
+
+        // test dropped-records sensors
+        assertEquals(0.0, getDroppedRecordsTotalMetric(context));
     }
 
     @Test
@@ -215,6 +239,9 @@ public class SubscriptionSendProcessorSupplierTest {
             context.forwarded().get(0).record(),
             is(new Record<>(null, new SubscriptionWrapper<>(null, 
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0), 0))
         );
+
+        // test dropped-records sensors
+        assertEquals(0.0, getDroppedRecordsTotalMetric(context));
     }
 
     @Test
@@ -226,6 +253,9 @@ public class SubscriptionSendProcessorSupplierTest {
         leftJoinProcessor.process(new Record<>(pk, new Change<>(null, null), 
0));
 
         assertThat(context.forwarded(), empty());
+
+        // test dropped-records sensors
+        assertEquals(0.0, getDroppedRecordsTotalMetric(context));
     }
 
     // Inner join tests
@@ -244,6 +274,9 @@ public class SubscriptionSendProcessorSupplierTest {
             context.forwarded().get(0).record(),
             is(new Record<>(fk1, new 
SubscriptionWrapper<>(hash(leftRecordValue), 
PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, pk, 0), 0))
         );
+
+        // test dropped-records sensors
+        assertEquals(0.0, getDroppedRecordsTotalMetric(context));
     }
 
     @Test
@@ -282,6 +315,9 @@ public class SubscriptionSendProcessorSupplierTest {
             context.forwarded().get(1).record(),
             is(new Record<>(fk2, new 
SubscriptionWrapper<>(hash(leftRecordValue), 
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0), 0))
         );
+
+        // test dropped-records sensors
+        assertEquals(0.0, getDroppedRecordsTotalMetric(context));
     }
 
     @Test
@@ -295,6 +331,10 @@ public class SubscriptionSendProcessorSupplierTest {
         innerJoinProcessor.process(new Record<>(pk, new 
Change<>(leftRecordValue, leftRecordValue), 0));
 
         assertThat(context.forwarded(), empty());
+
+        // test dropped-records sensors
+        assertEquals(1.0, getDroppedRecordsTotalMetric(context));
+        assertNotEquals(0.0, getDroppedRecordsRateMetric(context));
     }
 
     @Test
@@ -312,6 +352,9 @@ public class SubscriptionSendProcessorSupplierTest {
             context.forwarded().get(0).record(),
             is(new Record<>(fk1, new 
SubscriptionWrapper<>(hash(leftRecordValue), 
PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, pk, 0), 0))
         );
+
+        // test dropped-records sensors
+        assertEquals(0.0, getDroppedRecordsTotalMetric(context));
     }
 
     @Test
@@ -329,6 +372,10 @@ public class SubscriptionSendProcessorSupplierTest {
             context.forwarded().get(0).record(),
             is(new Record<>(fk1, new 
SubscriptionWrapper<>(hash(leftRecordValue), DELETE_KEY_AND_PROPAGATE, pk, 0), 
0))
         );
+
+        // test dropped-records sensors
+        assertEquals(1.0, getDroppedRecordsTotalMetric(context));
+        assertNotEquals(0.0, getDroppedRecordsRateMetric(context));
     }
 
     @Test
@@ -346,6 +393,9 @@ public class SubscriptionSendProcessorSupplierTest {
             context.forwarded().get(0).record(),
             is(new Record<>(fk1, new 
SubscriptionWrapper<>(hash(leftRecordValue), 
PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, pk, 0), 0))
         );
+
+        // test dropped-records sensors
+        assertEquals(0.0, getDroppedRecordsTotalMetric(context));
     }
 
     @Test
@@ -361,6 +411,9 @@ public class SubscriptionSendProcessorSupplierTest {
             context.forwarded().get(0).record(),
             is(new Record<>(fk1, new SubscriptionWrapper<>(null, 
DELETE_KEY_AND_PROPAGATE, pk, 0), 0))
         );
+
+        // test dropped-records sensors
+        assertEquals(0.0, getDroppedRecordsTotalMetric(context));
     }
 
     @Test
@@ -372,6 +425,9 @@ public class SubscriptionSendProcessorSupplierTest {
         innerJoinProcessor.process(new Record<>(pk, new Change<>(null, new 
LeftValue(null)), 0));
 
         assertThat(context.forwarded(), empty());
+
+        // test dropped-records sensors
+        assertEquals(0.0, getDroppedRecordsTotalMetric(context));
     }
 
     @Test
@@ -383,6 +439,9 @@ public class SubscriptionSendProcessorSupplierTest {
         innerJoinProcessor.process(new Record<>(pk, new Change<>(null, null), 
0));
 
         assertThat(context.forwarded(), empty());
+
+        // test dropped-records sensors
+        assertEquals(0.0, getDroppedRecordsTotalMetric(context));
     }
 
     // Bi-function tests: inner join, left join
@@ -424,6 +483,9 @@ public class SubscriptionSendProcessorSupplierTest {
             context.forwarded().get(0).record(),
             is(new Record<>(compositeKey, new 
SubscriptionWrapper<>(hash(leftRecordValue), 
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0), 0))
         );
+
+        // test dropped-records sensors
+        assertEquals(0.0, getDroppedRecordsTotalMetric(context));
     }
 
     @Test
@@ -441,6 +503,9 @@ public class SubscriptionSendProcessorSupplierTest {
             context.forwarded().get(0).record(),
             is(new Record<>(null, new 
SubscriptionWrapper<>(hash(leftRecordValue), 
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0), 0))
         );
+
+        // test dropped-records sensors
+        assertEquals(0.0, getDroppedRecordsTotalMetric(context));
     }
 
     @Test
@@ -460,6 +525,9 @@ public class SubscriptionSendProcessorSupplierTest {
             context.forwarded().get(1).record(),
             is(new Record<>(compositeKey, new 
SubscriptionWrapper<>(hash(leftRecordValue), 
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0), 0))
         );
+
+        // test dropped-records sensors
+        assertEquals(0.0, getDroppedRecordsTotalMetric(context));
     }
 
     @Test
@@ -479,6 +547,9 @@ public class SubscriptionSendProcessorSupplierTest {
             context.forwarded().get(0).record(),
             is(new Record<>(compositeKey, new 
SubscriptionWrapper<>(hash(leftRecordValue), 
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0), 0))
         );
+
+        // test dropped-records sensors
+        assertEquals(0.0, getDroppedRecordsTotalMetric(context));
     }
 
     @Test
@@ -498,6 +569,9 @@ public class SubscriptionSendProcessorSupplierTest {
             context.forwarded().get(0).record(),
             is(new Record<>(compositeKey, new 
SubscriptionWrapper<>(hash(leftRecordValue), DELETE_KEY_NO_PROPAGATE, pk, 0), 
0))
         );
+
+        // test dropped-records sensors
+        assertEquals(0.0, getDroppedRecordsTotalMetric(context));
     }
 
     @Test
@@ -517,6 +591,9 @@ public class SubscriptionSendProcessorSupplierTest {
             context.forwarded().get(0).record(),
             is(new Record<>(compositeKey, new 
SubscriptionWrapper<>(hash(leftRecordValue), 
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0), 0))
         );
+
+        // test dropped-records sensors
+        assertEquals(0.0, getDroppedRecordsTotalMetric(context));
     }
 
     @Test
@@ -534,6 +611,9 @@ public class SubscriptionSendProcessorSupplierTest {
             context.forwarded().get(0).record(),
             is(new Record<>(null, new 
SubscriptionWrapper<>(hash(leftRecordValue), 
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0), 0))
         );
+
+        // test dropped-records sensors
+        assertEquals(0.0, getDroppedRecordsTotalMetric(context));
     }
 
     @Test
@@ -551,6 +631,9 @@ public class SubscriptionSendProcessorSupplierTest {
             context.forwarded().get(0).record(),
             is(new Record<>(compositeKey, new SubscriptionWrapper<>(null, 
DELETE_KEY_NO_PROPAGATE, pk, 0), 0))
         );
+
+        // test dropped-records sensors
+        assertEquals(0.0, getDroppedRecordsTotalMetric(context));
     }
 
     @Test
@@ -566,6 +649,9 @@ public class SubscriptionSendProcessorSupplierTest {
             context.forwarded().get(0).record(),
             is(new Record<>(null, new SubscriptionWrapper<>(null, 
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0), 0))
         );
+
+        // test dropped-records sensors
+        assertEquals(0.0, getDroppedRecordsTotalMetric(context));
     }
 
     @Test
@@ -577,6 +663,9 @@ public class SubscriptionSendProcessorSupplierTest {
         biFunctionLeftJoinProcessor.process(new Record<>(pk, new 
Change<>(null, null), 0));
 
         assertThat(context.forwarded(), empty());
+
+        // test dropped-records sensors
+        assertEquals(0.0, getDroppedRecordsTotalMetric(context));
     }
 
     // Bi-function tests: inner join
@@ -597,6 +686,9 @@ public class SubscriptionSendProcessorSupplierTest {
             context.forwarded().get(0).record(),
             is(new Record<>(compositeKey, new 
SubscriptionWrapper<>(hash(leftRecordValue), 
PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, pk, 0), 0))
         );
+
+        // test dropped-records sensors
+        assertEquals(0.0, getDroppedRecordsTotalMetric(context));
     }
 
     @Test
@@ -638,6 +730,9 @@ public class SubscriptionSendProcessorSupplierTest {
             context.forwarded().get(1).record(),
             is(new Record<>(compositeKey2, new 
SubscriptionWrapper<>(hash(leftRecordValue), 
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0), 0))
         );
+
+        // test dropped-records sensors
+        assertEquals(0.0, getDroppedRecordsTotalMetric(context));
     }
 
     @Test
@@ -672,6 +767,9 @@ public class SubscriptionSendProcessorSupplierTest {
             context.forwarded().get(0).record(),
             is(new Record<>(compositeKey, new SubscriptionWrapper<>(null, 
DELETE_KEY_AND_PROPAGATE, pk, 0), 0))
         );
+
+        // test dropped-records sensors
+        assertEquals(0.0, getDroppedRecordsTotalMetric(context));
     }
 
     @Test
@@ -683,6 +781,9 @@ public class SubscriptionSendProcessorSupplierTest {
         biFunctionInnerJoinProcessor.process(new Record<>(pk, new 
Change<>(null, null), 0));
 
         assertThat(context.forwarded(), empty());
+
+        // test dropped-records sensors
+        assertEquals(0.0, getDroppedRecordsTotalMetric(context));
     }
 
     private static class LeftValueSerializer implements Serializer<LeftValue> {

Reply via email to