This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4fedffd2827 KAFKA-19429: Deflake streams_smoke_test (#20019)
4fedffd2827 is described below
commit 4fedffd282768f6a198e8210a6ec8624f4401a22
Author: Lucas Brutschy <[email protected]>
AuthorDate: Mon Jun 23 20:16:52 2025 +0200
KAFKA-19429: Deflake streams_smoke_test (#20019)
It looks like we are checking for properties that are not guaranteed
under at_least_once, for example, exact counting (not allowing for
overcounting).
This change relaxes the validation constraint to only check that we
counted _at least_ N messages, and our sums come out as _at least_ the
expected sum.
Reviewers: Matthias J. Sax <[email protected]>
---
.../kafka/streams/tests/SmokeTestDriver.java | 43 ++++++++++++++++------
1 file changed, 32 insertions(+), 11 deletions(-)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index 68e6c27592c..ad3cead1763 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -48,11 +48,13 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -512,21 +514,27 @@ public class SmokeTestDriver extends SmokeTestUtil {
final boolean printResults,
final boolean eosEnabled) {
final ByteArrayOutputStream byteArrayOutputStream = new
ByteArrayOutputStream();
+ final BiPredicate<Number, Number> validationPredicate;
+ if (eosEnabled) {
+ validationPredicate = Objects::equals;
+ } else {
+ validationPredicate = SmokeTestDriver::lessEquals;
+ }
boolean pass;
try (final PrintStream resultStream = new
PrintStream(byteArrayOutputStream)) {
- pass = verifyTAgg(resultStream, inputs, events.get("tagg"),
printResults);
+ pass = verifyTAgg(resultStream, inputs, events.get("tagg"),
validationPredicate, printResults);
pass &= verifySuppressed(resultStream, "min-suppressed", events,
printResults);
pass &= verify(resultStream, "min-suppressed", inputs, events,
windowedKey -> {
final String unwindowedKey = windowedKey.substring(1,
windowedKey.length() - 1).replaceAll("@.*", "");
return getMin(unwindowedKey);
- }, printResults, eosEnabled);
+ }, Object::equals, printResults, eosEnabled);
pass &= verifySuppressed(resultStream, "sws-suppressed", events,
printResults);
- pass &= verify(resultStream, "min", inputs, events,
SmokeTestDriver::getMin, printResults, eosEnabled);
- pass &= verify(resultStream, "max", inputs, events,
SmokeTestDriver::getMax, printResults, eosEnabled);
- pass &= verify(resultStream, "dif", inputs, events, key ->
getMax(key).intValue() - getMin(key).intValue(), printResults, eosEnabled);
- pass &= verify(resultStream, "sum", inputs, events,
SmokeTestDriver::getSum, printResults, eosEnabled);
- pass &= verify(resultStream, "cnt", inputs, events, key1 ->
getMax(key1).intValue() - getMin(key1).intValue() + 1L, printResults,
eosEnabled);
- pass &= verify(resultStream, "avg", inputs, events,
SmokeTestDriver::getAvg, printResults, eosEnabled);
+ pass &= verify(resultStream, "min", inputs, events,
SmokeTestDriver::getMin, Object::equals, printResults, eosEnabled);
+ pass &= verify(resultStream, "max", inputs, events,
SmokeTestDriver::getMax, Object::equals, printResults, eosEnabled);
+ pass &= verify(resultStream, "dif", inputs, events, key ->
getMax(key).intValue() - getMin(key).intValue(), Object::equals, printResults,
eosEnabled);
+ pass &= verify(resultStream, "sum", inputs, events,
SmokeTestDriver::getSum, validationPredicate, printResults, eosEnabled);
+ pass &= verify(resultStream, "cnt", inputs, events, key1 ->
getMax(key1).intValue() - getMin(key1).intValue() + 1L, validationPredicate,
printResults, eosEnabled);
+ pass &= verify(resultStream, "avg", inputs, events,
SmokeTestDriver::getAvg, validationPredicate, printResults, eosEnabled);
}
return new VerificationResult(pass, new
String(byteArrayOutputStream.toByteArray(), StandardCharsets.UTF_8));
}
@@ -536,6 +544,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
final Map<String, Set<Integer>> inputData,
final Map<String, Map<String,
LinkedList<ConsumerRecord<String, Number>>>> events,
final Function<String, Number>
keyToExpectation,
+ final BiPredicate<Number, Number>
validationPredicate,
final boolean printResults,
final boolean eosEnabled) {
resultStream.printf("verifying topic '%s'%n", topic);
@@ -561,12 +570,11 @@ public class SmokeTestDriver extends SmokeTestUtil {
}
}
-
for (final Map.Entry<String, LinkedList<ConsumerRecord<String,
Number>>> entry : outputEvents.entrySet()) {
final String key = entry.getKey();
final Number expected = keyToExpectation.apply(key);
final Number actual = entry.getValue().getLast().value();
- if (!expected.equals(actual)) {
+ if (!validationPredicate.test(expected, actual)) {
resultStream.printf("%s fail: key=%s actual=%s
expected=%s%n", topic, key, actual, expected);
if (printResults) {
@@ -591,6 +599,18 @@ public class SmokeTestDriver extends SmokeTestUtil {
}
}
+ private static boolean lessEquals(final Number expected, final Number
actual) {
+ if (actual instanceof Integer && expected instanceof Integer) {
+ return actual.intValue() >= expected.intValue();
+ } else if (actual instanceof Long && expected instanceof Long) {
+ return actual.longValue() >= expected.longValue();
+ } else if (actual instanceof Double && expected instanceof Double) {
+ return actual.doubleValue() >= expected.doubleValue();
+ } else {
+ throw new IllegalArgumentException("Unexpected type: " +
actual.getClass());
+ }
+ }
+
private static boolean verifySuppressed(final PrintStream resultStream,
@SuppressWarnings("SameParameterValue") final String topic,
final Map<String, Map<String,
LinkedList<ConsumerRecord<String, Number>>>> events,
@@ -642,6 +662,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
private static boolean verifyTAgg(final PrintStream resultStream,
final Map<String, Set<Integer>> allData,
final Map<String,
LinkedList<ConsumerRecord<String, Number>>> taggEvents,
+ final BiPredicate<Number, Number>
validationPredicate,
final boolean printResults) {
resultStream.println("verifying topic tagg");
if (taggEvents == null) {
@@ -673,7 +694,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
expectedCount = 0L;
}
- if (entry.getValue().getLast().value().longValue() !=
expectedCount) {
+ if (!validationPredicate.test(expectedCount,
entry.getValue().getLast().value())) {
resultStream.println("fail: key=" + key + " tagg=" +
entry.getValue() + " expected=" + expectedCount);
if (printResults)