This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new 8e97540  KAFKA-7944: Improve Suppress test coverage (#6382)
8e97540 is described below

commit 8e975400711b0ea64bf4a00c8c551e448ab48416
Author: John Roesler <vvcep...@users.noreply.github.com>
AuthorDate: Tue Mar 12 11:53:29 2019 -0500

    KAFKA-7944: Improve Suppress test coverage (#6382)

    * add a normal windowed suppress with short windows and a short grace period
    * improve the smoke test so that it actually verifies the intended conditions

    See https://issues.apache.org/jira/browse/KAFKA-7944

    Reviewers: Bill Bejeck <b...@confluent.io>, Guozhang Wang <guozh...@confluent.io>
---
 .../SmokeTestDriverIntegrationTest.java            |   7 +-
 .../SuppressionDurabilityIntegrationTest.java      |  14 +-
 .../kafka/streams/tests/SmokeTestClient.java       |  35 ++--
 .../kafka/streams/tests/SmokeTestDriver.java       | 212 ++++++++++++---------
 .../kafka/streams/tests/StreamsSmokeTest.java      |  21 +-
 tests/kafkatest/services/streams.py                |   4 +
 .../kafkatest/tests/streams/streams_bounce_test.py |  75 --------
 .../kafkatest/tests/streams/streams_smoke_test.py  | 104 +++++++---
 8 files changed, 251 insertions(+), 221 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
index 82f86c2..7b896ec 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
@@ -18,12 +18,14 @@ package org.apache.kafka.streams.integration;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.tests.SmokeTestClient;
 import org.apache.kafka.streams.tests.SmokeTestDriver;
 import org.junit.Assert;
 import org.junit.ClassRule;
 import org.junit.Test;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Map;
 import java.util.Properties;
@@ -53,7 +55,8 @@ public class SmokeTestDriverIntegrationTest {
         @Override
         public void run() {
             try {
-                final Map<String, Set<Integer>> allData = generate(bootstrapServers, numKeys, maxRecordsPerKey, true);
+                final Map<String, Set<Integer>> allData =
+                    generate(bootstrapServers, numKeys, maxRecordsPerKey, Duration.ofSeconds(20));
                 result = verify(bootstrapServers, allData, maxRecordsPerKey);
             } catch (final Exception ex) {
@@ -76,7 +79,7 @@ public class SmokeTestDriverIntegrationTest {
         int numClientsCreated = 0;
         final ArrayList<SmokeTestClient> clients = new ArrayList<>();
-        CLUSTER.createTopics(SmokeTestDriver.topics());
+        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, SmokeTestDriver.topics());
         final String bootstrapServers = CLUSTER.bootstrapServers();
         final Driver driver = new Driver(bootstrapServers, 10, 1000);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
index 6f759bc..3bb1131 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
@@ -49,7 +49,6 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import 
org.junit.runners.Parameterized.Parameters; -import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Locale; @@ -88,13 +87,16 @@ public class SuppressionDurabilityIntegrationTest { private static final int COMMIT_INTERVAL = 100; private final boolean eosEnabled; - public SuppressionDurabilityIntegrationTest(final boolean eosEnabled) { - this.eosEnabled = eosEnabled; - } - @Parameters(name = "{index}: eosEnabled={0}") public static Collection<Object[]> parameters() { - return Arrays.asList(new Object[] {false}, new Object[] {true}); + return asList( + new Object[] {false}, + new Object[] {true} + ); + } + + public SuppressionDurabilityIntegrationTest(final boolean eosEnabled) { + this.eosEnabled = eosEnabled; } private KTable<String, Long> buildCountsTable(final String input, final StreamsBuilder builder) { diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java index 3b76fca..b81c0a0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java @@ -32,7 +32,7 @@ import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; -import org.apache.kafka.streams.kstream.Suppressed; +import org.apache.kafka.streams.kstream.Suppressed.BufferConfig; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.state.Stores; @@ -40,8 +40,11 @@ import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.test.TestUtils; import java.time.Duration; +import java.time.Instant; import java.util.Properties; +import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses; + public class SmokeTestClient extends SmokeTestUtil { private final String name; @@ -113,7 +116,7 @@ public class SmokeTestClient extends SmokeTestUtil { final Topology build = getTopology(); final KafkaStreams streamsClient = new KafkaStreams(build, getStreamsConfig(props)); streamsClient.setStateListener((newState, oldState) -> { - System.out.printf("%s: %s -> %s%n", name, oldState, newState); + System.out.printf("%s %s: %s -> %s%n", name, Instant.now(), oldState, newState); if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) { started = true; } @@ -149,24 +152,22 @@ public class SmokeTestClient extends SmokeTestUtil { .withRetention(Duration.ofHours(25)) ); - minAggregation - .toStream() - .filterNot((k, v) -> k.key().equals("flush")) - .map((key, value) -> new KeyValue<>(key.toString(), value)) - .to("min-raw", Produced.with(stringSerde, intSerde)); + streamify(minAggregation, "min-raw"); - minAggregation - .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())) - .toStream() - .filterNot((k, v) -> k.key().equals("flush")) - .map((key, value) -> new KeyValue<>(key.toString(), value)) - .to("min-suppressed", Produced.with(stringSerde, intSerde)); + streamify(minAggregation.suppress(untilWindowCloses(BufferConfig.unbounded())), "min-suppressed"); minAggregation .toStream(new Unwindow<>()) .filterNot((k, v) -> k.equals("flush")) .to("min", Produced.with(stringSerde, intSerde)); + final KTable<Windowed<String>, Integer> smallWindowSum = groupedData + 
.windowedBy(TimeWindows.of(Duration.ofSeconds(2)).advanceBy(Duration.ofSeconds(1)).grace(Duration.ofSeconds(30))) + .reduce((l, r) -> l + r); + + streamify(smallWindowSum, "sws-raw"); + streamify(smallWindowSum.suppress(untilWindowCloses(BufferConfig.unbounded())), "sws-suppressed"); + final KTable<String, Integer> minTable = builder.table( "min", Consumed.with(stringSerde, intSerde), @@ -250,4 +251,12 @@ public class SmokeTestClient extends SmokeTestUtil { return builder.build(); } + + private static void streamify(final KTable<Windowed<String>, Integer> windowedTable, final String topic) { + windowedTable + .toStream() + .filterNot((k, v) -> k.key().equals("flush")) + .map((key, value) -> new KeyValue<>(key.toString(), value)) + .to(topic, Produced.with(stringSerde, intSerde)); + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java index b7f9d7f..98e6e8f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java @@ -60,13 +60,14 @@ import static java.util.Collections.emptyMap; import static org.apache.kafka.common.utils.Utils.mkEntry; public class SmokeTestDriver extends SmokeTestUtil { - private static final String[] TOPICS = new String[] { + private static final String[] TOPICS = { "data", "echo", "max", "min", "min-suppressed", "min-raw", "dif", "sum", + "sws-raw", "sws-suppressed", "cnt", "avg", "tagg" @@ -80,18 +81,18 @@ public class SmokeTestDriver extends SmokeTestUtil { private int index; ValueList(final int min, final int max) { - this.key = min + "-" + max; + key = min + "-" + max; - this.values = new int[max - min + 1]; - for (int i = 0; i < this.values.length; i++) { - this.values[i] = min + i; + values = new int[max - min + 1]; + for (int i = 0; i < values.length; i++) { + values[i] = min + i; } // We want to randomize the order of data to test not completely predictable processing order // However, values are also use as a timestamp of the record. (TODO: separate data and timestamp) // We keep some correlation of time and order. 
Thus, the shuffling is done with a sliding window - shuffle(this.values, 10); + shuffle(values, 10); - this.index = 0; + index = 0; } int next() { @@ -103,45 +104,25 @@ public class SmokeTestDriver extends SmokeTestUtil { return Arrays.copyOf(TOPICS, TOPICS.length); } - public static Map<String, Set<Integer>> generate(final String kafka, - final int numKeys, - final int maxRecordsPerKey, - final boolean autoTerminate) { - final Properties producerProps = new Properties(); - producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest"); - producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); - producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); - producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); - producerProps.put(ProducerConfig.ACKS_CONFIG, "all"); - - final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps); + static void generatePerpetually(final String kafka, + final int numKeys, + final int maxRecordsPerKey) { + final Properties producerProps = generatorProperties(kafka); int numRecordsProduced = 0; - final Map<String, Set<Integer>> allData = new HashMap<>(); final ValueList[] data = new ValueList[numKeys]; for (int i = 0; i < numKeys; i++) { data[i] = new ValueList(i, i + maxRecordsPerKey - 1); - allData.put(data[i].key, new HashSet<>()); - } - final Random rand = new Random(); - - int remaining = 1; // dummy value must be positive if <autoTerminate> is false - if (autoTerminate) { - remaining = data.length; } - List<ProducerRecord<byte[], byte[]>> needRetry = new ArrayList<>(); - - while (remaining > 0) { - final int index = autoTerminate ? rand.nextInt(remaining) : rand.nextInt(numKeys); - final String key = data[index].key; - final int value = data[index].next(); + final Random rand = new Random(); - if (autoTerminate && value < 0) { - remaining--; - data[index] = data[remaining]; - } else { + try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) { + while (true) { + final int index = rand.nextInt(numKeys); + final String key = data[index].key; + final int value = data[index].next(); final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>( @@ -150,51 +131,112 @@ public class SmokeTestDriver extends SmokeTestUtil { intSerde.serializer().serialize("", value) ); - producer.send(record, new TestCallback(record, needRetry)); + producer.send(record); numRecordsProduced++; - allData.get(key).add(value); if (numRecordsProduced % 100 == 0) { - System.out.println(numRecordsProduced + " records produced"); + System.out.println(Instant.now() + " " + numRecordsProduced + " records produced"); } Utils.sleep(2); } } - producer.flush(); - - int remainingRetries = 5; - while (!needRetry.isEmpty()) { - final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>(); - for (final ProducerRecord<byte[], byte[]> record : needRetry) { - System.out.println("retry producing " + stringSerde.deserializer().deserialize("", record.key())); - producer.send(record, new TestCallback(record, needRetry2)); + } + + public static Map<String, Set<Integer>> generate(final String kafka, + final int numKeys, + final int maxRecordsPerKey, + final Duration timeToSpend) { + final Properties producerProps = generatorProperties(kafka); + + + int numRecordsProduced = 0; + + final Map<String, Set<Integer>> allData = new HashMap<>(); + final ValueList[] data = new ValueList[numKeys]; + for (int i = 0; i < numKeys; i++) { + data[i] = new ValueList(i, i + 
maxRecordsPerKey - 1); + allData.put(data[i].key, new HashSet<>()); + } + final Random rand = new Random(); + + int remaining = data.length; + + final long recordPauseTime = timeToSpend.toMillis() / numKeys / maxRecordsPerKey; + + List<ProducerRecord<byte[], byte[]>> needRetry = new ArrayList<>(); + + try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) { + while (remaining > 0) { + final int index = rand.nextInt(remaining); + final String key = data[index].key; + final int value = data[index].next(); + + if (value < 0) { + remaining--; + data[index] = data[remaining]; + } else { + + final ProducerRecord<byte[], byte[]> record = + new ProducerRecord<>( + "data", + stringSerde.serializer().serialize("", key), + intSerde.serializer().serialize("", value) + ); + + producer.send(record, new TestCallback(record, needRetry)); + + numRecordsProduced++; + allData.get(key).add(value); + if (numRecordsProduced % 100 == 0) { + System.out.println(Instant.now() + " " + numRecordsProduced + " records produced"); + } + Utils.sleep(Math.max(recordPauseTime, 2)); + } } producer.flush(); - needRetry = needRetry2; - if (--remainingRetries == 0 && !needRetry.isEmpty()) { - System.err.println("Failed to produce all records after multiple retries"); - Exit.exit(1); + int remainingRetries = 5; + while (!needRetry.isEmpty()) { + final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>(); + for (final ProducerRecord<byte[], byte[]> record : needRetry) { + System.out.println("retry producing " + stringSerde.deserializer().deserialize("", record.key())); + producer.send(record, new TestCallback(record, needRetry2)); + } + producer.flush(); + needRetry = needRetry2; + + if (--remainingRetries == 0 && !needRetry.isEmpty()) { + System.err.println("Failed to produce all records after multiple retries"); + Exit.exit(1); + } } - } - // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out - // all suppressed records. - final List<PartitionInfo> partitions = producer.partitionsFor("data"); - for (final PartitionInfo partition : partitions) { - producer.send(new ProducerRecord<>( - partition.topic(), - partition.partition(), - System.currentTimeMillis() + Duration.ofDays(2).toMillis(), - stringSerde.serializer().serialize("", "flush"), - intSerde.serializer().serialize("", 0) - )); + // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out + // all suppressed records. 
+ final List<PartitionInfo> partitions = producer.partitionsFor("data"); + for (final PartitionInfo partition : partitions) { + producer.send(new ProducerRecord<>( + partition.topic(), + partition.partition(), + System.currentTimeMillis() + Duration.ofDays(2).toMillis(), + stringSerde.serializer().serialize("", "flush"), + intSerde.serializer().serialize("", 0) + )); + } } - - producer.close(); return Collections.unmodifiableMap(allData); } + private static Properties generatorProperties(final String kafka) { + final Properties producerProps = new Properties(); + producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest"); + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); + producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); + producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); + producerProps.put(ProducerConfig.ACKS_CONFIG, "all"); + return producerProps; + } + private static class TestCallback implements Callback { private final ProducerRecord<byte[], byte[]> originalRecord; private final List<ProducerRecord<byte[], byte[]>> needRetry; @@ -232,12 +274,6 @@ public class SmokeTestDriver extends SmokeTestUtil { } public static class NumberDeserializer implements Deserializer<Number> { - - @Override - public void configure(final Map<String, ?> configs, final boolean isKey) { - - } - @Override public Number deserialize(final String topic, final byte[] data) { final Number value; @@ -247,6 +283,8 @@ public class SmokeTestDriver extends SmokeTestUtil { case "min": case "min-raw": case "min-suppressed": + case "sws-raw": + case "sws-suppressed": case "max": case "dif": value = intSerde.deserializer().deserialize(topic, data); @@ -264,11 +302,6 @@ public class SmokeTestDriver extends SmokeTestUtil { } return value; } - - @Override - public void close() { - - } } public static VerificationResult verify(final String kafka, @@ -279,6 +312,7 @@ public class SmokeTestDriver extends SmokeTestUtil { props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NumberDeserializer.class); + props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props); final List<TopicPartition> partitions = getAllPartitions(consumer, TOPICS); @@ -406,7 +440,12 @@ public class SmokeTestDriver extends SmokeTestUtil { boolean pass; try (final PrintStream resultStream = new PrintStream(byteArrayOutputStream)) { pass = verifyTAgg(resultStream, inputs, events.get("tagg")); - pass &= verifySuppressed(resultStream, "min-suppressed", inputs, events, SmokeTestDriver::getMin); + pass &= verifySuppressed(resultStream, "min-suppressed", events); + pass &= verify(resultStream, "min-suppressed", inputs, events, windowedKey -> { + final String unwindowedKey = windowedKey.substring(1, windowedKey.length() - 1).replaceAll("@.*", ""); + return getMin(unwindowedKey); + }); + pass &= verifySuppressed(resultStream, "sws-suppressed", events); pass &= verify(resultStream, "min", inputs, events, SmokeTestDriver::getMin); pass &= verify(resultStream, "max", inputs, events, SmokeTestDriver::getMax); pass &= verify(resultStream, "dif", inputs, events, key -> getMax(key).intValue() - getMin(key).intValue()); @@ -457,9 +496,7 @@ public class SmokeTestDriver extends SmokeTestUtil { private static boolean verifySuppressed(final PrintStream 
resultStream, @SuppressWarnings("SameParameterValue") final String topic, - final Map<String, Set<Integer>> inputs, - final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events, - final Function<String, Number> getMin) { + final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events) { resultStream.println("verifying suppressed " + topic); final Map<String, LinkedList<ConsumerRecord<String, Number>>> topicEvents = events.getOrDefault(topic, emptyMap()); for (final Map.Entry<String, LinkedList<ConsumerRecord<String, Number>>> entry : topicEvents.entrySet()) { @@ -476,14 +513,11 @@ public class SmokeTestDriver extends SmokeTestUtil { return false; } } - return verify(resultStream, topic, inputs, events, windowedKey -> { - final String unwindowedKey = windowedKey.substring(1, windowedKey.length() - 1).replaceAll("@.*", ""); - return getMin.apply(unwindowedKey); - }); + return true; } private static String indent(@SuppressWarnings("SameParameterValue") final String prefix, - final LinkedList<ConsumerRecord<String, Number>> list) { + final Iterable<ConsumerRecord<String, Number>> list) { final StringBuilder stringBuilder = new StringBuilder(); for (final ConsumerRecord<String, Number> record : list) { stringBuilder.append(prefix).append(record).append('\n'); @@ -494,13 +528,13 @@ public class SmokeTestDriver extends SmokeTestUtil { private static Long getSum(final String key) { final int min = getMin(key).intValue(); final int max = getMax(key).intValue(); - return ((long) min + (long) max) * (max - min + 1L) / 2L; + return ((long) min + max) * (max - min + 1L) / 2L; } private static Double getAvg(final String key) { final int min = getMin(key).intValue(); final int max = getMax(key).intValue(); - return ((long) min + (long) max) / 2.0; + return ((long) min + max) / 2.0; } @@ -554,7 +588,7 @@ public class SmokeTestDriver extends SmokeTestUtil { } private static List<TopicPartition> getAllPartitions(final KafkaConsumer<?, ?> consumer, final String... 
topics) { - final ArrayList<TopicPartition> partitions = new ArrayList<>(); + final List<TopicPartition> partitions = new ArrayList<>(); for (final String topic : topics) { for (final PartitionInfo info : consumer.partitionsFor(topic)) { diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java index 4c2f6d2..00de266 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java @@ -20,11 +20,15 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; import java.io.IOException; +import java.time.Duration; import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.UUID; +import static org.apache.kafka.streams.tests.SmokeTestDriver.generate; +import static org.apache.kafka.streams.tests.SmokeTestDriver.generatePerpetually; + public class StreamsSmokeTest { /** @@ -62,16 +66,23 @@ public class StreamsSmokeTest { final int numKeys = 10; final int maxRecordsPerKey = 500; if (disableAutoTerminate) { - SmokeTestDriver.generate(kafka, numKeys, maxRecordsPerKey, false); + generatePerpetually(kafka, numKeys, maxRecordsPerKey); } else { - final Map<String, Set<Integer>> allData = SmokeTestDriver.generate(kafka, numKeys, maxRecordsPerKey, true); + // slow down data production to span 30 seconds so that system tests have time to + // do their bounces, etc. + final Map<String, Set<Integer>> allData = + generate(kafka, numKeys, maxRecordsPerKey, Duration.ofSeconds(30)); SmokeTestDriver.verify(kafka, allData, maxRecordsPerKey); } break; case "process": - // this starts a KafkaStreams client - final SmokeTestClient client = new SmokeTestClient(UUID.randomUUID().toString()); - client.start(streamsProperties); + // this starts the stream processing app + new SmokeTestClient(UUID.randomUUID().toString()).start(streamsProperties); + break; + case "process-eos": + // this starts the stream processing app with EOS + streamsProperties.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); + new SmokeTestClient(UUID.randomUUID().toString()).start(streamsProperties); break; case "close-deadlock-test": final ShutdownDeadlockTest test = new ShutdownDeadlockTest(kafka); diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py index 771b67f..b606267 100644 --- a/tests/kafkatest/services/streams.py +++ b/tests/kafkatest/services/streams.py @@ -353,6 +353,10 @@ class StreamsSmokeTestJobRunnerService(StreamsSmokeTestBaseService): def __init__(self, test_context, kafka): super(StreamsSmokeTestJobRunnerService, self).__init__(test_context, kafka, "process") +class StreamsSmokeTestEOSJobRunnerService(StreamsSmokeTestBaseService): + def __init__(self, test_context, kafka): + super(StreamsSmokeTestEOSJobRunnerService, self).__init__(test_context, kafka, "process-eos") + class StreamsEosTestDriverService(StreamsEosTestBaseService): def __init__(self, test_context, kafka): diff --git a/tests/kafkatest/tests/streams/streams_bounce_test.py b/tests/kafkatest/tests/streams/streams_bounce_test.py deleted file mode 100644 index 7ac7939..0000000 --- a/tests/kafkatest/tests/streams/streams_bounce_test.py +++ /dev/null @@ -1,75 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. 
See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from ducktape.mark.resource import cluster - -from kafkatest.tests.kafka_test import KafkaTest -from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService -import time - - -class StreamsBounceTest(KafkaTest): - """ - Simple test of Kafka Streams. - """ - - def __init__(self, test_context): - super(StreamsBounceTest, self).__init__(test_context, num_zk=1, num_brokers=3, topics={ - 'echo' : { 'partitions': 5, 'replication-factor': 2 }, - 'data' : { 'partitions': 5, 'replication-factor': 2 }, - 'min' : { 'partitions': 5, 'replication-factor': 2 }, - 'max' : { 'partitions': 5, 'replication-factor': 2 }, - 'sum' : { 'partitions': 5, 'replication-factor': 2 }, - 'dif' : { 'partitions': 5, 'replication-factor': 2 }, - 'cnt' : { 'partitions': 5, 'replication-factor': 2 }, - 'avg' : { 'partitions': 5, 'replication-factor': 2 }, - 'wcnt' : { 'partitions': 5, 'replication-factor': 2 }, - 'tagg' : { 'partitions': 5, 'replication-factor': 2 } - }) - - self.driver = StreamsSmokeTestDriverService(test_context, self.kafka) - self.processor1 = StreamsSmokeTestJobRunnerService(test_context, self.kafka) - - @cluster(num_nodes=6) - def test_bounce(self): - """ - Start a smoke test client, then abort (kill -9) and restart it a few times. - Ensure that all records are delivered. - """ - - self.driver.start() - - self.processor1.start() - - time.sleep(15) - - self.processor1.abortThenRestart() - - time.sleep(15) - - # enable this after we add change log partition replicas - #self.kafka.signal_leader("data") - - #time.sleep(15); - - self.processor1.abortThenRestart() - - self.driver.wait() - self.driver.stop() - - self.processor1.stop() - - node = self.driver.node - node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % self.driver.STDOUT_FILE, allow_fail=False) diff --git a/tests/kafkatest/tests/streams/streams_smoke_test.py b/tests/kafkatest/tests/streams/streams_smoke_test.py index 496c495..094869b 100644 --- a/tests/kafkatest/tests/streams/streams_smoke_test.py +++ b/tests/kafkatest/tests/streams/streams_smoke_test.py @@ -14,11 +14,11 @@ # limitations under the License. 
+from ducktape.mark import matrix from ducktape.mark.resource import cluster from kafkatest.tests.kafka_test import KafkaTest -from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService -import time +from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService, StreamsSmokeTestEOSJobRunnerService class StreamsSmokeTest(KafkaTest): @@ -31,8 +31,12 @@ class StreamsSmokeTest(KafkaTest): 'echo' : { 'partitions': 5, 'replication-factor': 1 }, 'data' : { 'partitions': 5, 'replication-factor': 1 }, 'min' : { 'partitions': 5, 'replication-factor': 1 }, + 'min-suppressed' : { 'partitions': 5, 'replication-factor': 1 }, + 'min-raw' : { 'partitions': 5, 'replication-factor': 1 }, 'max' : { 'partitions': 5, 'replication-factor': 1 }, 'sum' : { 'partitions': 5, 'replication-factor': 1 }, + 'sws-raw' : { 'partitions': 5, 'replication-factor': 1 }, + 'sws-suppressed' : { 'partitions': 5, 'replication-factor': 1 }, 'dif' : { 'partitions': 5, 'replication-factor': 1 }, 'cnt' : { 'partitions': 5, 'replication-factor': 1 }, 'avg' : { 'partitions': 5, 'replication-factor': 1 }, @@ -40,39 +44,77 @@ class StreamsSmokeTest(KafkaTest): 'tagg' : { 'partitions': 5, 'replication-factor': 1 } }) + self.test_context = test_context self.driver = StreamsSmokeTestDriverService(test_context, self.kafka) - self.processor1 = StreamsSmokeTestJobRunnerService(test_context, self.kafka) - self.processor2 = StreamsSmokeTestJobRunnerService(test_context, self.kafka) - self.processor3 = StreamsSmokeTestJobRunnerService(test_context, self.kafka) - self.processor4 = StreamsSmokeTestJobRunnerService(test_context, self.kafka) - @cluster(num_nodes=9) - def test_streams(self): - """ - Start a few smoke test clients, then repeat start a new one, stop (cleanly) running one a few times. - Ensure that all results (stats on values computed by Kafka Streams) are correct. - """ - - self.driver.start() - - self.processor1.start() - self.processor2.start() - - time.sleep(15) - - self.processor3.start() - self.processor1.stop() - - time.sleep(15) - - self.processor4.start() + @cluster(num_nodes=8) + @matrix(eos=[True, False], crash=[True, False]) + def test_streams(self, eos, crash): + # + if eos: + processor1 = StreamsSmokeTestEOSJobRunnerService(self.test_context, self.kafka) + processor2 = StreamsSmokeTestEOSJobRunnerService(self.test_context, self.kafka) + processor3 = StreamsSmokeTestEOSJobRunnerService(self.test_context, self.kafka) + else: + processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka) + processor2 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka) + processor3 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka) + + + + with processor1.node.account.monitor_log(processor1.STDOUT_FILE) as monitor1: + processor1.start() + monitor1.wait_until('REBALANCING -> RUNNING', + timeout_sec=60, + err_msg="Never saw 'REBALANCING -> RUNNING' message " + str(processor1.node.account) + ) + + self.driver.start() + + monitor1.wait_until('processed', + timeout_sec=30, + err_msg="Didn't see any processing messages " + str(processor1.node.account) + ) + + # make sure we're not already done processing (which would invalidate the test) + self.driver.node.account.ssh("! 
grep 'Result Verification' %s" % self.driver.STDOUT_FILE, allow_fail=False) + + processor1.stop_nodes(not crash) + + with processor2.node.account.monitor_log(processor2.STDOUT_FILE) as monitor2: + processor2.start() + monitor2.wait_until('REBALANCING -> RUNNING', + timeout_sec=120, + err_msg="Never saw 'REBALANCING -> RUNNING' message " + str(processor2.node.account) + ) + monitor2.wait_until('processed', + timeout_sec=30, + err_msg="Didn't see any processing messages " + str(processor2.node.account) + ) + + # make sure we're not already done processing (which would invalidate the test) + self.driver.node.account.ssh("! grep 'Result Verification' %s" % self.driver.STDOUT_FILE, allow_fail=False) + + processor2.stop_nodes(not crash) + + with processor3.node.account.monitor_log(processor3.STDOUT_FILE) as monitor3: + processor3.start() + monitor3.wait_until('REBALANCING -> RUNNING', + timeout_sec=120, + err_msg="Never saw 'REBALANCING -> RUNNING' message " + str(processor3.node.account) + ) + # there should still be some data left for this processor to work on. + monitor3.wait_until('processed', + timeout_sec=30, + err_msg="Didn't see any processing messages " + str(processor3.node.account) + ) self.driver.wait() self.driver.stop() - self.processor2.stop() - self.processor3.stop() - self.processor4.stop() + processor3.stop() - node = self.driver.node - node.account.ssh("grep SUCCESS %s" % self.driver.STDOUT_FILE, allow_fail=False) + if crash and not eos: + self.driver.node.account.ssh("grep -E 'SUCCESS|PROCESSED-MORE-THAN-GENERATED' %s" % self.driver.STDOUT_FILE, allow_fail=False) + else: + self.driver.node.account.ssh("grep SUCCESS %s" % self.driver.STDOUT_FILE, allow_fail=False)
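
---

For readers skimming the patch, the heart of the SmokeTestClient change is the new small-window aggregation that is published twice: once as a raw changelog ("sws-raw") and once through suppress(untilWindowCloses(...)) so that only one final result per window survives ("sws-suppressed"). Below is a minimal, self-contained sketch of that pattern against the 2.2-era Streams API; the input topic name, serdes, and class name are illustrative, not copied from the test topology.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

import java.time.Duration;

import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;

public class SmallWindowSuppressSketch {

    public static Topology topology() {
        final StreamsBuilder builder = new StreamsBuilder();

        // 2-second hopping windows advancing every second, with a 30-second grace period,
        // so late-arriving records are still folded in before each window finally closes.
        final KTable<Windowed<String>, Integer> smallWindowSum = builder
            .stream("data", Consumed.with(Serdes.String(), Serdes.Integer()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.Integer()))
            .windowedBy(TimeWindows.of(Duration.ofSeconds(2))
                                   .advanceBy(Duration.ofSeconds(1))
                                   .grace(Duration.ofSeconds(30)))
            .reduce(Integer::sum);

        // Raw view: every intermediate update for every window.
        toTopic(smallWindowSum, "sws-raw");

        // Suppressed view: exactly one record per window, emitted only once the window has closed.
        toTopic(smallWindowSum.suppress(untilWindowCloses(BufferConfig.unbounded())), "sws-suppressed");

        return builder.build();
    }

    private static void toTopic(final KTable<Windowed<String>, Integer> windowedTable, final String topic) {
        windowedTable
            .toStream()
            .map((key, value) -> new KeyValue<>(key.toString(), value))
            .to(topic, Produced.with(Serdes.String(), Serdes.Integer()));
    }
}

Because untilWindowCloses() holds results until stream time passes the window end plus grace, something has to advance event time for the suppressed topics to ever emit; that is what the driver's final "flush" records are for.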
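The reworked SmokeTestDriver.generate() paces production so that roughly numKeys * maxRecordsPerKey records span the requested Duration (it sleeps max(timeToSpend.toMillis() / numKeys / maxRecordsPerKey, 2) ms between sends, giving the system test time to bounce processors mid-stream), and then pushes one "flush" record with a far-future timestamp into every partition of the data topic so that every open window passes its grace period and the suppression buffers drain. A rough sketch of that final flush step follows, assuming plain string serialization and a placeholder bootstrap server rather than the test's actual configuration.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.Properties;

public class SuppressionFlushSketch {

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder, not the test's broker list
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.ACKS_CONFIG, "all");

        // A timestamp two days ahead of wall-clock time is comfortably past any short grace period,
        // so event time advances far enough for untilWindowCloses() to release everything it buffered.
        final long flushTimestamp = System.currentTimeMillis() + Duration.ofDays(2).toMillis();

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // One flush record per partition, because stream time is tracked per task/partition.
            for (final PartitionInfo partition : producer.partitionsFor("data")) {
                producer.send(new ProducerRecord<>(
                    partition.topic(),
                    partition.partition(),
                    flushTimestamp,
                    "flush",
                    "0"
                ));
            }
            producer.flush();
        }
    }
}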
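On the verification side, the driver's verifySuppressed() no longer recomputes expected minima inline (that comparison moved into the caller); it only asserts that each windowed key on a suppressed topic produced exactly one event, i.e. that nothing but the final result escaped the suppression buffer. A condensed paraphrase of that check; the event-map shape mirrors the driver's bookkeeping, but the names are simplified.

import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.util.List;
import java.util.Map;

public class SuppressedVerificationSketch {

    // Returns true only if every windowed key on the suppressed topic saw exactly one record.
    // A second record for the same key would mean an intermediate update leaked past suppress().
    static boolean verifySuppressed(final String topic,
                                    final Map<String, List<ConsumerRecord<String, Number>>> eventsByKey) {
        for (final Map.Entry<String, List<ConsumerRecord<String, Number>>> entry : eventsByKey.entrySet()) {
            if (entry.getValue().size() != 1) {
                System.out.println("ERROR: expected exactly one final result for key " + entry.getKey()
                                       + " on topic " + topic + " but observed " + entry.getValue().size());
                return false;
            }
        }
        return true;
    }
}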