Repository: kafka
Updated Branches:
  refs/heads/trunk 16469d7f9 -> 2586226a9


KAFKA-4058: Failure in org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterReset

 - use AdminClient to check for an active consumer group (instead of sleeping for a fixed timeout)

Author: Matthias J. Sax <matth...@confluent.io>

Reviewers: Guozhang Wang <wangg...@gmail.com>

Closes #1767 from mjsax/kafka-4058-trunk
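
The gist of the fix: instead of sleeping for a fixed timeout and hoping the
consumer group has expired, the test (and the reset tool) now ask the
AdminClient for the group's members and wait until the list is empty. A
minimal sketch of the polling pattern, using only calls added or touched in
this patch; it assumes an already-created adminClient, a caller that declares
"throws InterruptedException", and placeholder groupId/timeout values:

    // Sketch: block until the consumer group has no registered members.
    TestUtils.waitForCondition(new TestCondition() {
        @Override
        public boolean conditionMet() {
            // an inactive group reports an empty member list
            return adminClient.describeGroup(groupId).members().isEmpty();
        }
    }, 10000L, "consumer group '" + groupId + "' is still active");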


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2586226a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2586226a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2586226a

Branch: refs/heads/trunk
Commit: 2586226a9a5300fea427ca001608ad86d393df1b
Parents: 16469d7
Author: Matthias J. Sax <matth...@confluent.io>
Authored: Tue Sep 6 23:02:41 2016 -0700
Committer: Guozhang Wang <wangg...@gmail.com>
Committed: Tue Sep 6 23:02:41 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/kafka/test/TestUtils.java   | 112 +++++++++--------
 .../main/scala/kafka/tools/StreamsResetter.java |   2 +-
 .../integration/ResetIntegrationTest.java       | 123 +++++++++++++------
 3 files changed, 147 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2586226a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index 44026be..265661a 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,7 +16,16 @@
  */
 package org.apache.kafka.test;
 
-import static java.util.Arrays.asList;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.utils.Utils;
 
 import java.io.File;
 import java.io.IOException;
@@ -32,16 +41,7 @@ import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
 
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.record.CompressionType;
-import org.apache.kafka.common.record.MemoryRecords;
-import org.apache.kafka.common.record.Record;
-import org.apache.kafka.common.record.Records;
-import org.apache.kafka.common.utils.Utils;
+import static java.util.Arrays.asList;
 
 /**
  * Helper functions for writing unit tests
@@ -62,51 +62,51 @@ public class TestUtils {
     public static final Random RANDOM = new Random();
     public static final long DEFAULT_MAX_WAIT_MS = 15000;
 
-    public static Cluster singletonCluster(Map<String, Integer> topicPartitionCounts) {
+    public static Cluster singletonCluster(final Map<String, Integer> topicPartitionCounts) {
         return clusterWith(1, topicPartitionCounts);
     }
 
-    public static Cluster singletonCluster(String topic, int partitions) {
+    public static Cluster singletonCluster(final String topic, final int partitions) {
         return clusterWith(1, topic, partitions);
     }
 
-    public static Cluster clusterWith(int nodes, Map<String, Integer> topicPartitionCounts) {
-        Node[] ns = new Node[nodes];
+    public static Cluster clusterWith(final int nodes, final Map<String, Integer> topicPartitionCounts) {
+        final Node[] ns = new Node[nodes];
         for (int i = 0; i < nodes; i++)
             ns[i] = new Node(i, "localhost", 1969);
-        List<PartitionInfo> parts = new ArrayList<>();
-        for (Map.Entry<String, Integer> topicPartition : topicPartitionCounts.entrySet()) {
-            String topic = topicPartition.getKey();
-            int partitions = topicPartition.getValue();
+        final List<PartitionInfo> parts = new ArrayList<>();
+        for (final Map.Entry<String, Integer> topicPartition : topicPartitionCounts.entrySet()) {
+            final String topic = topicPartition.getKey();
+            final int partitions = topicPartition.getValue();
             for (int i = 0; i < partitions; i++)
                 parts.add(new PartitionInfo(topic, i, ns[i % ns.length], ns, ns));
         }
         return new Cluster(asList(ns), parts, Collections.<String>emptySet(), INTERNAL_TOPICS);
     }
 
-    public static Cluster clusterWith(int nodes, String topic, int partitions) {
+    public static Cluster clusterWith(final int nodes, final String topic, final int partitions) {
         return clusterWith(nodes, Collections.singletonMap(topic, partitions));
     }
 
     /**
      * Generate an array of random bytes
-     * 
+     *
      * @param size The size of the array
      */
-    public static byte[] randomBytes(int size) {
-        byte[] bytes = new byte[size];
+    public static byte[] randomBytes(final int size) {
+        final byte[] bytes = new byte[size];
         SEEDED_RANDOM.nextBytes(bytes);
         return bytes;
     }
 
     /**
      * Generate a random string of letters and digits of the given length
-     * 
+     *
      * @param len The length of the string
      * @return The random string
      */
-    public static String randomString(int len) {
-        StringBuilder b = new StringBuilder();
+    public static String randomString(final int len) {
+        final StringBuilder b = new StringBuilder();
         for (int i = 0; i < len; i++)
             b.append(LETTERS_AND_DIGITS.charAt(SEEDED_RANDOM.nextInt(LETTERS_AND_DIGITS.length())));
         return b.toString();
@@ -117,7 +117,7 @@ public class TestUtils {
      * suffix to generate its name.
      */
     public static File tempFile() throws IOException {
-        File file = File.createTempFile("kafka", ".tmp");
+        final File file = File.createTempFile("kafka", ".tmp");
         file.deleteOnExit();
 
         return file;
@@ -128,14 +128,15 @@ public class TestUtils {
      *
      * @param prefix The prefix of the temporary directory, if null using "kafka-" as default prefix
      */
-    public static File tempDirectory(String prefix) {
+    public static File tempDirectory(final String prefix) {
         return tempDirectory(null, prefix);
     }
 
     /**
      * Create a temporary relative directory in the default temporary-file directory with a
      * prefix of "kafka-"
-     * @return  the temporary directory just created.
+     *
+     * @return the temporary directory just created.
      */
     public static File tempDirectory() {
         return tempDirectory(null);
@@ -147,13 +148,13 @@ public class TestUtils {
      * @param parent The parent folder path name, if null using the default temporary-file directory
      * @param prefix The prefix of the temporary directory, if null using "kafka-" as default prefix
      */
-    public static File tempDirectory(Path parent, String prefix) {
+    public static File tempDirectory(final Path parent, String prefix) {
         final File file;
         prefix = prefix == null ? "kafka-" : prefix;
         try {
             file = parent == null ?
-                    Files.createTempDirectory(prefix).toFile() : Files.createTempDirectory(parent, prefix).toFile();
-        } catch (IOException ex) {
+                Files.createTempDirectory(prefix).toFile() : Files.createTempDirectory(parent, prefix).toFile();
+        } catch (final IOException ex) {
             throw new RuntimeException("Failed to create a temp dir", ex);
         }
         file.deleteOnExit();
@@ -174,13 +175,13 @@ public class TestUtils {
      * `Record(long timestamp, byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize)` as this
      * constructor does not include either of these fields.
      */
-    public static ByteBuffer partitionRecordsBuffer(long offset, CompressionType compressionType, Record... records) {
+    public static ByteBuffer partitionRecordsBuffer(final long offset, final CompressionType compressionType, final Record... records) {
         int bufferSize = 0;
-        for (Record record : records)
+        for (final Record record : records)
             bufferSize += Records.LOG_OVERHEAD + record.size();
-        ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
-        MemoryRecords memoryRecords = MemoryRecords.emptyRecords(buffer, compressionType);
-        for (Record record : records)
+        final ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
+        final MemoryRecords memoryRecords = MemoryRecords.emptyRecords(buffer, compressionType);
+        for (final Record record : records)
             memoryRecords.append(offset, record);
         memoryRecords.close();
         return memoryRecords.buffer();
@@ -200,7 +201,7 @@ public class TestUtils {
         return properties;
     }
 
-    public static Properties producerConfig(final String bootstrapServers, Class keySerializer, Class valueSerializer) {
+    public static Properties producerConfig(final String bootstrapServers, final Class keySerializer, final Class valueSerializer) {
         return producerConfig(bootstrapServers, keySerializer, valueSerializer, new Properties());
     }
 
@@ -220,21 +221,32 @@ public class TestUtils {
         return consumerConfig;
     }
 
+    public static Properties consumerConfig(final String bootstrapServers,
+                                            final String groupId,
+                                            final Class keyDeserializer,
+                                            final Class valueDeserializer) {
+        return consumerConfig(bootstrapServers,
+            groupId,
+            keyDeserializer,
+            valueDeserializer,
+            new Properties());
+    }
+
     /**
      * returns consumer config with random UUID for the Group ID
      */
-    public static Properties consumerConfig(final String bootstrapServers, Class keyDeserializer, Class valueDeserializer) {
+    public static Properties consumerConfig(final String bootstrapServers, final Class keyDeserializer, final Class valueDeserializer) {
         return consumerConfig(bootstrapServers,
-                              UUID.randomUUID().toString(),
-                              keyDeserializer,
-                              valueDeserializer,
-                              new Properties());
+            UUID.randomUUID().toString(),
+            keyDeserializer,
+            valueDeserializer,
+            new Properties());
     }
 
     /**
-     *  uses default value of 15 seconds for timeout
+     * uses default value of 15 seconds for timeout
      */
-    public static void waitForCondition(TestCondition testCondition, String conditionDetails) throws InterruptedException {
+    public static void waitForCondition(final TestCondition testCondition, final String conditionDetails) throws InterruptedException {
         waitForCondition(testCondition, DEFAULT_MAX_WAIT_MS, conditionDetails);
     }
 
@@ -244,8 +256,8 @@ public class TestUtils {
      * without unnecessarily increasing test time (as the condition is checked frequently). The longer timeout is needed to
      * avoid transient failures due to slow or overloaded machines.
      */
-    public static void waitForCondition(TestCondition testCondition, long maxWaitMs, String conditionDetails) throws InterruptedException {
-        long startTime = System.currentTimeMillis();
+    public static void waitForCondition(final TestCondition testCondition, final long maxWaitMs, String conditionDetails) throws InterruptedException {
+        final long startTime = System.currentTimeMillis();
 
         while (!testCondition.conditionMet() && ((System.currentTimeMillis() - startTime) < maxWaitMs)) {
             Thread.sleep(Math.min(maxWaitMs, 100L));
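
For context, the patch also adds a TestUtils.consumerConfig overload that
takes an explicit group id next to the existing random-UUID variant. A hedged
usage sketch; the bootstrap address and group id are placeholders, and the
deserializer classes are just the ones this test happens to use:

    // New overload: a fixed group id, so repeated consumers share one group
    // (and its committed offsets) across the whole test run.
    final Properties fixedGroup = TestUtils.consumerConfig(
        "localhost:9092", "my-fixed-group", LongDeserializer.class, LongDeserializer.class);

    // Existing overload: a random UUID group id, so every call starts a
    // fresh, independent consumer group.
    final Properties freshGroup = TestUtils.consumerConfig(
        "localhost:9092", LongDeserializer.class, LongDeserializer.class);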

http://git-wip-us.apache.org/repos/asf/kafka/blob/2586226a/core/src/main/scala/kafka/tools/StreamsResetter.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java
index 22b8bd6..7153790 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -90,7 +90,7 @@ public class StreamsResetter {
 
             adminClient = AdminClient.createSimplePlaintext(this.options.valueOf(bootstrapServerOption));
             final String groupId = this.options.valueOf(applicationIdOption);
-            if (adminClient.describeConsumerGroup(groupId).get().size() != 0) {
+            if (!adminClient.describeGroup(groupId).members().isEmpty()) {
                 throw new IllegalStateException("Consumer group '" + groupId + "' is still active. " +
                     "Make sure to stop all running application instances before running the reset tool.");
             }
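
Pulled out of the tool, the new guard looks roughly like this as a
self-contained check (bootstrap address and application id are placeholders;
the AdminClient is the Scala kafka.admin.AdminClient used by the patch):

    // Sketch: refuse to reset while any application instance is still registered.
    final AdminClient adminClient = AdminClient.createSimplePlaintext("localhost:9092");
    try {
        if (!adminClient.describeGroup("my-app-id").members().isEmpty()) {
            throw new IllegalStateException("Consumer group 'my-app-id' is still active. "
                + "Make sure to stop all running application instances before running the reset tool.");
        }
    } finally {
        adminClient.close();
    }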

http://git-wip-us.apache.org/repos/asf/kafka/blob/2586226a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index 79ec117..0f1717c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -16,10 +16,11 @@
  */
 package org.apache.kafka.streams.integration;
 
+import kafka.admin.AdminClient;
 import kafka.tools.StreamsResetter;
+import kafka.utils.MockTime;
 import kafka.utils.ZkUtils;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.security.JaasUtils;
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.LongSerializer;
@@ -37,11 +38,13 @@ import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import java.util.Collections;
@@ -61,6 +64,7 @@ public class ResetIntegrationTest {
     private static final int NUM_BROKERS = 1;
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+    private final MockTime mockTime = CLUSTER.time;
 
     private static final String APP_ID = "cleanup-integration-test";
     private static final String INPUT_TOPIC = "inputTopic";
@@ -72,6 +76,10 @@ public class ResetIntegrationTest {
     private static final long STREAMS_CONSUMER_TIMEOUT = 2000L;
     private static final long CLEANUP_CONSUMER_TIMEOUT = 2000L;
 
+    private final WaitUntilConsumerGroupGotClosed consumerGroupInactive = new WaitUntilConsumerGroupGotClosed();
+
+    private AdminClient adminClient = null;
+
     @BeforeClass
     public static void startKafkaCluster() throws Exception {
         CLUSTER.createTopic(INPUT_TOPIC);
@@ -81,11 +89,27 @@ public class ResetIntegrationTest {
         CLUSTER.createTopic(INTERMEDIATE_USER_TOPIC);
     }
 
-    @Ignore
+    @Before
+    public void prepare() {
+        adminClient = AdminClient.createSimplePlaintext(CLUSTER.bootstrapServers());
+    }
+
+    @After
+    public void cleanup() {
+        if (adminClient != null) {
+            adminClient.close();
+            adminClient = null;
+        }
+    }
+
     @Test
     public void testReprocessingFromScratchAfterReset() throws Exception {
         final Properties streamsConfiguration = prepareTest();
-        final Properties resultTopicConsumerConfig = prepareResultConsumer();
+        final Properties resultTopicConsumerConfig = TestUtils.consumerConfig(
+            CLUSTER.bootstrapServers(),
+            APP_ID + "-standard-consumer-" + OUTPUT_TOPIC,
+            LongDeserializer.class,
+            LongDeserializer.class);
 
         prepareInputData();
         final KStreamBuilder builder = setupTopology(OUTPUT_TOPIC_2);
@@ -93,25 +117,42 @@ public class ResetIntegrationTest {
         // RUN
         KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
         streams.start();
-        final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultTopicConsumerConfig, OUTPUT_TOPIC, 10);
+        final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+            resultTopicConsumerConfig,
+            OUTPUT_TOPIC,
+            10);
         // receive only first values to make sure intermediate user topic is not consumed completely
         // => required to test "seekToEnd" for intermediate topics
-        final KeyValue<Object, Object> result2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultTopicConsumerConfig, OUTPUT_TOPIC_2, 1).get(0);
+        final KeyValue<Object, Object> result2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+            resultTopicConsumerConfig,
+            OUTPUT_TOPIC_2,
+            1
+        ).get(0);
 
         streams.close();
+        TestUtils.waitForCondition(consumerGroupInactive, 5 * STREAMS_CONSUMER_TIMEOUT,
+            "Streams Application consumer group did not time out after " + (5 * STREAMS_CONSUMER_TIMEOUT) + " ms.");
 
         // RESET
-        Utils.sleep(STREAMS_CONSUMER_TIMEOUT);
         streams.cleanUp();
         cleanGlobal();
+        TestUtils.waitForCondition(consumerGroupInactive, 5 * CLEANUP_CONSUMER_TIMEOUT,
+            "Reset Tool consumer group did not time out after " + (5 * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+
         assertInternalTopicsGotDeleted();
-        Utils.sleep(CLEANUP_CONSUMER_TIMEOUT);
 
         // RE-RUN
         streams = new KafkaStreams(setupTopology(OUTPUT_TOPIC_2_RERUN), streamsConfiguration);
         streams.start();
-        final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultTopicConsumerConfig, OUTPUT_TOPIC, 10);
-        final KeyValue<Object, Object> resultRerun2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultTopicConsumerConfig, OUTPUT_TOPIC_2_RERUN, 1).get(0);
+        final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+            resultTopicConsumerConfig,
+            OUTPUT_TOPIC,
+            10);
+        final KeyValue<Object, Object> resultRerun2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+            resultTopicConsumerConfig,
+            OUTPUT_TOPIC_2_RERUN,
+            1
+        ).get(0);
         streams.close();
 
         assertThat(resultRerun, equalTo(result));
@@ -137,35 +178,29 @@ public class ResetIntegrationTest {
         return streamsConfiguration;
     }
 
-    private Properties prepareResultConsumer() {
-        final Properties resultTopicConsumerConfig = new Properties();
-        resultTopicConsumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        resultTopicConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, APP_ID + "-standard-consumer-" + OUTPUT_TOPIC);
-        resultTopicConsumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        resultTopicConsumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
-        resultTopicConsumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
-
-        return resultTopicConsumerConfig;
-    }
-
     private void prepareInputData() throws Exception {
-        final Properties producerConfig = new Properties();
-        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
-        producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
-        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
-        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-
-        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "aaa")), producerConfig, 10L);
-        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "bbb")), producerConfig, 20L);
-        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "ccc")), producerConfig, 30L);
-        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "ddd")), producerConfig, 40L);
-        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "eee")), producerConfig, 50L);
-        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "fff")), producerConfig, 60L);
-        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "ggg")), producerConfig, 61L);
-        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "hhh")), producerConfig, 62L);
-        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "iii")), producerConfig, 63L);
-        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "jjj")), producerConfig, 64L);
+        final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, StringSerializer.class);
+
+        mockTime.sleep(10);
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "aaa")), producerConfig, mockTime.milliseconds());
+        mockTime.sleep(10);
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "bbb")), producerConfig, mockTime.milliseconds());
+        mockTime.sleep(10);
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "ccc")), producerConfig, mockTime.milliseconds());
+        mockTime.sleep(10);
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "ddd")), producerConfig, mockTime.milliseconds());
+        mockTime.sleep(10);
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "eee")), producerConfig, mockTime.milliseconds());
+        mockTime.sleep(10);
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "fff")), producerConfig, mockTime.milliseconds());
+        mockTime.sleep(1);
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "ggg")), producerConfig, mockTime.milliseconds());
+        mockTime.sleep(1);
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "hhh")), producerConfig, mockTime.milliseconds());
+        mockTime.sleep(1);
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "iii")), producerConfig, mockTime.milliseconds());
+        mockTime.sleep(1);
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "jjj")), producerConfig, mockTime.milliseconds());
     }
 
     private KStreamBuilder setupTopology(final String outputTopic2) {
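
Side note on the rewritten prepareInputData above: record timestamps are now
driven by the embedded cluster's MockTime rather than hard-coded constants,
which keeps them strictly increasing and deterministic across the reset and
re-run. Condensed into a loop purely for illustration (the test spells out
each call; the gaps, keys, and values below match the diff):

    // Sketch: advance the mocked clock, then stamp each record with it.
    final long[] gaps = {10, 10, 10, 10, 10, 10, 1, 1, 1, 1};   // ms between records
    final String[] values = {"aaa", "bbb", "ccc", "ddd", "eee", "fff", "ggg", "hhh", "iii", "jjj"};
    for (int i = 0; i < values.length; i++) {
        mockTime.sleep(gaps[i]);
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
            INPUT_TOPIC,
            Collections.singleton(new KeyValue<>((long) (i % 2), values[i])),  // keys alternate 0L/1L
            producerConfig,
            mockTime.milliseconds());
    }
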
@@ -188,12 +223,15 @@ public class ResetIntegrationTest {
         final KStream<Long, Long> windowedCounts = input
             .through(INTERMEDIATE_USER_TOPIC)
             .map(new KeyValueMapper<Long, String, KeyValue<Long, String>>() {
+                private long sleep = 1000;
+
                 @Override
                 public KeyValue<Long, String> apply(final Long key, final String value) {
                     // must sleep long enough to avoid processing the whole intermediate topic before application gets stopped
                     // => want to test "skip over" unprocessed records
                     // increasing the sleep time only has disadvantage that test run time is increased
-                    Utils.sleep(1000);
+                    mockTime.sleep(sleep);
+                    sleep *= 2;
                     return new KeyValue<>(key, value);
                 }
             })
@@ -258,4 +296,11 @@ public class ResetIntegrationTest {
         assertThat(allTopics, equalTo(expectedRemainingTopicsAfterCleanup));
     }
 
+    private class WaitUntilConsumerGroupGotClosed implements TestCondition {
+        @Override
+        public boolean conditionMet() {
+            return adminClient.describeGroup(APP_ID).members().isEmpty();
+        }
+    }
+
 }
