Repository: kafka Updated Branches: refs/heads/trunk 16469d7f9 -> 2586226a9
KAFKA-4058: Failure in org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterReset - use AdminTool to check for active consumer group Author: Matthias J. Sax <matth...@confluent.io> Reviewers: Guozhang Wang <wangg...@gmail.com> Closes #1767 from mjsax/kafka-4058-trunk Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2586226a Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2586226a Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2586226a Branch: refs/heads/trunk Commit: 2586226a9a5300fea427ca001608ad86d393df1b Parents: 16469d7 Author: Matthias J. Sax <matth...@confluent.io> Authored: Tue Sep 6 23:02:41 2016 -0700 Committer: Guozhang Wang <wangg...@gmail.com> Committed: Tue Sep 6 23:02:41 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/kafka/test/TestUtils.java | 112 +++++++++-------- .../main/scala/kafka/tools/StreamsResetter.java | 2 +- .../integration/ResetIntegrationTest.java | 123 +++++++++++++------ 3 files changed, 147 insertions(+), 90 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/2586226a/clients/src/test/java/org/apache/kafka/test/TestUtils.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java index 44026be..265661a 100644 --- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -16,7 +16,16 @@ */ package org.apache.kafka.test; -import static java.util.Arrays.asList; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.utils.Utils; import java.io.File; import java.io.IOException; @@ -32,16 +41,7 @@ import java.util.Random; import java.util.Set; import java.util.UUID; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.Cluster; -import org.apache.kafka.common.Node; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.record.CompressionType; -import org.apache.kafka.common.record.MemoryRecords; -import org.apache.kafka.common.record.Record; -import org.apache.kafka.common.record.Records; -import org.apache.kafka.common.utils.Utils; +import static java.util.Arrays.asList; /** * Helper functions for writing unit tests @@ -62,51 +62,51 @@ public class TestUtils { public static final Random RANDOM = new Random(); public static final long DEFAULT_MAX_WAIT_MS = 15000; - public static Cluster singletonCluster(Map<String, Integer> topicPartitionCounts) { + public static Cluster singletonCluster(final Map<String, Integer> topicPartitionCounts) { return clusterWith(1, topicPartitionCounts); } - public static Cluster singletonCluster(String topic, int partitions) { + public static Cluster singletonCluster(final String topic, final int partitions) { return clusterWith(1, topic, partitions); } - public static Cluster clusterWith(int nodes, Map<String, Integer> topicPartitionCounts) { - Node[] ns = new Node[nodes]; + public static Cluster clusterWith(final int nodes, final Map<String, Integer> topicPartitionCounts) { + final Node[] ns = new Node[nodes]; for (int i = 0; i < nodes; i++) ns[i] = new Node(i, "localhost", 1969); - List<PartitionInfo> parts = new ArrayList<>(); - for (Map.Entry<String, Integer> topicPartition : topicPartitionCounts.entrySet()) { - String topic = topicPartition.getKey(); - int partitions = topicPartition.getValue(); + final List<PartitionInfo> parts = new ArrayList<>(); + for (final Map.Entry<String, Integer> topicPartition : topicPartitionCounts.entrySet()) { + final String topic = topicPartition.getKey(); + final int partitions = topicPartition.getValue(); for (int i = 0; i < partitions; i++) parts.add(new PartitionInfo(topic, i, ns[i % ns.length], ns, ns)); } return new Cluster(asList(ns), parts, Collections.<String>emptySet(), INTERNAL_TOPICS); } - public static Cluster clusterWith(int nodes, String topic, int partitions) { + public static Cluster clusterWith(final int nodes, final String topic, final int partitions) { return clusterWith(nodes, Collections.singletonMap(topic, partitions)); } /** * Generate an array of random bytes - * + * * @param size The size of the array */ - public static byte[] randomBytes(int size) { - byte[] bytes = new byte[size]; + public static byte[] randomBytes(final int size) { + final byte[] bytes = new byte[size]; SEEDED_RANDOM.nextBytes(bytes); return bytes; } /** * Generate a random string of letters and digits of the given length - * + * * @param len The length of the string * @return The random string */ - public static String randomString(int len) { - StringBuilder b = new StringBuilder(); + public static String randomString(final int len) { + final StringBuilder b = new StringBuilder(); for (int i = 0; i < len; i++) b.append(LETTERS_AND_DIGITS.charAt(SEEDED_RANDOM.nextInt(LETTERS_AND_DIGITS.length()))); return b.toString(); @@ -117,7 +117,7 @@ public class TestUtils { * suffix to generate its name. */ public static File tempFile() throws IOException { - File file = File.createTempFile("kafka", ".tmp"); + final File file = File.createTempFile("kafka", ".tmp"); file.deleteOnExit(); return file; @@ -128,14 +128,15 @@ public class TestUtils { * * @param prefix The prefix of the temporary directory, if null using "kafka-" as default prefix */ - public static File tempDirectory(String prefix) { + public static File tempDirectory(final String prefix) { return tempDirectory(null, prefix); } /** * Create a temporary relative directory in the default temporary-file directory with a * prefix of "kafka-" - * @return the temporary directory just created. + * + * @return the temporary directory just created. */ public static File tempDirectory() { return tempDirectory(null); @@ -147,13 +148,13 @@ public class TestUtils { * @param parent The parent folder path name, if null using the default temporary-file directory * @param prefix The prefix of the temporary directory, if null using "kafka-" as default prefix */ - public static File tempDirectory(Path parent, String prefix) { + public static File tempDirectory(final Path parent, String prefix) { final File file; prefix = prefix == null ? "kafka-" : prefix; try { file = parent == null ? - Files.createTempDirectory(prefix).toFile() : Files.createTempDirectory(parent, prefix).toFile(); - } catch (IOException ex) { + Files.createTempDirectory(prefix).toFile() : Files.createTempDirectory(parent, prefix).toFile(); + } catch (final IOException ex) { throw new RuntimeException("Failed to create a temp dir", ex); } file.deleteOnExit(); @@ -174,13 +175,13 @@ public class TestUtils { * `Record(long timestamp, byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize)` as this * constructor does not include either of these fields. */ - public static ByteBuffer partitionRecordsBuffer(long offset, CompressionType compressionType, Record... records) { + public static ByteBuffer partitionRecordsBuffer(final long offset, final CompressionType compressionType, final Record... records) { int bufferSize = 0; - for (Record record : records) + for (final Record record : records) bufferSize += Records.LOG_OVERHEAD + record.size(); - ByteBuffer buffer = ByteBuffer.allocate(bufferSize); - MemoryRecords memoryRecords = MemoryRecords.emptyRecords(buffer, compressionType); - for (Record record : records) + final ByteBuffer buffer = ByteBuffer.allocate(bufferSize); + final MemoryRecords memoryRecords = MemoryRecords.emptyRecords(buffer, compressionType); + for (final Record record : records) memoryRecords.append(offset, record); memoryRecords.close(); return memoryRecords.buffer(); @@ -200,7 +201,7 @@ public class TestUtils { return properties; } - public static Properties producerConfig(final String bootstrapServers, Class keySerializer, Class valueSerializer) { + public static Properties producerConfig(final String bootstrapServers, final Class keySerializer, final Class valueSerializer) { return producerConfig(bootstrapServers, keySerializer, valueSerializer, new Properties()); } @@ -220,21 +221,32 @@ public class TestUtils { return consumerConfig; } + public static Properties consumerConfig(final String bootstrapServers, + final String groupId, + final Class keyDeserializer, + final Class valueDeserializer) { + return consumerConfig(bootstrapServers, + groupId, + keyDeserializer, + valueDeserializer, + new Properties()); + } + /** * returns consumer config with random UUID for the Group ID */ - public static Properties consumerConfig(final String bootstrapServers, Class keyDeserializer, Class valueDeserializer) { + public static Properties consumerConfig(final String bootstrapServers, final Class keyDeserializer, final Class valueDeserializer) { return consumerConfig(bootstrapServers, - UUID.randomUUID().toString(), - keyDeserializer, - valueDeserializer, - new Properties()); + UUID.randomUUID().toString(), + keyDeserializer, + valueDeserializer, + new Properties()); } /** - * uses default value of 15 seconds for timeout + * uses default value of 15 seconds for timeout */ - public static void waitForCondition(TestCondition testCondition, String conditionDetails) throws InterruptedException { + public static void waitForCondition(final TestCondition testCondition, final String conditionDetails) throws InterruptedException { waitForCondition(testCondition, DEFAULT_MAX_WAIT_MS, conditionDetails); } @@ -244,8 +256,8 @@ public class TestUtils { * without unnecessarily increasing test time (as the condition is checked frequently). The longer timeout is needed to * avoid transient failures due to slow or overloaded machines. */ - public static void waitForCondition(TestCondition testCondition, long maxWaitMs, String conditionDetails) throws InterruptedException { - long startTime = System.currentTimeMillis(); + public static void waitForCondition(final TestCondition testCondition, final long maxWaitMs, String conditionDetails) throws InterruptedException { + final long startTime = System.currentTimeMillis(); while (!testCondition.conditionMet() && ((System.currentTimeMillis() - startTime) < maxWaitMs)) { Thread.sleep(Math.min(maxWaitMs, 100L)); http://git-wip-us.apache.org/repos/asf/kafka/blob/2586226a/core/src/main/scala/kafka/tools/StreamsResetter.java ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java index 22b8bd6..7153790 100644 --- a/core/src/main/scala/kafka/tools/StreamsResetter.java +++ b/core/src/main/scala/kafka/tools/StreamsResetter.java @@ -90,7 +90,7 @@ public class StreamsResetter { adminClient = AdminClient.createSimplePlaintext(this.options.valueOf(bootstrapServerOption)); final String groupId = this.options.valueOf(applicationIdOption); - if (adminClient.describeConsumerGroup(groupId).get().size() != 0) { + if (!adminClient.describeGroup(groupId).members().isEmpty()) { throw new IllegalStateException("Consumer group '" + groupId + "' is still active. " + "Make sure to stop all running application instances before running the reset tool."); } http://git-wip-us.apache.org/repos/asf/kafka/blob/2586226a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java index 79ec117..0f1717c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java @@ -16,10 +16,11 @@ */ package org.apache.kafka.streams.integration; +import kafka.admin.AdminClient; import kafka.tools.StreamsResetter; +import kafka.utils.MockTime; import kafka.utils.ZkUtils; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.security.JaasUtils; import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.LongSerializer; @@ -37,11 +38,13 @@ import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.ClassRule; -import org.junit.Ignore; import org.junit.Test; import java.util.Collections; @@ -61,6 +64,7 @@ public class ResetIntegrationTest { private static final int NUM_BROKERS = 1; @ClassRule public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); + private final MockTime mockTime = CLUSTER.time; private static final String APP_ID = "cleanup-integration-test"; private static final String INPUT_TOPIC = "inputTopic"; @@ -72,6 +76,10 @@ public class ResetIntegrationTest { private static final long STREAMS_CONSUMER_TIMEOUT = 2000L; private static final long CLEANUP_CONSUMER_TIMEOUT = 2000L; + private final WaitUntilConsumerGroupGotClosed consumerGroupInactive = new WaitUntilConsumerGroupGotClosed(); + + private AdminClient adminClient = null; + @BeforeClass public static void startKafkaCluster() throws Exception { CLUSTER.createTopic(INPUT_TOPIC); @@ -81,11 +89,27 @@ public class ResetIntegrationTest { CLUSTER.createTopic(INTERMEDIATE_USER_TOPIC); } - @Ignore + @Before + public void prepare() { + adminClient = AdminClient.createSimplePlaintext(CLUSTER.bootstrapServers()); + } + + @After + public void cleanup() { + if (adminClient != null) { + adminClient.close(); + adminClient = null; + } + } + @Test public void testReprocessingFromScratchAfterReset() throws Exception { final Properties streamsConfiguration = prepareTest(); - final Properties resultTopicConsumerConfig = prepareResultConsumer(); + final Properties resultTopicConsumerConfig = TestUtils.consumerConfig( + CLUSTER.bootstrapServers(), + APP_ID + "-standard-consumer-" + OUTPUT_TOPIC, + LongDeserializer.class, + LongDeserializer.class); prepareInputData(); final KStreamBuilder builder = setupTopology(OUTPUT_TOPIC_2); @@ -93,25 +117,42 @@ public class ResetIntegrationTest { // RUN KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration); streams.start(); - final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultTopicConsumerConfig, OUTPUT_TOPIC, 10); + final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( + resultTopicConsumerConfig, + OUTPUT_TOPIC, + 10); // receive only first values to make sure intermediate user topic is not consumed completely // => required to test "seekToEnd" for intermediate topics - final KeyValue<Object, Object> result2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultTopicConsumerConfig, OUTPUT_TOPIC_2, 1).get(0); + final KeyValue<Object, Object> result2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( + resultTopicConsumerConfig, + OUTPUT_TOPIC_2, + 1 + ).get(0); streams.close(); + TestUtils.waitForCondition(consumerGroupInactive, 5 * STREAMS_CONSUMER_TIMEOUT, + "Streams Application consumer group did not time out after " + (5 * STREAMS_CONSUMER_TIMEOUT) + " ms."); // RESET - Utils.sleep(STREAMS_CONSUMER_TIMEOUT); streams.cleanUp(); cleanGlobal(); + TestUtils.waitForCondition(consumerGroupInactive, 5 * CLEANUP_CONSUMER_TIMEOUT, + "Reset Tool consumer group did not time out after " + (5 * CLEANUP_CONSUMER_TIMEOUT) + " ms."); + assertInternalTopicsGotDeleted(); - Utils.sleep(CLEANUP_CONSUMER_TIMEOUT); // RE-RUN streams = new KafkaStreams(setupTopology(OUTPUT_TOPIC_2_RERUN), streamsConfiguration); streams.start(); - final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultTopicConsumerConfig, OUTPUT_TOPIC, 10); - final KeyValue<Object, Object> resultRerun2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultTopicConsumerConfig, OUTPUT_TOPIC_2_RERUN, 1).get(0); + final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( + resultTopicConsumerConfig, + OUTPUT_TOPIC, + 10); + final KeyValue<Object, Object> resultRerun2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( + resultTopicConsumerConfig, + OUTPUT_TOPIC_2_RERUN, + 1 + ).get(0); streams.close(); assertThat(resultRerun, equalTo(result)); @@ -137,35 +178,29 @@ public class ResetIntegrationTest { return streamsConfiguration; } - private Properties prepareResultConsumer() { - final Properties resultTopicConsumerConfig = new Properties(); - resultTopicConsumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - resultTopicConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, APP_ID + "-standard-consumer-" + OUTPUT_TOPIC); - resultTopicConsumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - resultTopicConsumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); - resultTopicConsumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); - - return resultTopicConsumerConfig; - } - private void prepareInputData() throws Exception { - final Properties producerConfig = new Properties(); - producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - producerConfig.put(ProducerConfig.ACKS_CONFIG, "all"); - producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0); - producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class); - producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - - IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "aaa")), producerConfig, 10L); - IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "bbb")), producerConfig, 20L); - IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "ccc")), producerConfig, 30L); - IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "ddd")), producerConfig, 40L); - IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "eee")), producerConfig, 50L); - IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "fff")), producerConfig, 60L); - IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "ggg")), producerConfig, 61L); - IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "hhh")), producerConfig, 62L); - IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "iii")), producerConfig, 63L); - IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "jjj")), producerConfig, 64L); + final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, StringSerializer.class); + + mockTime.sleep(10); + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "aaa")), producerConfig, mockTime.milliseconds()); + mockTime.sleep(10); + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "bbb")), producerConfig, mockTime.milliseconds()); + mockTime.sleep(10); + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "ccc")), producerConfig, mockTime.milliseconds()); + mockTime.sleep(10); + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "ddd")), producerConfig, mockTime.milliseconds()); + mockTime.sleep(10); + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "eee")), producerConfig, mockTime.milliseconds()); + mockTime.sleep(10); + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "fff")), producerConfig, mockTime.milliseconds()); + mockTime.sleep(1); + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "ggg")), producerConfig, mockTime.milliseconds()); + mockTime.sleep(1); + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "hhh")), producerConfig, mockTime.milliseconds()); + mockTime.sleep(1); + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "iii")), producerConfig, mockTime.milliseconds()); + mockTime.sleep(1); + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "jjj")), producerConfig, mockTime.milliseconds()); } private KStreamBuilder setupTopology(final String outputTopic2) { @@ -188,12 +223,15 @@ public class ResetIntegrationTest { final KStream<Long, Long> windowedCounts = input .through(INTERMEDIATE_USER_TOPIC) .map(new KeyValueMapper<Long, String, KeyValue<Long, String>>() { + private long sleep = 1000; + @Override public KeyValue<Long, String> apply(final Long key, final String value) { // must sleep long enough to avoid processing the whole intermediate topic before application gets stopped // => want to test "skip over" unprocessed records // increasing the sleep time only has disadvantage that test run time is increased - Utils.sleep(1000); + mockTime.sleep(sleep); + sleep *= 2; return new KeyValue<>(key, value); } }) @@ -258,4 +296,11 @@ public class ResetIntegrationTest { assertThat(allTopics, equalTo(expectedRemainingTopicsAfterCleanup)); } + private class WaitUntilConsumerGroupGotClosed implements TestCondition { + @Override + public boolean conditionMet() { + return adminClient.describeGroup(APP_ID).members().isEmpty(); + } + } + }