Repository: kafka
Updated Branches:
  refs/heads/0.10.0 ca45bd031 -> bff5349a4


KAFKA-4058: Failure in 
org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterReset

 - use AdminTool to check for active consumer group

Author: Matthias J. Sax <matth...@confluent.io>

Reviewers: Ismael Juma, Guozhang Wang

Closes #1756 from mjsax/kafka-4058-reset-tool-test


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bff5349a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bff5349a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bff5349a

Branch: refs/heads/0.10.0
Commit: bff5349a49c2bf7c3b30c8ddc126e53cb6e06dca
Parents: ca45bd0
Author: Matthias J. Sax <matth...@confluent.io>
Authored: Tue Aug 30 11:59:41 2016 -0700
Committer: Guozhang Wang <wangg...@gmail.com>
Committed: Tue Aug 30 11:59:41 2016 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/test/TestCondition.java    | 26 +++++++
 .../java/org/apache/kafka/test/TestUtils.java   | 76 ++++++++++++--------
 .../main/scala/kafka/tools/StreamsResetter.java |  2 +-
 .../integration/ResetIntegrationTest.java       | 40 ++++++++++-
 4 files changed, 112 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/bff5349a/clients/src/test/java/org/apache/kafka/test/TestCondition.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestCondition.java 
b/clients/src/test/java/org/apache/kafka/test/TestCondition.java
new file mode 100644
index 0000000..f78c91b
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/test/TestCondition.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License.  You may obtain a
+ * copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+
+package org.apache.kafka.test;
+
+/**
+ * Interface to wrap actions that are required to wait until a condition is met
+ * for testing purposes.  Note that this is not intended to do any assertions.
+ */
+public interface TestCondition {
+
+    boolean conditionMet();
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/bff5349a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java 
b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index 1bfe578..ef3b6bc 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,7 +16,10 @@
  */
 package org.apache.kafka.test;
 
-import static java.util.Arrays.asList;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.utils.Utils;
 
 import java.io.File;
 import java.io.IOException;
@@ -28,10 +31,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.utils.Utils;
+import static java.util.Arrays.asList;
 
 
 /**
@@ -49,51 +49,51 @@ public class TestUtils {
     public static final Random SEEDED_RANDOM = new Random(192348092834L);
     public static final Random RANDOM = new Random();
 
-    public static Cluster singletonCluster(Map<String, Integer> 
topicPartitionCounts) {
+    public static Cluster singletonCluster(final Map<String, Integer> 
topicPartitionCounts) {
         return clusterWith(1, topicPartitionCounts);
     }
 
-    public static Cluster singletonCluster(String topic, int partitions) {
+    public static Cluster singletonCluster(final String topic, final int 
partitions) {
         return clusterWith(1, topic, partitions);
     }
 
-    public static Cluster clusterWith(int nodes, Map<String, Integer> 
topicPartitionCounts) {
-        Node[] ns = new Node[nodes];
+    public static Cluster clusterWith(final int nodes, final Map<String, 
Integer> topicPartitionCounts) {
+        final Node[] ns = new Node[nodes];
         for (int i = 0; i < nodes; i++)
             ns[i] = new Node(i, "localhost", 1969);
-        List<PartitionInfo> parts = new ArrayList<>();
-        for (Map.Entry<String, Integer> topicPartition : 
topicPartitionCounts.entrySet()) {
-            String topic = topicPartition.getKey();
-            int partitions = topicPartition.getValue();
+        final List<PartitionInfo> parts = new ArrayList<>();
+        for (final Map.Entry<String, Integer> topicPartition : 
topicPartitionCounts.entrySet()) {
+            final String topic = topicPartition.getKey();
+            final int partitions = topicPartition.getValue();
             for (int i = 0; i < partitions; i++)
                 parts.add(new PartitionInfo(topic, i, ns[i % ns.length], ns, 
ns));
         }
         return new Cluster(asList(ns), parts, Collections.<String>emptySet());
     }
 
-    public static Cluster clusterWith(int nodes, String topic, int partitions) 
{
+    public static Cluster clusterWith(final int nodes, final String topic, 
final int partitions) {
         return clusterWith(nodes, Collections.singletonMap(topic, partitions));
     }
 
     /**
      * Generate an array of random bytes
-     * 
+     *
      * @param size The size of the array
      */
-    public static byte[] randomBytes(int size) {
-        byte[] bytes = new byte[size];
+    public static byte[] randomBytes(final int size) {
+        final byte[] bytes = new byte[size];
         SEEDED_RANDOM.nextBytes(bytes);
         return bytes;
     }
 
     /**
      * Generate a random string of letters and digits of the given length
-     * 
+     *
      * @param len The length of the string
      * @return The random string
      */
-    public static String randomString(int len) {
-        StringBuilder b = new StringBuilder();
+    public static String randomString(final int len) {
+        final StringBuilder b = new StringBuilder();
         for (int i = 0; i < len; i++)
             
b.append(LETTERS_AND_DIGITS.charAt(SEEDED_RANDOM.nextInt(LETTERS_AND_DIGITS.length())));
         return b.toString();
@@ -104,7 +104,7 @@ public class TestUtils {
      * suffix to generate its name.
      */
     public static File tempFile() throws IOException {
-        File file = File.createTempFile("kafka", ".tmp");
+        final File file = File.createTempFile("kafka", ".tmp");
         file.deleteOnExit();
 
         return file;
@@ -115,7 +115,7 @@ public class TestUtils {
      *
      * @param prefix The prefix of the temporary directory, if null using 
"kafka-" as default prefix
      */
-    public static File tempDirectory(String prefix) throws IOException {
+    public static File tempDirectory(final String prefix) throws IOException {
         return tempDirectory(null, prefix);
     }
 
@@ -125,10 +125,10 @@ public class TestUtils {
      * @param parent The parent folder path name, if null using the default 
temporary-file directory
      * @param prefix The prefix of the temporary directory, if null using 
"kafka-" as default prefix
      */
-    public static File tempDirectory(Path parent, String prefix) throws 
IOException {
+    public static File tempDirectory(final Path parent, final String prefix) 
throws IOException {
         final File file = parent == null ?
-                Files.createTempDirectory(prefix == null ? "kafka-" : 
prefix).toFile() :
-                Files.createTempDirectory(parent, prefix == null ? "kafka-" : 
prefix).toFile();
+            Files.createTempDirectory(prefix == null ? "kafka-" : 
prefix).toFile() :
+            Files.createTempDirectory(parent, prefix == null ? "kafka-" : 
prefix).toFile();
         file.deleteOnExit();
 
         Runtime.getRuntime().addShutdownHook(new Thread() {
@@ -141,4 +141,24 @@ public class TestUtils {
         return file;
     }
 
+    /**
+     * Wait for condition to be met for at most {@code maxWaitMs} and throw 
assertion failure otherwise.
+     * This should be used instead of {@code Thread.sleep} whenever possible 
as it allows a longer timeout to be used
+     * without unnecessarily increasing test time (as the condition is checked 
frequently). The longer timeout is needed to
+     * avoid transient failures due to slow or overloaded machines.
+     */
+    public static void waitForCondition(final TestCondition testCondition, 
final long maxWaitMs, String conditionDetails) throws InterruptedException {
+        final long startTime = System.currentTimeMillis();
+
+
+        while (!testCondition.conditionMet() && ((System.currentTimeMillis() - 
startTime) < maxWaitMs)) {
+            Thread.sleep(Math.min(maxWaitMs, 100L));
+        }
+
+        if (!testCondition.conditionMet()) {
+            conditionDetails = conditionDetails != null ? conditionDetails : 
"";
+            throw new AssertionError("Condition not met within timeout " + 
maxWaitMs + ". " + conditionDetails);
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bff5349a/core/src/main/scala/kafka/tools/StreamsResetter.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java 
b/core/src/main/scala/kafka/tools/StreamsResetter.java
index 8d9cd5e..7153790 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -90,7 +90,7 @@ public class StreamsResetter {
 
             adminClient = 
AdminClient.createSimplePlaintext(this.options.valueOf(bootstrapServerOption));
             final String groupId = this.options.valueOf(applicationIdOption);
-            if (adminClient.describeConsumerGroup(groupId).size() != 0) {
+            if (!adminClient.describeGroup(groupId).members().isEmpty()) {
                 throw new IllegalStateException("Consumer group '" + groupId + 
"' is still active. " +
                     "Make sure to stop all running application instances 
before running the reset tool.");
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bff5349a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index 8dd1f09..0e4129e 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.integration;
 
+import kafka.admin.AdminClient;
 import kafka.tools.StreamsResetter;
 import kafka.utils.ZkUtils;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -37,8 +38,11 @@ import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -70,6 +74,10 @@ public class ResetIntegrationTest {
     private static final long STREAMS_CONSUMER_TIMEOUT = 2000L;
     private static final long CLEANUP_CONSUMER_TIMEOUT = 2000L;
 
+    private final WaitUntilConsumerGroupGotClosed consumerGroupInactive = new 
WaitUntilConsumerGroupGotClosed();
+
+    private AdminClient adminClient = null;
+
     @BeforeClass
     public static void startKafkaCluster() throws Exception {
         CLUSTER.createTopic(INPUT_TOPIC);
@@ -79,6 +87,19 @@ public class ResetIntegrationTest {
         CLUSTER.createTopic(INTERMEDIATE_USER_TOPIC);
     }
 
+    @Before
+    public void prepare() {
+        this.adminClient = 
AdminClient.createSimplePlaintext(CLUSTER.bootstrapServers());
+    }
+
+    @After
+    public void cleanup() {
+        if (this.adminClient != null) {
+            this.adminClient.close();
+            this.adminClient = null;
+        }
+    }
+
     @Test
     public void testReprocessingFromScratchAfterReset() throws Exception {
         final Properties streamsConfiguration = prepareTest();
@@ -96,13 +117,16 @@ public class ResetIntegrationTest {
         final KeyValue<Object, Object> result2 = 
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultTopicConsumerConfig,
 OUTPUT_TOPIC_2, 1).get(0);
 
         streams.close();
+        TestUtils.waitForCondition(this.consumerGroupInactive, 5 * 
STREAMS_CONSUMER_TIMEOUT,
+            "Streams Application consumer group did not time out after " + (5 
* STREAMS_CONSUMER_TIMEOUT) + " ms.");
 
         // RESET
-        Utils.sleep(STREAMS_CONSUMER_TIMEOUT);
         streams.cleanUp();
         cleanGlobal();
+        TestUtils.waitForCondition(this.consumerGroupInactive, 5 * 
CLEANUP_CONSUMER_TIMEOUT,
+            "Reset Tool consumer group did not time out after " + (5 * 
CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+
         assertInternalTopicsGotDeleted();
-        Utils.sleep(CLEANUP_CONSUMER_TIMEOUT);
 
         // RE-RUN
         streams = new KafkaStreams(setupTopology(OUTPUT_TOPIC_2_RERUN), 
streamsConfiguration);
@@ -184,12 +208,15 @@ public class ResetIntegrationTest {
         final KStream<Long, Long> windowedCounts = input
             .through(INTERMEDIATE_USER_TOPIC)
             .map(new KeyValueMapper<Long, String, KeyValue<Long, String>>() {
+                private long sleep = 1000;
+
                 @Override
                 public KeyValue<Long, String> apply(final Long key, final 
String value) {
                     // must sleep long enough to avoid processing the whole 
intermediate topic before application gets stopped
                     // => want to test "skip over" unprocessed records
                     // increasing the sleep time only has disadvantage that 
test run time is increased
-                    Utils.sleep(1000);
+                    Utils.sleep(this.sleep);
+                    this.sleep *= 2;
                     return new KeyValue<>(key, value);
                 }
             })
@@ -253,4 +280,11 @@ public class ResetIntegrationTest {
         assertThat(allTopics, equalTo(expectedRemainingTopicsAfterCleanup));
     }
 
+    private class WaitUntilConsumerGroupGotClosed implements TestCondition {
+        @Override
+        public boolean conditionMet() {
+            return 
ResetIntegrationTest.this.adminClient.describeGroup(APP_ID).members().isEmpty();
+        }
+    }
+
 }

Reply via email to