Repository: kafka Updated Branches: refs/heads/0.10.0 ca45bd031 -> bff5349a4
KAFKA-4058: Failure in org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterReset - use AdminClient to check for active consumer group Author: Matthias J. Sax <matth...@confluent.io> Reviewers: Ismael Juma, Guozhang Wang Closes #1756 from mjsax/kafka-4058-reset-tool-test Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bff5349a Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bff5349a Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bff5349a Branch: refs/heads/0.10.0 Commit: bff5349a49c2bf7c3b30c8ddc126e53cb6e06dca Parents: ca45bd0 Author: Matthias J. Sax <matth...@confluent.io> Authored: Tue Aug 30 11:59:41 2016 -0700 Committer: Guozhang Wang <wangg...@gmail.com> Committed: Tue Aug 30 11:59:41 2016 -0700 ---------------------------------------------------------------------- .../org/apache/kafka/test/TestCondition.java | 26 +++++++ .../java/org/apache/kafka/test/TestUtils.java | 76 ++++++++++++-------- .../main/scala/kafka/tools/StreamsResetter.java | 2 +- .../integration/ResetIntegrationTest.java | 40 ++++++++++- 4 files changed, 112 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/bff5349a/clients/src/test/java/org/apache/kafka/test/TestCondition.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/test/TestCondition.java b/clients/src/test/java/org/apache/kafka/test/TestCondition.java new file mode 100644 index 0000000..f78c91b --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/test/TestCondition.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. 
The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + + +package org.apache.kafka.test; + +/** + * Interface to wrap actions that are required to wait until a condition is met + * for testing purposes. Note that this is not intended to do any assertions. + */ +public interface TestCondition { + + boolean conditionMet(); +} http://git-wip-us.apache.org/repos/asf/kafka/blob/bff5349a/clients/src/test/java/org/apache/kafka/test/TestUtils.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java index 1bfe578..ef3b6bc 100644 --- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
@@ -16,7 +16,10 @@ */ package org.apache.kafka.test; -import static java.util.Arrays.asList; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.utils.Utils; import java.io.File; import java.io.IOException; @@ -28,10 +31,7 @@ import java.util.List; import java.util.Map; import java.util.Random; -import org.apache.kafka.common.Cluster; -import org.apache.kafka.common.Node; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.utils.Utils; +import static java.util.Arrays.asList; /** @@ -49,51 +49,51 @@ public class TestUtils { public static final Random SEEDED_RANDOM = new Random(192348092834L); public static final Random RANDOM = new Random(); - public static Cluster singletonCluster(Map<String, Integer> topicPartitionCounts) { + public static Cluster singletonCluster(final Map<String, Integer> topicPartitionCounts) { return clusterWith(1, topicPartitionCounts); } - public static Cluster singletonCluster(String topic, int partitions) { + public static Cluster singletonCluster(final String topic, final int partitions) { return clusterWith(1, topic, partitions); } - public static Cluster clusterWith(int nodes, Map<String, Integer> topicPartitionCounts) { - Node[] ns = new Node[nodes]; + public static Cluster clusterWith(final int nodes, final Map<String, Integer> topicPartitionCounts) { + final Node[] ns = new Node[nodes]; for (int i = 0; i < nodes; i++) ns[i] = new Node(i, "localhost", 1969); - List<PartitionInfo> parts = new ArrayList<>(); - for (Map.Entry<String, Integer> topicPartition : topicPartitionCounts.entrySet()) { - String topic = topicPartition.getKey(); - int partitions = topicPartition.getValue(); + final List<PartitionInfo> parts = new ArrayList<>(); + for (final Map.Entry<String, Integer> topicPartition : topicPartitionCounts.entrySet()) { + final String topic = topicPartition.getKey(); + final int partitions = 
topicPartition.getValue(); for (int i = 0; i < partitions; i++) parts.add(new PartitionInfo(topic, i, ns[i % ns.length], ns, ns)); } return new Cluster(asList(ns), parts, Collections.<String>emptySet()); } - public static Cluster clusterWith(int nodes, String topic, int partitions) { + public static Cluster clusterWith(final int nodes, final String topic, final int partitions) { return clusterWith(nodes, Collections.singletonMap(topic, partitions)); } /** * Generate an array of random bytes - * + * * @param size The size of the array */ - public static byte[] randomBytes(int size) { - byte[] bytes = new byte[size]; + public static byte[] randomBytes(final int size) { + final byte[] bytes = new byte[size]; SEEDED_RANDOM.nextBytes(bytes); return bytes; } /** * Generate a random string of letters and digits of the given length - * + * * @param len The length of the string * @return The random string */ - public static String randomString(int len) { - StringBuilder b = new StringBuilder(); + public static String randomString(final int len) { + final StringBuilder b = new StringBuilder(); for (int i = 0; i < len; i++) b.append(LETTERS_AND_DIGITS.charAt(SEEDED_RANDOM.nextInt(LETTERS_AND_DIGITS.length()))); return b.toString(); @@ -104,7 +104,7 @@ public class TestUtils { * suffix to generate its name. 
*/ public static File tempFile() throws IOException { - File file = File.createTempFile("kafka", ".tmp"); + final File file = File.createTempFile("kafka", ".tmp"); file.deleteOnExit(); return file; @@ -115,7 +115,7 @@ public class TestUtils { * * @param prefix The prefix of the temporary directory, if null using "kafka-" as default prefix */ - public static File tempDirectory(String prefix) throws IOException { + public static File tempDirectory(final String prefix) throws IOException { return tempDirectory(null, prefix); } @@ -125,10 +125,10 @@ public class TestUtils { * @param parent The parent folder path name, if null using the default temporary-file directory * @param prefix The prefix of the temporary directory, if null using "kafka-" as default prefix */ - public static File tempDirectory(Path parent, String prefix) throws IOException { + public static File tempDirectory(final Path parent, final String prefix) throws IOException { final File file = parent == null ? - Files.createTempDirectory(prefix == null ? "kafka-" : prefix).toFile() : - Files.createTempDirectory(parent, prefix == null ? "kafka-" : prefix).toFile(); + Files.createTempDirectory(prefix == null ? "kafka-" : prefix).toFile() : + Files.createTempDirectory(parent, prefix == null ? "kafka-" : prefix).toFile(); file.deleteOnExit(); Runtime.getRuntime().addShutdownHook(new Thread() { @@ -141,4 +141,24 @@ public class TestUtils { return file; } + /** + * Wait for condition to be met for at most {@code maxWaitMs} and throw assertion failure otherwise. + * This should be used instead of {@code Thread.sleep} whenever possible as it allows a longer timeout to be used + * without unnecessarily increasing test time (as the condition is checked frequently). The longer timeout is needed to + * avoid transient failures due to slow or overloaded machines. 
+ */ + public static void waitForCondition(final TestCondition testCondition, final long maxWaitMs, String conditionDetails) throws InterruptedException { + final long startTime = System.currentTimeMillis(); + + + while (!testCondition.conditionMet() && ((System.currentTimeMillis() - startTime) < maxWaitMs)) { + Thread.sleep(Math.min(maxWaitMs, 100L)); + } + + if (!testCondition.conditionMet()) { + conditionDetails = conditionDetails != null ? conditionDetails : ""; + throw new AssertionError("Condition not met within timeout " + maxWaitMs + ". " + conditionDetails); + } + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/bff5349a/core/src/main/scala/kafka/tools/StreamsResetter.java ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java index 8d9cd5e..7153790 100644 --- a/core/src/main/scala/kafka/tools/StreamsResetter.java +++ b/core/src/main/scala/kafka/tools/StreamsResetter.java @@ -90,7 +90,7 @@ public class StreamsResetter { adminClient = AdminClient.createSimplePlaintext(this.options.valueOf(bootstrapServerOption)); final String groupId = this.options.valueOf(applicationIdOption); - if (adminClient.describeConsumerGroup(groupId).size() != 0) { + if (!adminClient.describeGroup(groupId).members().isEmpty()) { throw new IllegalStateException("Consumer group '" + groupId + "' is still active. 
" + "Make sure to stop all running application instances before running the reset tool."); } http://git-wip-us.apache.org/repos/asf/kafka/blob/bff5349a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java index 8dd1f09..0e4129e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.integration; +import kafka.admin.AdminClient; import kafka.tools.StreamsResetter; import kafka.utils.ZkUtils; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -37,8 +38,11 @@ import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; @@ -70,6 +74,10 @@ public class ResetIntegrationTest { private static final long STREAMS_CONSUMER_TIMEOUT = 2000L; private static final long CLEANUP_CONSUMER_TIMEOUT = 2000L; + private final WaitUntilConsumerGroupGotClosed consumerGroupInactive = new WaitUntilConsumerGroupGotClosed(); + + private AdminClient adminClient = null; + @BeforeClass public static void startKafkaCluster() throws Exception { CLUSTER.createTopic(INPUT_TOPIC); @@ -79,6 +87,19 @@ public class ResetIntegrationTest { CLUSTER.createTopic(INTERMEDIATE_USER_TOPIC); } + @Before + public void prepare() { + this.adminClient = 
AdminClient.createSimplePlaintext(CLUSTER.bootstrapServers()); + } + + @After + public void cleanup() { + if (this.adminClient != null) { + this.adminClient.close(); + this.adminClient = null; + } + } + @Test public void testReprocessingFromScratchAfterReset() throws Exception { final Properties streamsConfiguration = prepareTest(); @@ -96,13 +117,16 @@ public class ResetIntegrationTest { final KeyValue<Object, Object> result2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultTopicConsumerConfig, OUTPUT_TOPIC_2, 1).get(0); streams.close(); + TestUtils.waitForCondition(this.consumerGroupInactive, 5 * STREAMS_CONSUMER_TIMEOUT, + "Streams Application consumer group did not time out after " + (5 * STREAMS_CONSUMER_TIMEOUT) + " ms."); // RESET - Utils.sleep(STREAMS_CONSUMER_TIMEOUT); streams.cleanUp(); cleanGlobal(); + TestUtils.waitForCondition(this.consumerGroupInactive, 5 * CLEANUP_CONSUMER_TIMEOUT, + "Reset Tool consumer group did not time out after " + (5 * CLEANUP_CONSUMER_TIMEOUT) + " ms."); + assertInternalTopicsGotDeleted(); - Utils.sleep(CLEANUP_CONSUMER_TIMEOUT); // RE-RUN streams = new KafkaStreams(setupTopology(OUTPUT_TOPIC_2_RERUN), streamsConfiguration); @@ -184,12 +208,15 @@ public class ResetIntegrationTest { final KStream<Long, Long> windowedCounts = input .through(INTERMEDIATE_USER_TOPIC) .map(new KeyValueMapper<Long, String, KeyValue<Long, String>>() { + private long sleep = 1000; + @Override public KeyValue<Long, String> apply(final Long key, final String value) { // must sleep long enough to avoid processing the whole intermediate topic before application gets stopped // => want to test "skip over" unprocessed records // increasing the sleep time only has disadvantage that test run time is increased - Utils.sleep(1000); + Utils.sleep(this.sleep); + this.sleep *= 2; return new KeyValue<>(key, value); } }) @@ -253,4 +280,11 @@ public class ResetIntegrationTest { assertThat(allTopics, equalTo(expectedRemainingTopicsAfterCleanup)); } + 
private class WaitUntilConsumerGroupGotClosed implements TestCondition { + @Override + public boolean conditionMet() { + return ResetIntegrationTest.this.adminClient.describeGroup(APP_ID).members().isEmpty(); + } + } + }