This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b974ccd8d2c KAFKA-19096: Added checks for the number of dropped
records to foreign key join tests. (#21111)
b974ccd8d2c is described below
commit b974ccd8d2c0371246aa227952e86f6055557e4c
Author: Nikita Shupletsov <[email protected]>
AuthorDate: Sat Feb 21 15:37:25 2026 -0800
KAFKA-19096: Added checks for the number of dropped records to foreign key
join tests. (#21111)
Follow-up to https://github.com/apache/kafka/pull/20605 to improve test
coverage.
Reviewers: Matthias J. Sax <[email protected]>
---
.../ForeignTableJoinProcessorSupplierTests.java | 15 ++-
.../ResponseJoinProcessorSupplierTest.java | 12 +++
.../SubscriptionSendProcessorSupplierTest.java | 101 +++++++++++++++++++++
3 files changed, 127 insertions(+), 1 deletion(-)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTests.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTests.java
index 722aada294d..4b53c1b29be 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTests.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTests.java
@@ -48,6 +48,7 @@ import static
org.apache.kafka.streams.kstream.internals.foreignkeyjoin.Response
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertEquals;
public class ForeignTableJoinProcessorSupplierTests {
@@ -113,6 +114,9 @@ public class ForeignTableJoinProcessorSupplierTests {
context.forwarded().get(1).record(),
is(new Record<>(pk2, new SubscriptionResponseWrapper<>(hash,
"new_value", null), 0))
);
+
+ // test dropped-records sensors
+ assertEquals(0.0, getDroppedRecordsTotalMetric(context));
}
@Test
@@ -125,6 +129,9 @@ public class ForeignTableJoinProcessorSupplierTests {
processor.process(record);
assertThat(context.forwarded(), empty());
+
+ // test dropped-records sensors
+ assertEquals(0.0, getDroppedRecordsTotalMetric(context));
}
@Test
@@ -145,6 +152,9 @@ public class ForeignTableJoinProcessorSupplierTests {
context.forwarded().get(1).record(),
is(new Record<>(pk2, new SubscriptionResponseWrapper<>(hash, null,
null), 0))
);
+
+ // test dropped-records sensors
+ assertEquals(0.0, getDroppedRecordsTotalMetric(context));
}
@Test
@@ -162,6 +172,9 @@ public class ForeignTableJoinProcessorSupplierTests {
context.forwarded().get(0).record(),
is(new Record<>(pk2, new SubscriptionResponseWrapper<>(hash,
"new_value", null), 0))
);
+
+ // test dropped-records sensors
+ assertEquals(0.0, getDroppedRecordsTotalMetric(context));
}
@Test
@@ -175,7 +188,7 @@ public class ForeignTableJoinProcessorSupplierTests {
assertThat(context.forwarded(), empty());
// test dropped-records sensors
- Assertions.assertEquals(1.0, getDroppedRecordsTotalMetric(context));
+ assertEquals(1.0, getDroppedRecordsTotalMetric(context));
Assertions.assertNotEquals(0.0, getDroppedRecordsRateMetric(context));
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplierTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplierTest.java
index d5052247d2e..3e6e6c74b2a 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplierTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplierTest.java
@@ -164,6 +164,9 @@ public class ResponseJoinProcessorSupplierTest {
final List<MockProcessorContext.CapturedForward<? extends String, ?
extends String>> forwarded = context.forwarded();
assertThat(forwarded.size(), is(1));
assertThat(forwarded.get(0).record(), is(new Record<>("lhs1",
"(lhsValue,rhsValue)", 0)));
+
+ // test dropped-records sensors
+ assertEquals(0.0, getDroppedRecordsTotalMetric(context));
}
@Test
@@ -190,6 +193,9 @@ public class ResponseJoinProcessorSupplierTest {
final List<MockProcessorContext.CapturedForward<? extends String, ?
extends String>> forwarded = context.forwarded();
assertThat(forwarded.size(), is(1));
assertThat(forwarded.get(0).record(), is(new Record<>("lhs1", null,
0)));
+
+ // test dropped-records sensors
+ assertEquals(0.0, getDroppedRecordsTotalMetric(context));
}
@Test
@@ -216,6 +222,9 @@ public class ResponseJoinProcessorSupplierTest {
final List<MockProcessorContext.CapturedForward<? extends String, ?
extends String>> forwarded = context.forwarded();
assertThat(forwarded.size(), is(1));
assertThat(forwarded.get(0).record(), is(new Record<>("lhs1",
"(lhsValue,null)", 0)));
+
+ // test dropped-records sensors
+ assertEquals(0.0, getDroppedRecordsTotalMetric(context));
}
@Test
@@ -242,6 +251,9 @@ public class ResponseJoinProcessorSupplierTest {
final List<MockProcessorContext.CapturedForward<? extends String, ?
extends String>> forwarded = context.forwarded();
assertThat(forwarded.size(), is(1));
assertThat(forwarded.get(0).record(), is(new Record<>("lhs1", null,
0)));
+
+ // test dropped-records sensors
+ assertEquals(0.0, getDroppedRecordsTotalMetric(context));
}
static Object getDroppedRecordsTotalMetric(final
InternalProcessorContext<String, ?> context) {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java
index d49df2f5cfd..0b9ced940ab 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java
@@ -83,6 +83,9 @@ public class SubscriptionSendProcessorSupplierTest {
context.forwarded().get(0).record(),
is(new Record<>(fk1, new
SubscriptionWrapper<>(hash(leftRecordValue),
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0), 0))
);
+
+ // test dropped-records sensors
+ assertEquals(0.0, getDroppedRecordsTotalMetric(context));
}
@Test
@@ -100,6 +103,9 @@ public class SubscriptionSendProcessorSupplierTest {
context.forwarded().get(0).record(),
is(new Record<>(null, new
SubscriptionWrapper<>(hash(leftRecordValue),
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0), 0))
);
+
+ // test dropped-records sensors
+ assertEquals(0.0, getDroppedRecordsTotalMetric(context));
}
@Test
@@ -117,6 +123,9 @@ public class SubscriptionSendProcessorSupplierTest {
context.forwarded().get(1).record(),
is(new Record<>(fk2, new
SubscriptionWrapper<>(hash(leftRecordValue),
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0), 0))
);
+
+ // test dropped-records sensors
+ assertEquals(0.0, getDroppedRecordsTotalMetric(context));
}
@Test
@@ -134,6 +143,9 @@ public class SubscriptionSendProcessorSupplierTest {
context.forwarded().get(0).record(),
is(new Record<>(fk1, new
SubscriptionWrapper<>(hash(leftRecordValue),
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0), 0))
);
+
+ // test dropped-records sensors
+ assertEquals(0.0, getDroppedRecordsTotalMetric(context));
}
@Test
@@ -151,6 +163,9 @@ public class SubscriptionSendProcessorSupplierTest {
context.forwarded().get(0).record(),
is(new Record<>(fk1, new
SubscriptionWrapper<>(hash(leftRecordValue), DELETE_KEY_NO_PROPAGATE, pk, 0),
0))
);
+
+ // test dropped-records sensors
+ assertEquals(0.0, getDroppedRecordsTotalMetric(context));
}
@Test
@@ -168,6 +183,9 @@ public class SubscriptionSendProcessorSupplierTest {
context.forwarded().get(0).record(),
is(new Record<>(fk1, new
SubscriptionWrapper<>(hash(leftRecordValue),
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0), 0))
);
+
+ // test dropped-records sensors
+ assertEquals(0.0, getDroppedRecordsTotalMetric(context));
}
@Test
@@ -185,6 +203,9 @@ public class SubscriptionSendProcessorSupplierTest {
context.forwarded().get(0).record(),
is(new Record<>(null, new
SubscriptionWrapper<>(hash(leftRecordValue),
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0), 0))
);
+
+ // test dropped-records sensors
+ assertEquals(0.0, getDroppedRecordsTotalMetric(context));
}
@Test
@@ -200,6 +221,9 @@ public class SubscriptionSendProcessorSupplierTest {
context.forwarded().get(0).record(),
is(new Record<>(fk1, new SubscriptionWrapper<>(null,
DELETE_KEY_NO_PROPAGATE, pk, 0), 0))
);
+
+ // test dropped-records sensors
+ assertEquals(0.0, getDroppedRecordsTotalMetric(context));
}
@Test
@@ -215,6 +239,9 @@ public class SubscriptionSendProcessorSupplierTest {
context.forwarded().get(0).record(),
is(new Record<>(null, new SubscriptionWrapper<>(null,
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0), 0))
);
+
+ // test dropped-records sensors
+ assertEquals(0.0, getDroppedRecordsTotalMetric(context));
}
@Test
@@ -226,6 +253,9 @@ public class SubscriptionSendProcessorSupplierTest {
leftJoinProcessor.process(new Record<>(pk, new Change<>(null, null),
0));
assertThat(context.forwarded(), empty());
+
+ // test dropped-records sensors
+ assertEquals(0.0, getDroppedRecordsTotalMetric(context));
}
// Inner join tests
@@ -244,6 +274,9 @@ public class SubscriptionSendProcessorSupplierTest {
context.forwarded().get(0).record(),
is(new Record<>(fk1, new
SubscriptionWrapper<>(hash(leftRecordValue),
PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, pk, 0), 0))
);
+
+ // test dropped-records sensors
+ assertEquals(0.0, getDroppedRecordsTotalMetric(context));
}
@Test
@@ -282,6 +315,9 @@ public class SubscriptionSendProcessorSupplierTest {
context.forwarded().get(1).record(),
is(new Record<>(fk2, new
SubscriptionWrapper<>(hash(leftRecordValue),
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0), 0))
);
+
+ // test dropped-records sensors
+ assertEquals(0.0, getDroppedRecordsTotalMetric(context));
}
@Test
@@ -295,6 +331,10 @@ public class SubscriptionSendProcessorSupplierTest {
innerJoinProcessor.process(new Record<>(pk, new
Change<>(leftRecordValue, leftRecordValue), 0));
assertThat(context.forwarded(), empty());
+
+ // test dropped-records sensors
+ assertEquals(1.0, getDroppedRecordsTotalMetric(context));
+ assertNotEquals(0.0, getDroppedRecordsRateMetric(context));
}
@Test
@@ -312,6 +352,9 @@ public class SubscriptionSendProcessorSupplierTest {
context.forwarded().get(0).record(),
is(new Record<>(fk1, new
SubscriptionWrapper<>(hash(leftRecordValue),
PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, pk, 0), 0))
);
+
+ // test dropped-records sensors
+ assertEquals(0.0, getDroppedRecordsTotalMetric(context));
}
@Test
@@ -329,6 +372,10 @@ public class SubscriptionSendProcessorSupplierTest {
context.forwarded().get(0).record(),
is(new Record<>(fk1, new
SubscriptionWrapper<>(hash(leftRecordValue), DELETE_KEY_AND_PROPAGATE, pk, 0),
0))
);
+
+ // test dropped-records sensors
+ assertEquals(1.0, getDroppedRecordsTotalMetric(context));
+ assertNotEquals(0.0, getDroppedRecordsRateMetric(context));
}
@Test
@@ -346,6 +393,9 @@ public class SubscriptionSendProcessorSupplierTest {
context.forwarded().get(0).record(),
is(new Record<>(fk1, new
SubscriptionWrapper<>(hash(leftRecordValue),
PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, pk, 0), 0))
);
+
+ // test dropped-records sensors
+ assertEquals(0.0, getDroppedRecordsTotalMetric(context));
}
@Test
@@ -361,6 +411,9 @@ public class SubscriptionSendProcessorSupplierTest {
context.forwarded().get(0).record(),
is(new Record<>(fk1, new SubscriptionWrapper<>(null,
DELETE_KEY_AND_PROPAGATE, pk, 0), 0))
);
+
+ // test dropped-records sensors
+ assertEquals(0.0, getDroppedRecordsTotalMetric(context));
}
@Test
@@ -372,6 +425,9 @@ public class SubscriptionSendProcessorSupplierTest {
innerJoinProcessor.process(new Record<>(pk, new Change<>(null, new
LeftValue(null)), 0));
assertThat(context.forwarded(), empty());
+
+ // test dropped-records sensors
+ assertEquals(0.0, getDroppedRecordsTotalMetric(context));
}
@Test
@@ -383,6 +439,9 @@ public class SubscriptionSendProcessorSupplierTest {
innerJoinProcessor.process(new Record<>(pk, new Change<>(null, null),
0));
assertThat(context.forwarded(), empty());
+
+ // test dropped-records sensors
+ assertEquals(0.0, getDroppedRecordsTotalMetric(context));
}
// Bi-function tests: inner join, left join
@@ -424,6 +483,9 @@ public class SubscriptionSendProcessorSupplierTest {
context.forwarded().get(0).record(),
is(new Record<>(compositeKey, new
SubscriptionWrapper<>(hash(leftRecordValue),
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0), 0))
);
+
+ // test dropped-records sensors
+ assertEquals(0.0, getDroppedRecordsTotalMetric(context));
}
@Test
@@ -441,6 +503,9 @@ public class SubscriptionSendProcessorSupplierTest {
context.forwarded().get(0).record(),
is(new Record<>(null, new
SubscriptionWrapper<>(hash(leftRecordValue),
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0), 0))
);
+
+ // test dropped-records sensors
+ assertEquals(0.0, getDroppedRecordsTotalMetric(context));
}
@Test
@@ -460,6 +525,9 @@ public class SubscriptionSendProcessorSupplierTest {
context.forwarded().get(1).record(),
is(new Record<>(compositeKey, new
SubscriptionWrapper<>(hash(leftRecordValue),
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0), 0))
);
+
+ // test dropped-records sensors
+ assertEquals(0.0, getDroppedRecordsTotalMetric(context));
}
@Test
@@ -479,6 +547,9 @@ public class SubscriptionSendProcessorSupplierTest {
context.forwarded().get(0).record(),
is(new Record<>(compositeKey, new
SubscriptionWrapper<>(hash(leftRecordValue),
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0), 0))
);
+
+ // test dropped-records sensors
+ assertEquals(0.0, getDroppedRecordsTotalMetric(context));
}
@Test
@@ -498,6 +569,9 @@ public class SubscriptionSendProcessorSupplierTest {
context.forwarded().get(0).record(),
is(new Record<>(compositeKey, new
SubscriptionWrapper<>(hash(leftRecordValue), DELETE_KEY_NO_PROPAGATE, pk, 0),
0))
);
+
+ // test dropped-records sensors
+ assertEquals(0.0, getDroppedRecordsTotalMetric(context));
}
@Test
@@ -517,6 +591,9 @@ public class SubscriptionSendProcessorSupplierTest {
context.forwarded().get(0).record(),
is(new Record<>(compositeKey, new
SubscriptionWrapper<>(hash(leftRecordValue),
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0), 0))
);
+
+ // test dropped-records sensors
+ assertEquals(0.0, getDroppedRecordsTotalMetric(context));
}
@Test
@@ -534,6 +611,9 @@ public class SubscriptionSendProcessorSupplierTest {
context.forwarded().get(0).record(),
is(new Record<>(null, new
SubscriptionWrapper<>(hash(leftRecordValue),
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0), 0))
);
+
+ // test dropped-records sensors
+ assertEquals(0.0, getDroppedRecordsTotalMetric(context));
}
@Test
@@ -551,6 +631,9 @@ public class SubscriptionSendProcessorSupplierTest {
context.forwarded().get(0).record(),
is(new Record<>(compositeKey, new SubscriptionWrapper<>(null,
DELETE_KEY_NO_PROPAGATE, pk, 0), 0))
);
+
+ // test dropped-records sensors
+ assertEquals(0.0, getDroppedRecordsTotalMetric(context));
}
@Test
@@ -566,6 +649,9 @@ public class SubscriptionSendProcessorSupplierTest {
context.forwarded().get(0).record(),
is(new Record<>(null, new SubscriptionWrapper<>(null,
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0), 0))
);
+
+ // test dropped-records sensors
+ assertEquals(0.0, getDroppedRecordsTotalMetric(context));
}
@Test
@@ -577,6 +663,9 @@ public class SubscriptionSendProcessorSupplierTest {
biFunctionLeftJoinProcessor.process(new Record<>(pk, new
Change<>(null, null), 0));
assertThat(context.forwarded(), empty());
+
+ // test dropped-records sensors
+ assertEquals(0.0, getDroppedRecordsTotalMetric(context));
}
// Bi-function tests: inner join
@@ -597,6 +686,9 @@ public class SubscriptionSendProcessorSupplierTest {
context.forwarded().get(0).record(),
is(new Record<>(compositeKey, new
SubscriptionWrapper<>(hash(leftRecordValue),
PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, pk, 0), 0))
);
+
+ // test dropped-records sensors
+ assertEquals(0.0, getDroppedRecordsTotalMetric(context));
}
@Test
@@ -638,6 +730,9 @@ public class SubscriptionSendProcessorSupplierTest {
context.forwarded().get(1).record(),
is(new Record<>(compositeKey2, new
SubscriptionWrapper<>(hash(leftRecordValue),
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0), 0))
);
+
+ // test dropped-records sensors
+ assertEquals(0.0, getDroppedRecordsTotalMetric(context));
}
@Test
@@ -672,6 +767,9 @@ public class SubscriptionSendProcessorSupplierTest {
context.forwarded().get(0).record(),
is(new Record<>(compositeKey, new SubscriptionWrapper<>(null,
DELETE_KEY_AND_PROPAGATE, pk, 0), 0))
);
+
+ // test dropped-records sensors
+ assertEquals(0.0, getDroppedRecordsTotalMetric(context));
}
@Test
@@ -683,6 +781,9 @@ public class SubscriptionSendProcessorSupplierTest {
biFunctionInnerJoinProcessor.process(new Record<>(pk, new
Change<>(null, null), 0));
assertThat(context.forwarded(), empty());
+
+ // test dropped-records sensors
+ assertEquals(0.0, getDroppedRecordsTotalMetric(context));
}
private static class LeftValueSerializer implements Serializer<LeftValue> {