Repository: kafka Updated Branches: refs/heads/trunk 3f6c4f63c -> 825f225bc
KAFKA-4588: Wait for topics to be created in QueryableStateIntegrationTest.shouldNotMakeStoreAvailableUntilAllStoresAvailable After debugging this, I can see that whenever it fails there is a race between when the topic is actually created/ready on the broker and when the assignment happens. When it fails, `StreamPartitionAssignor.assign(..)` gets called with a `Cluster` with no topics. Hence the test hangs, as no tasks get assigned. To fix this, I added a `waitForTopics` method to `EmbeddedKafkaCluster`. This will wait until the topics have been created. Author: Damian Guy <damian....@gmail.com> Reviewers: Matthias J. Sax, Guozhang Wang Closes #2371 from dguy/integration-test-fix Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/825f225b Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/825f225b Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/825f225b Branch: refs/heads/trunk Commit: 825f225bc5706b16af8ec44ca47ee1452c11e6f3 Parents: 3f6c4f6 Author: Damian Guy <damian....@gmail.com> Authored: Tue Jan 17 12:33:11 2017 -0800 Committer: Guozhang Wang <wangg...@gmail.com> Committed: Tue Jan 17 12:33:11 2017 -0800 ---------------------------------------------------------------------- checkstyle/import-control.xml | 1 + .../GlobalKTableIntegrationTest.java | 4 +- .../KStreamAggregationDedupIntegrationTest.java | 4 +- .../KStreamAggregationIntegrationTest.java | 4 +- .../KStreamKTableJoinIntegrationTest.java | 2 +- .../integration/KStreamRepartitionJoinTest.java | 8 ++-- .../QueryableStateIntegrationTest.java | 14 +++---- .../integration/utils/EmbeddedKafkaCluster.java | 24 +++++++++-- .../integration/utils/IntegrationTestUtils.java | 43 ++++++++++++++++++++ .../integration/utils/KafkaEmbedded.java | 3 ++ .../org/apache/kafka/test/StreamsTestUtils.java | 1 + 11 files changed, 87 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/kafka/blob/825f225b/checkstyle/import-control.xml ---------------------------------------------------------------------- diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index b68cf98..04f364c 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -156,6 +156,7 @@ <subpackage name="integration"> <allow pkg="kafka.admin" /> + <allow pkg="kafka.api" /> <allow pkg="kafka.server" /> <allow pkg="kafka.tools" /> <allow pkg="kafka.utils" /> http://git-wip-us.apache.org/repos/asf/kafka/blob/825f225b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java index 85b851d..6ac87ae 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java @@ -81,7 +81,7 @@ public class GlobalKTableIntegrationTest { private ForeachAction<String, String> foreachAction; @Before - public void before() { + public void before() throws InterruptedException { testNo++; builder = new KStreamBuilder(); createTopics(); @@ -212,7 +212,7 @@ public class GlobalKTableIntegrationTest { } - private void createTopics() { + private void createTopics() throws InterruptedException { inputStream = "input-stream-" + testNo; inputTable = "input-table-" + testNo; globalOne = "globalOne-" + testNo; http://git-wip-us.apache.org/repos/asf/kafka/blob/825f225b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java ---------------------------------------------------------------------- diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java index 9397e03..f2a767c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java @@ -72,7 +72,7 @@ public class KStreamAggregationDedupIntegrationTest { @Before - public void before() { + public void before() throws InterruptedException { testNo++; builder = new KStreamBuilder(); createTopics(); @@ -267,7 +267,7 @@ public class KStreamAggregationDedupIntegrationTest { } - private void createTopics() { + private void createTopics() throws InterruptedException { streamOneInput = "stream-one-" + testNo; outputTopic = "output-" + testNo; CLUSTER.createTopic(streamOneInput, 3, 1); http://git-wip-us.apache.org/repos/asf/kafka/blob/825f225b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java index 0833f3c..beb41ce 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java @@ -89,7 +89,7 @@ public class KStreamAggregationIntegrationTest { private KStream<Integer, String> stream; @Before - public void before() { + public void before() throws InterruptedException { testNo++; builder = new KStreamBuilder(); createTopics(); @@ -637,7 +637,7 @@ public class KStreamAggregationIntegrationTest { } - private void createTopics() { + 
private void createTopics() throws InterruptedException { streamOneInput = "stream-one-" + testNo; outputTopic = "output-" + testNo; userSessionsStream = userSessionsStream + "-" + testNo; http://git-wip-us.apache.org/repos/asf/kafka/blob/825f225b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java index 3618f15..0a16494 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java @@ -74,7 +74,7 @@ public class KStreamKTableJoinIntegrationTest { private Properties streamsConfiguration; @Before - public void before() { + public void before() throws InterruptedException { testNo++; userClicksTopic = "user-clicks-" + testNo; userRegionsTopic = "user-regions-" + testNo; http://git-wip-us.apache.org/repos/asf/kafka/blob/825f225b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java index e8a042a..6f3c95a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java @@ -93,7 +93,7 @@ public class KStreamRepartitionJoinTest { } @Before - public void before() { + public void before() throws InterruptedException { testNo++; String applicationId = 
"kstream-repartition-join-test-" + testNo; builder = new KStreamBuilder(); @@ -146,7 +146,7 @@ public class KStreamRepartitionJoinTest { verifyLeftJoin(leftJoin); } - private ExpectedOutputOnTopic mapStreamOneAndJoin() { + private ExpectedOutputOnTopic mapStreamOneAndJoin() throws InterruptedException { String mapOneStreamAndJoinOutput = "map-one-join-output-" + testNo; doJoin(streamOne.map(keyMapper), streamTwo, mapOneStreamAndJoinOutput); return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, mapOneStreamAndJoinOutput); @@ -350,7 +350,7 @@ public class KStreamRepartitionJoinTest { mockTime); } - private void createTopics() { + private void createTopics() throws InterruptedException { streamOneInput = "stream-one-" + testNo; streamTwoInput = "stream-two-" + testNo; streamFourInput = "stream-four-" + testNo; @@ -395,7 +395,7 @@ public class KStreamRepartitionJoinTest { private void doJoin(final KStream<Integer, Integer> lhs, final KStream<Integer, String> rhs, - final String outputTopic) { + final String outputTopic) throws InterruptedException { CLUSTER.createTopic(outputTopic); lhs.join(rhs, TOSTRING_JOINER, http://git-wip-us.apache.org/repos/asf/kafka/blob/825f225b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java index b8f91fa..911c6a8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java @@ -81,6 +81,7 @@ public class QueryableStateIntegrationTest { @ClassRule public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); + public static final int 
STREAM_THREE_PARTITIONS = 4; private final MockTime mockTime = CLUSTER.time; private String streamOne = "stream-one"; private String streamTwo = "stream-two"; @@ -91,7 +92,7 @@ public class QueryableStateIntegrationTest { private String outputTopicThree = "output-three"; // sufficiently large window size such that everything falls into 1 window private static final long WINDOW_SIZE = TimeUnit.MILLISECONDS.convert(2, TimeUnit.DAYS); - private static final int NUM_PARTITIONS = 2; + private static final int STREAM_TWO_PARTITIONS = 2; private static final int NUM_REPLICAS = NUM_BROKERS; private Properties streamsConfiguration; private List<String> inputValues; @@ -101,7 +102,7 @@ public class QueryableStateIntegrationTest { private Comparator<KeyValue<String, Long>> stringLongComparator; private static int testNo = 0; - public void createTopics() { + public void createTopics() throws InterruptedException { streamOne = streamOne + "-" + testNo; streamConcurrent = streamConcurrent + "-" + testNo; streamThree = streamThree + "-" + testNo; @@ -111,8 +112,8 @@ public class QueryableStateIntegrationTest { streamTwo = streamTwo + "-" + testNo; CLUSTER.createTopic(streamOne); CLUSTER.createTopic(streamConcurrent); - CLUSTER.createTopic(streamTwo, NUM_PARTITIONS, NUM_REPLICAS); - CLUSTER.createTopic(streamThree, 4, 1); + CLUSTER.createTopic(streamTwo, STREAM_TWO_PARTITIONS, NUM_REPLICAS); + CLUSTER.createTopic(streamThree, STREAM_THREE_PARTITIONS, 1); CLUSTER.createTopic(outputTopic); CLUSTER.createTopic(outputTopicConcurrent); CLUSTER.createTopic(outputTopicThree); @@ -128,7 +129,7 @@ public class QueryableStateIntegrationTest { } @Before - public void before() throws IOException { + public void before() throws IOException, InterruptedException { testNo++; createTopics(); streamsConfiguration = new Properties(); @@ -145,7 +146,6 @@ public class QueryableStateIntegrationTest { streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes); 
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); - stringComparator = new Comparator<KeyValue<String, String>>() { @Override @@ -328,7 +328,7 @@ public class QueryableStateIntegrationTest { @Test public void queryOnRebalance() throws Exception { - final int numThreads = NUM_PARTITIONS; + final int numThreads = STREAM_TWO_PARTITIONS; final StreamRunnable[] streamRunnables = new StreamRunnable[numThreads]; final Thread[] streamThreads = new Thread[numThreads]; final int numIterations = 500000; http://git-wip-us.apache.org/repos/asf/kafka/blob/825f225b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java index 519b1f5..619b6b5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java @@ -18,13 +18,17 @@ package org.apache.kafka.streams.integration.utils; import kafka.server.KafkaConfig$; +import kafka.server.KafkaServer; import kafka.utils.MockTime; import kafka.zk.EmbeddedZookeeper; +import org.apache.kafka.common.TopicPartition; import org.junit.rules.ExternalResource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.Properties; /** @@ -34,6 +38,7 @@ public class EmbeddedKafkaCluster extends ExternalResource { private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class); private static final int DEFAULT_BROKER_PORT = 0; // 0 results in a random port being selected + public static final int TOPIC_CREATION_TIMEOUT = 30000; private EmbeddedZookeeper zookeeper = null; private 
final KafkaEmbedded[] brokers; private final Properties brokerConfig; @@ -122,7 +127,7 @@ public class EmbeddedKafkaCluster extends ExternalResource { * * @param topic The name of the topic. */ - public void createTopic(final String topic) { + public void createTopic(final String topic) throws InterruptedException { createTopic(topic, 1, 1, new Properties()); } @@ -133,7 +138,7 @@ public class EmbeddedKafkaCluster extends ExternalResource { * @param partitions The number of partitions for this topic. * @param replication The replication factor for (the partitions of) this topic. */ - public void createTopic(final String topic, final int partitions, final int replication) { + public void createTopic(final String topic, final int partitions, final int replication) throws InterruptedException { createTopic(topic, partitions, replication, new Properties()); } @@ -148,11 +153,24 @@ public class EmbeddedKafkaCluster extends ExternalResource { public void createTopic(final String topic, final int partitions, final int replication, - final Properties topicConfig) { + final Properties topicConfig) throws InterruptedException { brokers[0].createTopic(topic, partitions, replication, topicConfig); + final List<TopicPartition> topicPartitions = new ArrayList<>(); + for (int partition = 0; partition < partitions; partition++) { + topicPartitions.add(new TopicPartition(topic, partition)); + } + IntegrationTestUtils.waitForTopicPartitions(brokers(), topicPartitions, TOPIC_CREATION_TIMEOUT); } public void deleteTopic(final String topic) { brokers[0].deleteTopic(topic); } + + public List<KafkaServer> brokers() { + final List<KafkaServer> servers = new ArrayList<>(); + for (final KafkaEmbedded broker : brokers) { + servers.add(broker.kafkaServer()); + } + return servers; + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/825f225b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java 
---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java index aa358ab..875c359 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java @@ -17,6 +17,10 @@ package org.apache.kafka.streams.integration.utils; +import kafka.api.PartitionStateInfo; +import kafka.api.Request; +import kafka.server.KafkaServer; +import kafka.server.MetadataCache; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -24,12 +28,14 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; +import scala.Option; import java.io.File; import java.io.IOException; @@ -255,4 +261,41 @@ public class IntegrationTestUtils { return accumData; } + public static void waitForTopicPartitions(final List<KafkaServer> servers, + final List<TopicPartition> partitions, + final long timeout) throws InterruptedException { + final long end = System.currentTimeMillis() + timeout; + for (final TopicPartition partition : partitions) { + final long remaining = end - System.currentTimeMillis(); + if (remaining <= 0) { + throw new AssertionError("timed out while waiting for partitions to 
become available. Timeout=" + timeout); + } + waitUntilMetadataIsPropagated(servers, partition.topic(), partition.partition(), remaining); + } + } + + public static void waitUntilMetadataIsPropagated(final List<KafkaServer> servers, + final String topic, + final int partition, + final long timeout) throws InterruptedException { + TestUtils.waitForCondition(new TestCondition() { + @Override + public boolean conditionMet() { + for (final KafkaServer server : servers) { + final MetadataCache metadataCache = server.apis().metadataCache(); + final Option<PartitionStateInfo> partitionInfo = + metadataCache.getPartitionInfo(topic, partition); + if (partitionInfo.isEmpty()) { + return false; + } + final PartitionStateInfo partitionStateInfo = partitionInfo.get(); + if (!Request.isValidBrokerId(partitionStateInfo.leaderIsrAndControllerEpoch().leaderAndIsr().leader())) { + return false; + } + } + return true; + } + }, timeout, "metatadata for topic=" + topic + " partition=" + partition + " not propogated to all brokers"); + + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/825f225b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java index 70c5063..9b48272 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java @@ -202,4 +202,7 @@ public class KafkaEmbedded { zkClient.close(); } + public KafkaServer kafkaServer() { + return kafka; + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/825f225b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java ---------------------------------------------------------------------- diff --git 
a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java index 555e622..73c1b63 100644 --- a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java @@ -66,4 +66,5 @@ public class StreamsTestUtils { } return results; } + }