This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.1 by this push:
new 500bd70a292 KAFKA-19429: Deflake streams_smoke_test, again (#20070)
500bd70a292 is described below
commit 500bd70a29248ef72725c5d8c7f9da0e2198cb55
Author: Lucas Brutschy <[email protected]>
AuthorDate: Tue Jul 1 21:48:07 2025 +0200
KAFKA-19429: Deflake streams_smoke_test, again (#20070)
It looks like we are checking for properties that are not guaranteed
under at_least_once, for example, exact counting (not allowing for
overcounting).
This change relaxes the validation constraint:
The TAGG topic contains effectively count-by-count results. So for
example, if we have the input without duplication
0 -> 1,2,3 we will get in TAGG 3 -> 1, since 1 key had 3 values.
with duplication:
0 -> 1,1,2,3 we will get in TAGG 4 -> 1, since 1 key had 4 values.
This makes the result difficult to compare. Since we run the smoke test
also with Exactly_Once, I propose to disable validation of TAGG under
ALOS.
Similarly, the topic AVG may overcount or undercount. The test case is
extremely similar to DIF, both performing a join of two streams, the
only difference being the mathematical operation performed, so we can
also disable this validation under ALOS with minimal loss of coverage.
Finally, the change fixes a bug that would throw an NPE when validation
of a windowed stream would fail.
Reviewers: Kirk True <[email protected]>, Matthias J. Sax
<[email protected]>
---
.../org/apache/kafka/streams/tests/SmokeTestDriver.java | 16 +++++++++++-----
1 file changed, 11 insertions(+), 5 deletions(-)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index ad3cead1763..59698607912 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -522,7 +522,11 @@ public class SmokeTestDriver extends SmokeTestUtil {
}
boolean pass;
try (final PrintStream resultStream = new
PrintStream(byteArrayOutputStream)) {
- pass = verifyTAgg(resultStream, inputs, events.get("tagg"),
validationPredicate, printResults);
+ pass = true;
+ if (eosEnabled) {
+ // TAGG is computing "Count-by-count", which may produce keys
that are not in the input data in ALOS, so we skip validation in this case.
+ pass = verifyTAgg(resultStream, inputs, events.get("tagg"),
printResults);
+ }
pass &= verifySuppressed(resultStream, "min-suppressed", events,
printResults);
pass &= verify(resultStream, "min-suppressed", inputs, events,
windowedKey -> {
final String unwindowedKey = windowedKey.substring(1,
windowedKey.length() - 1).replaceAll("@.*", "");
@@ -534,7 +538,10 @@ public class SmokeTestDriver extends SmokeTestUtil {
pass &= verify(resultStream, "dif", inputs, events, key ->
getMax(key).intValue() - getMin(key).intValue(), Object::equals, printResults,
eosEnabled);
pass &= verify(resultStream, "sum", inputs, events,
SmokeTestDriver::getSum, validationPredicate, printResults, eosEnabled);
pass &= verify(resultStream, "cnt", inputs, events, key1 ->
getMax(key1).intValue() - getMin(key1).intValue() + 1L, validationPredicate,
printResults, eosEnabled);
- pass &= verify(resultStream, "avg", inputs, events,
SmokeTestDriver::getAvg, validationPredicate, printResults, eosEnabled);
+ if (eosEnabled) {
+ // Average can overcount and undercount in ALOS, so we skip
validation in that case.
+ pass &= verify(resultStream, "avg", inputs, events,
SmokeTestDriver::getAvg, Object::equals, printResults, eosEnabled);
+ }
}
return new VerificationResult(pass, new
String(byteArrayOutputStream.toByteArray(), StandardCharsets.UTF_8));
}
@@ -580,7 +587,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
if (printResults) {
resultStream.printf("\t inputEvents=%n%s%n\t" +
"echoEvents=%n%s%n\tmaxEvents=%n%s%n\tminEvents=%n%s%n\tdifEvents=%n%s%n\tcntEvents=%n%s%n\ttaggEvents=%n%s%n",
- indent("\t\t", observedInputEvents.get(key)),
+ indent("\t\t",
observedInputEvents.getOrDefault(key, new LinkedList<>())),
indent("\t\t", events.getOrDefault("echo",
emptyMap()).getOrDefault(key, new LinkedList<>())),
indent("\t\t", events.getOrDefault("max",
emptyMap()).getOrDefault(key, new LinkedList<>())),
indent("\t\t", events.getOrDefault("min",
emptyMap()).getOrDefault(key, new LinkedList<>())),
@@ -662,7 +669,6 @@ public class SmokeTestDriver extends SmokeTestUtil {
private static boolean verifyTAgg(final PrintStream resultStream,
final Map<String, Set<Integer>> allData,
final Map<String,
LinkedList<ConsumerRecord<String, Number>>> taggEvents,
- final BiPredicate<Number, Number>
validationPredicate,
final boolean printResults) {
resultStream.println("verifying topic tagg");
if (taggEvents == null) {
@@ -694,7 +700,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
expectedCount = 0L;
}
- if (!validationPredicate.test(expectedCount,
entry.getValue().getLast().value())) {
+ if (entry.getValue().getLast().value().longValue() !=
expectedCount) {
resultStream.println("fail: key=" + key + " tagg=" +
entry.getValue() + " expected=" + expectedCount);
if (printResults)