This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8e97540  KAFKA-7944: Improve Suppress test coverage (#6382)
8e97540 is described below

commit 8e975400711b0ea64bf4a00c8c551e448ab48416
Author: John Roesler <vvcep...@users.noreply.github.com>
AuthorDate: Tue Mar 12 11:53:29 2019 -0500

    KAFKA-7944: Improve Suppress test coverage (#6382)
    
    * add a normal windowed suppress with short windows and a short grace period
    * improve the smoke test so that it actually verifies the intended conditions
    
    See https://issues.apache.org/jira/browse/KAFKA-7944
    
    Reviewers: Bill Bejeck <b...@confluent.io>, Guozhang Wang <guozh...@confluent.io>
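
For context, a minimal sketch of the windowed-suppress pattern this change wires into the smoke test: a short hopping window (2 seconds, advancing by 1 second) with a grace period, where suppression emits only the final result per window once the window closes. The class name, serdes, and surrounding boilerplate are illustrative only and not part of this patch; the actual topology lives in SmokeTestClient.getTopology().

import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;

public class WindowedSuppressSketch {              // illustrative name, not part of the patch

    public static Topology buildTopology() {
        final StreamsBuilder builder = new StreamsBuilder();

        // Short hopping windows with a grace period, like the new "sws" aggregation in the patch.
        final KTable<Windowed<String>, Integer> windowedSum = builder
            .stream("data", Consumed.with(Serdes.String(), Serdes.Integer()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.Integer()))
            .windowedBy(TimeWindows.of(Duration.ofSeconds(2))
                                   .advanceBy(Duration.ofSeconds(1))
                                   .grace(Duration.ofSeconds(30)))
            .reduce(Integer::sum);

        // Suppress intermediate updates: emit exactly one final value per windowed key,
        // only after the window (plus grace period) has closed.
        windowedSum
            .suppress(untilWindowCloses(BufferConfig.unbounded()))
            .toStream()
            .map((key, value) -> KeyValue.pair(key.toString(), value))
            .to("sws-suppressed", Produced.with(Serdes.String(), Serdes.Integer()));

        return builder.build();
    }
}

In the smoke test, the driver finishes by sending "flush" records with a timestamp far in the future, which closes all open windows and makes the suppressed topics verifiable deterministically.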
---
 .../SmokeTestDriverIntegrationTest.java            |   7 +-
 .../SuppressionDurabilityIntegrationTest.java      |  14 +-
 .../kafka/streams/tests/SmokeTestClient.java       |  35 ++--
 .../kafka/streams/tests/SmokeTestDriver.java       | 212 ++++++++++++---------
 .../kafka/streams/tests/StreamsSmokeTest.java      |  21 +-
 tests/kafkatest/services/streams.py                |   4 +
 .../kafkatest/tests/streams/streams_bounce_test.py |  75 --------
 .../kafkatest/tests/streams/streams_smoke_test.py  | 104 +++++++---
 8 files changed, 251 insertions(+), 221 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
index 82f86c2..7b896ec 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
@@ -18,12 +18,14 @@ package org.apache.kafka.streams.integration;
 
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.tests.SmokeTestClient;
 import org.apache.kafka.streams.tests.SmokeTestDriver;
 import org.junit.Assert;
 import org.junit.ClassRule;
 import org.junit.Test;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Map;
 import java.util.Properties;
@@ -53,7 +55,8 @@ public class SmokeTestDriverIntegrationTest {
         @Override
         public void run() {
             try {
-                final Map<String, Set<Integer>> allData = generate(bootstrapServers, numKeys, maxRecordsPerKey, true);
+                final Map<String, Set<Integer>> allData =
+                    generate(bootstrapServers, numKeys, maxRecordsPerKey, Duration.ofSeconds(20));
                 result = verify(bootstrapServers, allData, maxRecordsPerKey);
 
             } catch (final Exception ex) {
@@ -76,7 +79,7 @@ public class SmokeTestDriverIntegrationTest {
         int numClientsCreated = 0;
         final ArrayList<SmokeTestClient> clients = new ArrayList<>();
 
-        CLUSTER.createTopics(SmokeTestDriver.topics());
+        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, SmokeTestDriver.topics());
 
         final String bootstrapServers = CLUSTER.bootstrapServers();
         final Driver driver = new Driver(bootstrapServers, 10, 1000);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
index 6f759bc..3bb1131 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
@@ -49,7 +49,6 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Locale;
@@ -88,13 +87,16 @@ public class SuppressionDurabilityIntegrationTest {
     private static final int COMMIT_INTERVAL = 100;
     private final boolean eosEnabled;
 
-    public SuppressionDurabilityIntegrationTest(final boolean eosEnabled) {
-        this.eosEnabled = eosEnabled;
-    }
-
     @Parameters(name = "{index}: eosEnabled={0}")
     public static Collection<Object[]> parameters() {
-        return Arrays.asList(new Object[] {false}, new Object[] {true});
+        return asList(
+            new Object[] {false},
+            new Object[] {true}
+        );
+    }
+
+    public SuppressionDurabilityIntegrationTest(final boolean eosEnabled) {
+        this.eosEnabled = eosEnabled;
     }
 
     private KTable<String, Long> buildCountsTable(final String input, final StreamsBuilder builder) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
index 3b76fca..b81c0a0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
@@ -32,7 +32,7 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Produced;
-import org.apache.kafka.streams.kstream.Suppressed;
+import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.Stores;
@@ -40,8 +40,11 @@ import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.test.TestUtils;
 
 import java.time.Duration;
+import java.time.Instant;
 import java.util.Properties;
 
+import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
+
 public class SmokeTestClient extends SmokeTestUtil {
 
     private final String name;
@@ -113,7 +116,7 @@ public class SmokeTestClient extends SmokeTestUtil {
         final Topology build = getTopology();
         final KafkaStreams streamsClient = new KafkaStreams(build, getStreamsConfig(props));
         streamsClient.setStateListener((newState, oldState) -> {
-            System.out.printf("%s: %s -> %s%n", name, oldState, newState);
+            System.out.printf("%s %s: %s -> %s%n", name, Instant.now(), oldState, newState);
             if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) {
                 started = true;
             }
@@ -149,24 +152,22 @@ public class SmokeTestClient extends SmokeTestUtil {
                     .withRetention(Duration.ofHours(25))
             );
 
-        minAggregation
-            .toStream()
-            .filterNot((k, v) -> k.key().equals("flush"))
-            .map((key, value) -> new KeyValue<>(key.toString(), value))
-            .to("min-raw", Produced.with(stringSerde, intSerde));
+        streamify(minAggregation, "min-raw");
 
-        minAggregation
-            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
-            .toStream()
-            .filterNot((k, v) -> k.key().equals("flush"))
-            .map((key, value) -> new KeyValue<>(key.toString(), value))
-            .to("min-suppressed", Produced.with(stringSerde, intSerde));
+        streamify(minAggregation.suppress(untilWindowCloses(BufferConfig.unbounded())), "min-suppressed");
 
         minAggregation
             .toStream(new Unwindow<>())
             .filterNot((k, v) -> k.equals("flush"))
             .to("min", Produced.with(stringSerde, intSerde));
 
+        final KTable<Windowed<String>, Integer> smallWindowSum = groupedData
+            .windowedBy(TimeWindows.of(Duration.ofSeconds(2)).advanceBy(Duration.ofSeconds(1)).grace(Duration.ofSeconds(30)))
+            .reduce((l, r) -> l + r);
+
+        streamify(smallWindowSum, "sws-raw");
+        streamify(smallWindowSum.suppress(untilWindowCloses(BufferConfig.unbounded())), "sws-suppressed");
+
         final KTable<String, Integer> minTable = builder.table(
             "min",
             Consumed.with(stringSerde, intSerde),
@@ -250,4 +251,12 @@ public class SmokeTestClient extends SmokeTestUtil {
 
         return builder.build();
     }
+
+    private static void streamify(final KTable<Windowed<String>, Integer> windowedTable, final String topic) {
+        windowedTable
+            .toStream()
+            .filterNot((k, v) -> k.key().equals("flush"))
+            .map((key, value) -> new KeyValue<>(key.toString(), value))
+            .to(topic, Produced.with(stringSerde, intSerde));
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index b7f9d7f..98e6e8f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -60,13 +60,14 @@ import static java.util.Collections.emptyMap;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 
 public class SmokeTestDriver extends SmokeTestUtil {
-    private static final String[] TOPICS = new String[] {
+    private static final String[] TOPICS = {
         "data",
         "echo",
         "max",
         "min", "min-suppressed", "min-raw",
         "dif",
         "sum",
+        "sws-raw", "sws-suppressed",
         "cnt",
         "avg",
         "tagg"
@@ -80,18 +81,18 @@ public class SmokeTestDriver extends SmokeTestUtil {
         private int index;
 
         ValueList(final int min, final int max) {
-            this.key = min + "-" + max;
+            key = min + "-" + max;
 
-            this.values = new int[max - min + 1];
-            for (int i = 0; i < this.values.length; i++) {
-                this.values[i] = min + i;
+            values = new int[max - min + 1];
+            for (int i = 0; i < values.length; i++) {
+                values[i] = min + i;
             }
             // We want to randomize the order of data to test not completely predictable processing order
             // However, values are also use as a timestamp of the record. (TODO: separate data and timestamp)
             // We keep some correlation of time and order. Thus, the shuffling is done with a sliding window
-            shuffle(this.values, 10);
+            shuffle(values, 10);
 
-            this.index = 0;
+            index = 0;
         }
 
         int next() {
@@ -103,45 +104,25 @@ public class SmokeTestDriver extends SmokeTestUtil {
         return Arrays.copyOf(TOPICS, TOPICS.length);
     }
 
-    public static Map<String, Set<Integer>> generate(final String kafka,
-                                                     final int numKeys,
-                                                     final int maxRecordsPerKey,
-                                                     final boolean autoTerminate) {
-        final Properties producerProps = new Properties();
-        producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest");
-        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
-        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
-        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
-        producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
-
-        final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
+    static void generatePerpetually(final String kafka,
+                                    final int numKeys,
+                                    final int maxRecordsPerKey) {
+        final Properties producerProps = generatorProperties(kafka);
 
         int numRecordsProduced = 0;
 
-        final Map<String, Set<Integer>> allData = new HashMap<>();
         final ValueList[] data = new ValueList[numKeys];
         for (int i = 0; i < numKeys; i++) {
             data[i] = new ValueList(i, i + maxRecordsPerKey - 1);
-            allData.put(data[i].key, new HashSet<>());
-        }
-        final Random rand = new Random();
-
-        int remaining = 1; // dummy value must be positive if <autoTerminate> is false
-        if (autoTerminate) {
-            remaining = data.length;
         }
 
-        List<ProducerRecord<byte[], byte[]>> needRetry = new ArrayList<>();
-
-        while (remaining > 0) {
-            final int index = autoTerminate ? rand.nextInt(remaining) : rand.nextInt(numKeys);
-            final String key = data[index].key;
-            final int value = data[index].next();
+        final Random rand = new Random();
 
-            if (autoTerminate && value < 0) {
-                remaining--;
-                data[index] = data[remaining];
-            } else {
+        try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
+            while (true) {
+                final int index = rand.nextInt(numKeys);
+                final String key = data[index].key;
+                final int value = data[index].next();
 
                 final ProducerRecord<byte[], byte[]> record =
                     new ProducerRecord<>(
@@ -150,51 +131,112 @@ public class SmokeTestDriver extends SmokeTestUtil {
                         intSerde.serializer().serialize("", value)
                     );
 
-                producer.send(record, new TestCallback(record, needRetry));
+                producer.send(record);
 
                 numRecordsProduced++;
-                allData.get(key).add(value);
                 if (numRecordsProduced % 100 == 0) {
-                    System.out.println(numRecordsProduced + " records produced");
+                    System.out.println(Instant.now() + " " + numRecordsProduced + " records produced");
                 }
                 Utils.sleep(2);
             }
         }
-        producer.flush();
-
-        int remainingRetries = 5;
-        while (!needRetry.isEmpty()) {
-            final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
-            for (final ProducerRecord<byte[], byte[]> record : needRetry) {
-                System.out.println("retry producing " + stringSerde.deserializer().deserialize("", record.key()));
-                producer.send(record, new TestCallback(record, needRetry2));
+    }
+
+    public static Map<String, Set<Integer>> generate(final String kafka,
+                                                     final int numKeys,
+                                                     final int maxRecordsPerKey,
+                                                     final Duration timeToSpend) {
+        final Properties producerProps = generatorProperties(kafka);
+
+
+        int numRecordsProduced = 0;
+
+        final Map<String, Set<Integer>> allData = new HashMap<>();
+        final ValueList[] data = new ValueList[numKeys];
+        for (int i = 0; i < numKeys; i++) {
+            data[i] = new ValueList(i, i + maxRecordsPerKey - 1);
+            allData.put(data[i].key, new HashSet<>());
+        }
+        final Random rand = new Random();
+
+        int remaining = data.length;
+
+        final long recordPauseTime = timeToSpend.toMillis() / numKeys / maxRecordsPerKey;
+
+        List<ProducerRecord<byte[], byte[]>> needRetry = new ArrayList<>();
+
+        try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
+            while (remaining > 0) {
+                final int index = rand.nextInt(remaining);
+                final String key = data[index].key;
+                final int value = data[index].next();
+
+                if (value < 0) {
+                    remaining--;
+                    data[index] = data[remaining];
+                } else {
+
+                    final ProducerRecord<byte[], byte[]> record =
+                        new ProducerRecord<>(
+                            "data",
+                            stringSerde.serializer().serialize("", key),
+                            intSerde.serializer().serialize("", value)
+                        );
+
+                    producer.send(record, new TestCallback(record, needRetry));
+
+                    numRecordsProduced++;
+                    allData.get(key).add(value);
+                    if (numRecordsProduced % 100 == 0) {
+                        System.out.println(Instant.now() + " " + numRecordsProduced + " records produced");
+                    }
+                    Utils.sleep(Math.max(recordPauseTime, 2));
+                }
             }
             producer.flush();
-            needRetry = needRetry2;
 
-            if (--remainingRetries == 0 && !needRetry.isEmpty()) {
-                System.err.println("Failed to produce all records after multiple retries");
-                Exit.exit(1);
+            int remainingRetries = 5;
+            while (!needRetry.isEmpty()) {
+                final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
+                for (final ProducerRecord<byte[], byte[]> record : needRetry) {
+                    System.out.println("retry producing " + stringSerde.deserializer().deserialize("", record.key()));
+                    producer.send(record, new TestCallback(record, needRetry2));
+                }
+                producer.flush();
+                needRetry = needRetry2;
+
+                if (--remainingRetries == 0 && !needRetry.isEmpty()) {
+                    System.err.println("Failed to produce all records after multiple retries");
+                    Exit.exit(1);
+                }
             }
-        }
 
-        // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
-        // all suppressed records.
-        final List<PartitionInfo> partitions = producer.partitionsFor("data");
-        for (final PartitionInfo partition : partitions) {
-            producer.send(new ProducerRecord<>(
-                partition.topic(),
-                partition.partition(),
-                System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
-                stringSerde.serializer().serialize("", "flush"),
-                intSerde.serializer().serialize("", 0)
-            ));
+            // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
+            // all suppressed records.
+            final List<PartitionInfo> partitions = producer.partitionsFor("data");
+            for (final PartitionInfo partition : partitions) {
+                producer.send(new ProducerRecord<>(
+                    partition.topic(),
+                    partition.partition(),
+                    System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
+                    stringSerde.serializer().serialize("", "flush"),
+                    intSerde.serializer().serialize("", 0)
+                ));
+            }
         }
-
-        producer.close();
         return Collections.unmodifiableMap(allData);
     }
 
+    private static Properties generatorProperties(final String kafka) {
+        final Properties producerProps = new Properties();
+        producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest");
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
+        return producerProps;
+    }
+
     private static class TestCallback implements Callback {
         private final ProducerRecord<byte[], byte[]> originalRecord;
         private final List<ProducerRecord<byte[], byte[]>> needRetry;
@@ -232,12 +274,6 @@ public class SmokeTestDriver extends SmokeTestUtil {
     }
 
     public static class NumberDeserializer implements Deserializer<Number> {
-
-        @Override
-        public void configure(final Map<String, ?> configs, final boolean isKey) {
-
-        }
-
         @Override
         public Number deserialize(final String topic, final byte[] data) {
             final Number value;
@@ -247,6 +283,8 @@ public class SmokeTestDriver extends SmokeTestUtil {
                 case "min":
                 case "min-raw":
                 case "min-suppressed":
+                case "sws-raw":
+                case "sws-suppressed":
                 case "max":
                 case "dif":
                     value = intSerde.deserializer().deserialize(topic, data);
@@ -264,11 +302,6 @@ public class SmokeTestDriver extends SmokeTestUtil {
             }
             return value;
         }
-
-        @Override
-        public void close() {
-
-        }
     }
 
     public static VerificationResult verify(final String kafka,
@@ -279,6 +312,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NumberDeserializer.class);
+        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
 
         final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props);
         final List<TopicPartition> partitions = getAllPartitions(consumer, TOPICS);
@@ -406,7 +440,12 @@ public class SmokeTestDriver extends SmokeTestUtil {
         boolean pass;
         try (final PrintStream resultStream = new PrintStream(byteArrayOutputStream)) {
             pass = verifyTAgg(resultStream, inputs, events.get("tagg"));
-            pass &= verifySuppressed(resultStream, "min-suppressed", inputs, events, SmokeTestDriver::getMin);
+            pass &= verifySuppressed(resultStream, "min-suppressed", events);
+            pass &= verify(resultStream, "min-suppressed", inputs, events, windowedKey -> {
+                final String unwindowedKey = windowedKey.substring(1, windowedKey.length() - 1).replaceAll("@.*", "");
+                return getMin(unwindowedKey);
+            });
+            pass &= verifySuppressed(resultStream, "sws-suppressed", events);
             pass &= verify(resultStream, "min", inputs, events, SmokeTestDriver::getMin);
             pass &= verify(resultStream, "max", inputs, events, SmokeTestDriver::getMax);
             pass &= verify(resultStream, "dif", inputs, events, key -> getMax(key).intValue() - getMin(key).intValue());
@@ -457,9 +496,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
 
     private static boolean verifySuppressed(final PrintStream resultStream,
                                             @SuppressWarnings("SameParameterValue") final String topic,
-                                            final Map<String, Set<Integer>> inputs,
-                                            final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events,
-                                            final Function<String, Number> getMin) {
+                                            final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events) {
         resultStream.println("verifying suppressed " + topic);
         final Map<String, LinkedList<ConsumerRecord<String, Number>>> topicEvents = events.getOrDefault(topic, emptyMap());
         for (final Map.Entry<String, LinkedList<ConsumerRecord<String, Number>>> entry : topicEvents.entrySet()) {
@@ -476,14 +513,11 @@ public class SmokeTestDriver extends SmokeTestUtil {
                 return false;
             }
         }
-        return verify(resultStream, topic, inputs, events, windowedKey -> {
-            final String unwindowedKey = windowedKey.substring(1, windowedKey.length() - 1).replaceAll("@.*", "");
-            return getMin.apply(unwindowedKey);
-        });
+        return true;
     }
 
     private static String indent(@SuppressWarnings("SameParameterValue") final String prefix,
-                                 final LinkedList<ConsumerRecord<String, Number>> list) {
+                                 final Iterable<ConsumerRecord<String, Number>> list) {
         final StringBuilder stringBuilder = new StringBuilder();
         for (final ConsumerRecord<String, Number> record : list) {
             stringBuilder.append(prefix).append(record).append('\n');
@@ -494,13 +528,13 @@ public class SmokeTestDriver extends SmokeTestUtil {
     private static Long getSum(final String key) {
         final int min = getMin(key).intValue();
         final int max = getMax(key).intValue();
-        return ((long) min + (long) max) * (max - min + 1L) / 2L;
+        return ((long) min + max) * (max - min + 1L) / 2L;
     }
 
     private static Double getAvg(final String key) {
         final int min = getMin(key).intValue();
         final int max = getMax(key).intValue();
-        return ((long) min + (long) max) / 2.0;
+        return ((long) min + max) / 2.0;
     }
 
 
@@ -554,7 +588,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
     }
 
     private static List<TopicPartition> getAllPartitions(final KafkaConsumer<?, ?> consumer, final String... topics) {
-        final ArrayList<TopicPartition> partitions = new ArrayList<>();
+        final List<TopicPartition> partitions = new ArrayList<>();
 
         for (final String topic : topics) {
             for (final PartitionInfo info : consumer.partitionsFor(topic)) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
index 4c2f6d2..00de266 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
@@ -20,11 +20,15 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
 
+import static org.apache.kafka.streams.tests.SmokeTestDriver.generate;
+import static org.apache.kafka.streams.tests.SmokeTestDriver.generatePerpetually;
+
 public class StreamsSmokeTest {
 
     /**
@@ -62,16 +66,23 @@ public class StreamsSmokeTest {
                 final int numKeys = 10;
                 final int maxRecordsPerKey = 500;
                 if (disableAutoTerminate) {
-                    SmokeTestDriver.generate(kafka, numKeys, maxRecordsPerKey, false);
+                    generatePerpetually(kafka, numKeys, maxRecordsPerKey);
                 } else {
-                    final Map<String, Set<Integer>> allData = SmokeTestDriver.generate(kafka, numKeys, maxRecordsPerKey, true);
+                    // slow down data production to span 30 seconds so that system tests have time to
+                    // do their bounces, etc.
+                    final Map<String, Set<Integer>> allData =
+                        generate(kafka, numKeys, maxRecordsPerKey, Duration.ofSeconds(30));
                     SmokeTestDriver.verify(kafka, allData, maxRecordsPerKey);
                 }
                 break;
             case "process":
-                // this starts a KafkaStreams client
-                final SmokeTestClient client = new SmokeTestClient(UUID.randomUUID().toString());
-                client.start(streamsProperties);
+                // this starts the stream processing app
+                new SmokeTestClient(UUID.randomUUID().toString()).start(streamsProperties);
+                break;
+            case "process-eos":
+                // this starts the stream processing app with EOS
+                streamsProperties.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
+                new SmokeTestClient(UUID.randomUUID().toString()).start(streamsProperties);
                 break;
             case "close-deadlock-test":
                 final ShutdownDeadlockTest test = new ShutdownDeadlockTest(kafka);
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index 771b67f..b606267 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -353,6 +353,10 @@ class StreamsSmokeTestJobRunnerService(StreamsSmokeTestBaseService):
     def __init__(self, test_context, kafka):
         super(StreamsSmokeTestJobRunnerService, self).__init__(test_context, kafka, "process")
 
+class StreamsSmokeTestEOSJobRunnerService(StreamsSmokeTestBaseService):
+    def __init__(self, test_context, kafka):
+        super(StreamsSmokeTestEOSJobRunnerService, self).__init__(test_context, kafka, "process-eos")
+
 
 class StreamsEosTestDriverService(StreamsEosTestBaseService):
     def __init__(self, test_context, kafka):
diff --git a/tests/kafkatest/tests/streams/streams_bounce_test.py b/tests/kafkatest/tests/streams/streams_bounce_test.py
deleted file mode 100644
index 7ac7939..0000000
--- a/tests/kafkatest/tests/streams/streams_bounce_test.py
+++ /dev/null
@@ -1,75 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.mark.resource import cluster
-
-from kafkatest.tests.kafka_test import KafkaTest
-from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService
-import time
-
-
-class StreamsBounceTest(KafkaTest):
-    """
-    Simple test of Kafka Streams.
-    """
-
-    def __init__(self, test_context):
-        super(StreamsBounceTest, self).__init__(test_context, num_zk=1, num_brokers=3, topics={
-            'echo' : { 'partitions': 5, 'replication-factor': 2 },
-            'data' : { 'partitions': 5, 'replication-factor': 2 },
-            'min' : { 'partitions': 5, 'replication-factor': 2 },
-            'max' : { 'partitions': 5, 'replication-factor': 2 },
-            'sum' : { 'partitions': 5, 'replication-factor': 2 },
-            'dif' : { 'partitions': 5, 'replication-factor': 2 },
-            'cnt' : { 'partitions': 5, 'replication-factor': 2 },
-            'avg' : { 'partitions': 5, 'replication-factor': 2 },
-            'wcnt' : { 'partitions': 5, 'replication-factor': 2 },
-            'tagg' : { 'partitions': 5, 'replication-factor': 2 }
-        })
-
-        self.driver = StreamsSmokeTestDriverService(test_context, self.kafka)
-        self.processor1 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
-
-    @cluster(num_nodes=6)
-    def test_bounce(self):
-        """
-        Start a smoke test client, then abort (kill -9) and restart it a few times.
-        Ensure that all records are delivered.
-        """
-
-        self.driver.start()
-
-        self.processor1.start()
-
-        time.sleep(15)
-
-        self.processor1.abortThenRestart()
-
-        time.sleep(15)
-
-        # enable this after we add change log partition replicas
-        #self.kafka.signal_leader("data")
-
-        #time.sleep(15);
-
-        self.processor1.abortThenRestart()
-
-        self.driver.wait()
-        self.driver.stop()
-
-        self.processor1.stop()
-
-        node = self.driver.node
-        node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % self.driver.STDOUT_FILE, allow_fail=False)
diff --git a/tests/kafkatest/tests/streams/streams_smoke_test.py b/tests/kafkatest/tests/streams/streams_smoke_test.py
index 496c495..094869b 100644
--- a/tests/kafkatest/tests/streams/streams_smoke_test.py
+++ b/tests/kafkatest/tests/streams/streams_smoke_test.py
@@ -14,11 +14,11 @@
 # limitations under the License.
 
 
+from ducktape.mark import matrix
 from ducktape.mark.resource import cluster
 
 from kafkatest.tests.kafka_test import KafkaTest
-from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService
-import time
+from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService, StreamsSmokeTestEOSJobRunnerService
 
 
 class StreamsSmokeTest(KafkaTest):
@@ -31,8 +31,12 @@ class StreamsSmokeTest(KafkaTest):
             'echo' : { 'partitions': 5, 'replication-factor': 1 },
             'data' : { 'partitions': 5, 'replication-factor': 1 },
             'min' : { 'partitions': 5, 'replication-factor': 1 },
+            'min-suppressed' : { 'partitions': 5, 'replication-factor': 1 },
+            'min-raw' : { 'partitions': 5, 'replication-factor': 1 },
             'max' : { 'partitions': 5, 'replication-factor': 1 },
             'sum' : { 'partitions': 5, 'replication-factor': 1 },
+            'sws-raw' : { 'partitions': 5, 'replication-factor': 1 },
+            'sws-suppressed' : { 'partitions': 5, 'replication-factor': 1 },
             'dif' : { 'partitions': 5, 'replication-factor': 1 },
             'cnt' : { 'partitions': 5, 'replication-factor': 1 },
             'avg' : { 'partitions': 5, 'replication-factor': 1 },
@@ -40,39 +44,77 @@ class StreamsSmokeTest(KafkaTest):
             'tagg' : { 'partitions': 5, 'replication-factor': 1 }
         })
 
+        self.test_context = test_context
         self.driver = StreamsSmokeTestDriverService(test_context, self.kafka)
-        self.processor1 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
-        self.processor2 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
-        self.processor3 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
-        self.processor4 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
 
-    @cluster(num_nodes=9)
-    def test_streams(self):
-        """
-        Start a few smoke test clients, then repeat start a new one, stop (cleanly) running one a few times.
-        Ensure that all results (stats on values computed by Kafka Streams) are correct.
-        """
-
-        self.driver.start()
-
-        self.processor1.start()
-        self.processor2.start()
-
-        time.sleep(15)
-
-        self.processor3.start()
-        self.processor1.stop()
-
-        time.sleep(15)
-
-        self.processor4.start()
+    @cluster(num_nodes=8)
+    @matrix(eos=[True, False], crash=[True, False])
+    def test_streams(self, eos, crash):
+        #
+        if eos:
+            processor1 = StreamsSmokeTestEOSJobRunnerService(self.test_context, self.kafka)
+            processor2 = StreamsSmokeTestEOSJobRunnerService(self.test_context, self.kafka)
+            processor3 = StreamsSmokeTestEOSJobRunnerService(self.test_context, self.kafka)
+        else:
+            processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka)
+            processor2 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka)
+            processor3 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka)
+
+
+
+        with processor1.node.account.monitor_log(processor1.STDOUT_FILE) as monitor1:
+            processor1.start()
+            monitor1.wait_until('REBALANCING -> RUNNING',
+                               timeout_sec=60,
+                               err_msg="Never saw 'REBALANCING -> RUNNING' message " + str(processor1.node.account)
+                               )
+
+            self.driver.start()
+
+            monitor1.wait_until('processed',
+                                timeout_sec=30,
+                                err_msg="Didn't see any processing messages " + str(processor1.node.account)
+                                )
+
+            # make sure we're not already done processing (which would invalidate the test)
+            self.driver.node.account.ssh("! grep 'Result Verification' %s" % self.driver.STDOUT_FILE, allow_fail=False)
+
+            processor1.stop_nodes(not crash)
+
+        with processor2.node.account.monitor_log(processor2.STDOUT_FILE) as monitor2:
+            processor2.start()
+            monitor2.wait_until('REBALANCING -> RUNNING',
+                                timeout_sec=120,
+                                err_msg="Never saw 'REBALANCING -> RUNNING' message " + str(processor2.node.account)
+                                )
+            monitor2.wait_until('processed',
+                                timeout_sec=30,
+                                err_msg="Didn't see any processing messages " + str(processor2.node.account)
+                                )
+
+        # make sure we're not already done processing (which would invalidate the test)
+        self.driver.node.account.ssh("! grep 'Result Verification' %s" % self.driver.STDOUT_FILE, allow_fail=False)
+
+        processor2.stop_nodes(not crash)
+
+        with processor3.node.account.monitor_log(processor3.STDOUT_FILE) as monitor3:
+            processor3.start()
+            monitor3.wait_until('REBALANCING -> RUNNING',
+                                timeout_sec=120,
+                                err_msg="Never saw 'REBALANCING -> RUNNING' message " + str(processor3.node.account)
+                                )
+            # there should still be some data left for this processor to work on.
+            monitor3.wait_until('processed',
+                                timeout_sec=30,
+                                err_msg="Didn't see any processing messages " + str(processor3.node.account)
+                                )
 
         self.driver.wait()
         self.driver.stop()
 
-        self.processor2.stop()
-        self.processor3.stop()
-        self.processor4.stop()
+        processor3.stop()
 
-        node = self.driver.node
-        node.account.ssh("grep SUCCESS %s" % self.driver.STDOUT_FILE, allow_fail=False)
+        if crash and not eos:
+            self.driver.node.account.ssh("grep -E 'SUCCESS|PROCESSED-MORE-THAN-GENERATED' %s" % self.driver.STDOUT_FILE, allow_fail=False)
+        else:
+            self.driver.node.account.ssh("grep SUCCESS %s" % self.driver.STDOUT_FILE, allow_fail=False)
