This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4fedffd2827 KAFKA-19429: Deflake streams_smoke_test (#20019)
4fedffd2827 is described below

commit 4fedffd282768f6a198e8210a6ec8624f4401a22
Author: Lucas Brutschy <[email protected]>
AuthorDate: Mon Jun 23 20:16:52 2025 +0200

    KAFKA-19429: Deflake streams_smoke_test (#20019)
    
    It looks like we are checking for properties that are not guaranteed
    under at_least_once, for example, exact counting (not allowing for
    overcounting).
    
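    This change relaxes the validation constraint when exactly-once is not
    enabled: we now only check that we counted _at least_ N messages and
    that our sums come out as _at least_ the expected sum. With exactly-once
    enabled, the results must still match exactly.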
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../kafka/streams/tests/SmokeTestDriver.java       | 43 ++++++++++++++++------
 1 file changed, 32 insertions(+), 11 deletions(-)

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java 
b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index 68e6c27592c..ad3cead1763 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -48,11 +48,13 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiPredicate;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -512,21 +514,27 @@ public class SmokeTestDriver extends SmokeTestUtil {
                                                 final boolean printResults,
                                                 final boolean eosEnabled) {
         final ByteArrayOutputStream byteArrayOutputStream = new 
ByteArrayOutputStream();
+        final BiPredicate<Number, Number> validationPredicate;
+        if (eosEnabled) {
+            validationPredicate = Objects::equals;
+        } else {
+            validationPredicate = SmokeTestDriver::lessEquals;
+        }
         boolean pass;
         try (final PrintStream resultStream = new 
PrintStream(byteArrayOutputStream)) {
-            pass = verifyTAgg(resultStream, inputs, events.get("tagg"), 
printResults);
+            pass = verifyTAgg(resultStream, inputs, events.get("tagg"), 
validationPredicate, printResults);
             pass &= verifySuppressed(resultStream, "min-suppressed", events, 
printResults);
             pass &= verify(resultStream, "min-suppressed", inputs, events, 
windowedKey -> {
                 final String unwindowedKey = windowedKey.substring(1, 
windowedKey.length() - 1).replaceAll("@.*", "");
                 return getMin(unwindowedKey);
-            }, printResults, eosEnabled);
+            }, Object::equals, printResults, eosEnabled);
             pass &= verifySuppressed(resultStream, "sws-suppressed", events, 
printResults);
-            pass &= verify(resultStream, "min", inputs, events, 
SmokeTestDriver::getMin, printResults, eosEnabled);
-            pass &= verify(resultStream, "max", inputs, events, 
SmokeTestDriver::getMax, printResults, eosEnabled);
-            pass &= verify(resultStream, "dif", inputs, events, key -> 
getMax(key).intValue() - getMin(key).intValue(), printResults, eosEnabled);
-            pass &= verify(resultStream, "sum", inputs, events, 
SmokeTestDriver::getSum, printResults, eosEnabled);
-            pass &= verify(resultStream, "cnt", inputs, events, key1 -> 
getMax(key1).intValue() - getMin(key1).intValue() + 1L, printResults, 
eosEnabled);
-            pass &= verify(resultStream, "avg", inputs, events, 
SmokeTestDriver::getAvg, printResults, eosEnabled);
+            pass &= verify(resultStream, "min", inputs, events, 
SmokeTestDriver::getMin, Object::equals, printResults, eosEnabled);
+            pass &= verify(resultStream, "max", inputs, events, 
SmokeTestDriver::getMax, Object::equals, printResults, eosEnabled);
+            pass &= verify(resultStream, "dif", inputs, events, key -> 
getMax(key).intValue() - getMin(key).intValue(), Object::equals, printResults, 
eosEnabled);
+            pass &= verify(resultStream, "sum", inputs, events, 
SmokeTestDriver::getSum, validationPredicate, printResults, eosEnabled);
+            pass &= verify(resultStream, "cnt", inputs, events, key1 -> 
getMax(key1).intValue() - getMin(key1).intValue() + 1L, validationPredicate, 
printResults, eosEnabled);
+            pass &= verify(resultStream, "avg", inputs, events, 
SmokeTestDriver::getAvg, validationPredicate, printResults, eosEnabled);
         }
         return new VerificationResult(pass, new 
String(byteArrayOutputStream.toByteArray(), StandardCharsets.UTF_8));
     }
@@ -536,6 +544,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
                                   final Map<String, Set<Integer>> inputData,
                                   final Map<String, Map<String, 
LinkedList<ConsumerRecord<String, Number>>>> events,
                                   final Function<String, Number> 
keyToExpectation,
+                                  final BiPredicate<Number, Number> 
validationPredicate,
                                   final boolean printResults,
                                   final boolean eosEnabled) {
         resultStream.printf("verifying topic '%s'%n", topic);
@@ -561,12 +570,11 @@ public class SmokeTestDriver extends SmokeTestUtil {
                 }
             }
 
-
             for (final Map.Entry<String, LinkedList<ConsumerRecord<String, 
Number>>> entry : outputEvents.entrySet()) {
                 final String key = entry.getKey();
                 final Number expected = keyToExpectation.apply(key);
                 final Number actual = entry.getValue().getLast().value();
-                if (!expected.equals(actual)) {
+                if (!validationPredicate.test(expected, actual)) {
                     resultStream.printf("%s fail: key=%s actual=%s 
expected=%s%n", topic, key, actual, expected);
 
                     if (printResults) {
@@ -591,6 +599,18 @@ public class SmokeTestDriver extends SmokeTestUtil {
         }
     }
 
+    private static boolean lessEquals(final Number expected, final Number 
actual) {
+        if (actual instanceof Integer && expected instanceof Integer) {
+            return actual.intValue() >= expected.intValue();
+        } else if (actual instanceof Long && expected instanceof Long) {
+            return actual.longValue() >= expected.longValue();
+        } else if (actual instanceof Double && expected instanceof Double) {
+            return actual.doubleValue() >= expected.doubleValue();
+        } else {
+            throw new IllegalArgumentException("Unexpected type: " + 
actual.getClass());
+        }
+    }
+
     private static boolean verifySuppressed(final PrintStream resultStream,
                                             
@SuppressWarnings("SameParameterValue") final String topic,
                                             final Map<String, Map<String, 
LinkedList<ConsumerRecord<String, Number>>>> events,
@@ -642,6 +662,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
     private static boolean verifyTAgg(final PrintStream resultStream,
                                       final Map<String, Set<Integer>> allData,
                                       final Map<String, 
LinkedList<ConsumerRecord<String, Number>>> taggEvents,
+                                      final BiPredicate<Number, Number> 
validationPredicate,
                                       final boolean printResults) {
         resultStream.println("verifying topic tagg");
         if (taggEvents == null) {
@@ -673,7 +694,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
                     expectedCount = 0L;
                 }
 
-                if (entry.getValue().getLast().value().longValue() != 
expectedCount) {
+                if (!validationPredicate.test(expectedCount, 
entry.getValue().getLast().value())) {
                     resultStream.println("fail: key=" + key + " tagg=" + 
entry.getValue() + " expected=" + expectedCount);
 
                     if (printResults)

Reply via email to