Repository: kafka
Updated Branches:
  refs/heads/trunk 3f6c4f63c -> 825f225bc


KAFKA-4588: Wait for topics to be created in 
QueryableStateIntegrationTest.shouldNotMakeStoreAvailableUntilAllStoresAvailable

After debugging this, I can see that when the test fails there is a race between 
when the topic is actually created/ready on the broker and when the assignment 
happens. When it fails, `StreamPartitionAssignor.assign(..)` gets called with a 
`Cluster` with no topics. Hence the test hangs, as no tasks get assigned. To fix 
this, `EmbeddedKafkaCluster.createTopic(..)` now calls the new 
`IntegrationTestUtils.waitForTopicPartitions`, which waits until every broker's 
metadata cache has each partition of the topic with a valid leader.

Author: Damian Guy <damian....@gmail.com>

Reviewers: Matthias J. Sax, Guozhang Wang

Closes #2371 from dguy/integration-test-fix


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/825f225b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/825f225b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/825f225b

Branch: refs/heads/trunk
Commit: 825f225bc5706b16af8ec44ca47ee1452c11e6f3
Parents: 3f6c4f6
Author: Damian Guy <damian....@gmail.com>
Authored: Tue Jan 17 12:33:11 2017 -0800
Committer: Guozhang Wang <wangg...@gmail.com>
Committed: Tue Jan 17 12:33:11 2017 -0800

----------------------------------------------------------------------
 checkstyle/import-control.xml                   |  1 +
 .../GlobalKTableIntegrationTest.java            |  4 +-
 .../KStreamAggregationDedupIntegrationTest.java |  4 +-
 .../KStreamAggregationIntegrationTest.java      |  4 +-
 .../KStreamKTableJoinIntegrationTest.java       |  2 +-
 .../integration/KStreamRepartitionJoinTest.java |  8 ++--
 .../QueryableStateIntegrationTest.java          | 14 +++----
 .../integration/utils/EmbeddedKafkaCluster.java | 24 +++++++++--
 .../integration/utils/IntegrationTestUtils.java | 43 ++++++++++++++++++++
 .../integration/utils/KafkaEmbedded.java        |  3 ++
 .../org/apache/kafka/test/StreamsTestUtils.java |  1 +
 11 files changed, 87 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/825f225b/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index b68cf98..04f364c 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -156,6 +156,7 @@
     
     <subpackage name="integration">
       <allow pkg="kafka.admin" />
+      <allow pkg="kafka.api" />
       <allow pkg="kafka.server" />
       <allow pkg="kafka.tools" />
       <allow pkg="kafka.utils" />

http://git-wip-us.apache.org/repos/asf/kafka/blob/825f225b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index 85b851d..6ac87ae 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -81,7 +81,7 @@ public class GlobalKTableIntegrationTest {
     private ForeachAction<String, String> foreachAction;
 
     @Before
-    public void before() {
+    public void before() throws InterruptedException {
         testNo++;
         builder = new KStreamBuilder();
         createTopics();
@@ -212,7 +212,7 @@ public class GlobalKTableIntegrationTest {
     }
 
 
-    private void createTopics() {
+    private void createTopics() throws InterruptedException {
         inputStream = "input-stream-" + testNo;
         inputTable = "input-table-" + testNo;
         globalOne = "globalOne-" + testNo;

http://git-wip-us.apache.org/repos/asf/kafka/blob/825f225b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
index 9397e03..f2a767c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -72,7 +72,7 @@ public class KStreamAggregationDedupIntegrationTest {
 
 
     @Before
-    public void before() {
+    public void before() throws InterruptedException {
         testNo++;
         builder = new KStreamBuilder();
         createTopics();
@@ -267,7 +267,7 @@ public class KStreamAggregationDedupIntegrationTest {
     }
 
 
-    private void createTopics() {
+    private void createTopics() throws InterruptedException {
         streamOneInput = "stream-one-" + testNo;
         outputTopic = "output-" + testNo;
         CLUSTER.createTopic(streamOneInput, 3, 1);

http://git-wip-us.apache.org/repos/asf/kafka/blob/825f225b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 0833f3c..beb41ce 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -89,7 +89,7 @@ public class KStreamAggregationIntegrationTest {
     private KStream<Integer, String> stream;
 
     @Before
-    public void before() {
+    public void before() throws InterruptedException {
         testNo++;
         builder = new KStreamBuilder();
         createTopics();
@@ -637,7 +637,7 @@ public class KStreamAggregationIntegrationTest {
     }
 
 
-    private void createTopics() {
+    private void createTopics() throws InterruptedException {
         streamOneInput = "stream-one-" + testNo;
         outputTopic = "output-" + testNo;
         userSessionsStream = userSessionsStream + "-" + testNo;

http://git-wip-us.apache.org/repos/asf/kafka/blob/825f225b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
index 3618f15..0a16494 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
@@ -74,7 +74,7 @@ public class KStreamKTableJoinIntegrationTest {
     private Properties streamsConfiguration;
 
     @Before
-    public void before() {
+    public void before() throws InterruptedException {
         testNo++;
         userClicksTopic = "user-clicks-" + testNo;
         userRegionsTopic = "user-regions-" + testNo;

http://git-wip-us.apache.org/repos/asf/kafka/blob/825f225b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
index e8a042a..6f3c95a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
@@ -93,7 +93,7 @@ public class KStreamRepartitionJoinTest {
     }
 
     @Before
-    public void before() {
+    public void before() throws InterruptedException {
         testNo++;
         String applicationId = "kstream-repartition-join-test-" + testNo;
         builder = new KStreamBuilder();
@@ -146,7 +146,7 @@ public class KStreamRepartitionJoinTest {
         verifyLeftJoin(leftJoin);
     }
 
-    private ExpectedOutputOnTopic mapStreamOneAndJoin() {
+    private ExpectedOutputOnTopic mapStreamOneAndJoin() throws 
InterruptedException {
         String mapOneStreamAndJoinOutput = "map-one-join-output-" + testNo;
         doJoin(streamOne.map(keyMapper), streamTwo, mapOneStreamAndJoinOutput);
         return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, 
mapOneStreamAndJoinOutput);
@@ -350,7 +350,7 @@ public class KStreamRepartitionJoinTest {
             mockTime);
     }
 
-    private void createTopics() {
+    private void createTopics() throws InterruptedException {
         streamOneInput = "stream-one-" + testNo;
         streamTwoInput = "stream-two-" + testNo;
         streamFourInput = "stream-four-" + testNo;
@@ -395,7 +395,7 @@ public class KStreamRepartitionJoinTest {
 
     private void doJoin(final KStream<Integer, Integer> lhs,
                         final KStream<Integer, String> rhs,
-                        final String outputTopic) {
+                        final String outputTopic) throws InterruptedException {
         CLUSTER.createTopic(outputTopic);
         lhs.join(rhs,
             TOSTRING_JOINER,

http://git-wip-us.apache.org/repos/asf/kafka/blob/825f225b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index b8f91fa..911c6a8 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -81,6 +81,7 @@ public class QueryableStateIntegrationTest {
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER =
         new EmbeddedKafkaCluster(NUM_BROKERS);
+    public static final int STREAM_THREE_PARTITIONS = 4;
     private final MockTime mockTime = CLUSTER.time;
     private String streamOne = "stream-one";
     private String streamTwo = "stream-two";
@@ -91,7 +92,7 @@ public class QueryableStateIntegrationTest {
     private String outputTopicThree = "output-three";
     // sufficiently large window size such that everything falls into 1 window
     private static final long WINDOW_SIZE = TimeUnit.MILLISECONDS.convert(2, 
TimeUnit.DAYS);
-    private static final int NUM_PARTITIONS = 2;
+    private static final int STREAM_TWO_PARTITIONS = 2;
     private static final int NUM_REPLICAS = NUM_BROKERS;
     private Properties streamsConfiguration;
     private List<String> inputValues;
@@ -101,7 +102,7 @@ public class QueryableStateIntegrationTest {
     private Comparator<KeyValue<String, Long>> stringLongComparator;
     private static int testNo = 0;
 
-    public void createTopics() {
+    public void createTopics() throws InterruptedException {
         streamOne = streamOne + "-" + testNo;
         streamConcurrent = streamConcurrent + "-" + testNo;
         streamThree = streamThree + "-" + testNo;
@@ -111,8 +112,8 @@ public class QueryableStateIntegrationTest {
         streamTwo = streamTwo + "-" + testNo;
         CLUSTER.createTopic(streamOne);
         CLUSTER.createTopic(streamConcurrent);
-        CLUSTER.createTopic(streamTwo, NUM_PARTITIONS, NUM_REPLICAS);
-        CLUSTER.createTopic(streamThree, 4, 1);
+        CLUSTER.createTopic(streamTwo, STREAM_TWO_PARTITIONS, NUM_REPLICAS);
+        CLUSTER.createTopic(streamThree, STREAM_THREE_PARTITIONS, 1);
         CLUSTER.createTopic(outputTopic);
         CLUSTER.createTopic(outputTopicConcurrent);
         CLUSTER.createTopic(outputTopicThree);
@@ -128,7 +129,7 @@ public class QueryableStateIntegrationTest {
     }
 
     @Before
-    public void before() throws IOException {
+    public void before() throws IOException, InterruptedException {
         testNo++;
         createTopics();
         streamsConfiguration = new Properties();
@@ -145,7 +146,6 @@ public class QueryableStateIntegrationTest {
         
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 
cacheSizeBytes);
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 
1000);
 
-
         stringComparator = new Comparator<KeyValue<String, String>>() {
 
             @Override
@@ -328,7 +328,7 @@ public class QueryableStateIntegrationTest {
 
     @Test
     public void queryOnRebalance() throws Exception {
-        final int numThreads = NUM_PARTITIONS;
+        final int numThreads = STREAM_TWO_PARTITIONS;
         final StreamRunnable[] streamRunnables = new 
StreamRunnable[numThreads];
         final Thread[] streamThreads = new Thread[numThreads];
         final int numIterations = 500000;

http://git-wip-us.apache.org/repos/asf/kafka/blob/825f225b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
index 519b1f5..619b6b5 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
@@ -18,13 +18,17 @@
 package org.apache.kafka.streams.integration.utils;
 
 import kafka.server.KafkaConfig$;
+import kafka.server.KafkaServer;
 import kafka.utils.MockTime;
 import kafka.zk.EmbeddedZookeeper;
+import org.apache.kafka.common.TopicPartition;
 import org.junit.rules.ExternalResource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Properties;
 
 /**
@@ -34,6 +38,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
 
     private static final Logger log = 
LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
     private static final int DEFAULT_BROKER_PORT = 0; // 0 results in a random 
port being selected
+    public static final int TOPIC_CREATION_TIMEOUT = 30000;
     private EmbeddedZookeeper zookeeper = null;
     private final KafkaEmbedded[] brokers;
     private final Properties brokerConfig;
@@ -122,7 +127,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
      *
      * @param topic The name of the topic.
      */
-    public void createTopic(final String topic) {
+    public void createTopic(final String topic) throws InterruptedException {
         createTopic(topic, 1, 1, new Properties());
     }
 
@@ -133,7 +138,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
      * @param partitions  The number of partitions for this topic.
      * @param replication The replication factor for (the partitions of) this 
topic.
      */
-    public void createTopic(final String topic, final int partitions, final 
int replication) {
+    public void createTopic(final String topic, final int partitions, final 
int replication) throws InterruptedException {
         createTopic(topic, partitions, replication, new Properties());
     }
 
@@ -148,11 +153,24 @@ public class EmbeddedKafkaCluster extends 
ExternalResource {
     public void createTopic(final String topic,
                             final int partitions,
                             final int replication,
-                            final Properties topicConfig) {
+                            final Properties topicConfig) throws 
InterruptedException {
         brokers[0].createTopic(topic, partitions, replication, topicConfig);
+        final List<TopicPartition> topicPartitions = new ArrayList<>();
+        for (int partition = 0; partition < partitions; partition++) {
+            topicPartitions.add(new TopicPartition(topic, partition));
+        }
+        IntegrationTestUtils.waitForTopicPartitions(brokers(), 
topicPartitions, TOPIC_CREATION_TIMEOUT);
     }
 
     public void deleteTopic(final String topic) {
         brokers[0].deleteTopic(topic);
     }
+
+    public List<KafkaServer> brokers() {
+        final List<KafkaServer> servers = new ArrayList<>();
+        for (final KafkaEmbedded broker : brokers) {
+            servers.add(broker.kafkaServer());
+        }
+        return servers;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/825f225b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index aa358ab..875c359 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -17,6 +17,10 @@
 
 package org.apache.kafka.streams.integration.utils;
 
+import kafka.api.PartitionStateInfo;
+import kafka.api.Request;
+import kafka.server.KafkaServer;
+import kafka.server.MetadataCache;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -24,12 +28,14 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
+import scala.Option;
 
 import java.io.File;
 import java.io.IOException;
@@ -255,4 +261,41 @@ public class IntegrationTestUtils {
         return accumData;
     }
 
+    public static void waitForTopicPartitions(final List<KafkaServer> servers,
+                                              final List<TopicPartition> 
partitions,
+                                              final long timeout) throws 
InterruptedException {
+        final long end = System.currentTimeMillis() + timeout;
+        for (final TopicPartition partition : partitions) {
+            final long remaining = end - System.currentTimeMillis();
+            if (remaining <= 0) {
+                throw new AssertionError("timed out while waiting for 
partitions to become available. Timeout=" + timeout);
+            }
+            waitUntilMetadataIsPropagated(servers, partition.topic(), 
partition.partition(), remaining);
+        }
+    }
+
+    public static void waitUntilMetadataIsPropagated(final List<KafkaServer> 
servers,
+                                                     final String topic,
+                                                     final int partition,
+                                                     final long timeout) 
throws InterruptedException {
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                for (final KafkaServer server : servers) {
+                    final MetadataCache metadataCache = 
server.apis().metadataCache();
+                    final Option<PartitionStateInfo> partitionInfo =
+                            metadataCache.getPartitionInfo(topic, partition);
+                    if (partitionInfo.isEmpty()) {
+                        return false;
+                    }
+                    final PartitionStateInfo partitionStateInfo = 
partitionInfo.get();
+                    if 
(!Request.isValidBrokerId(partitionStateInfo.leaderIsrAndControllerEpoch().leaderAndIsr().leader()))
 {
+                        return false;
+                    }
+                }
+                return true;
+            }
+        }, timeout, "metatadata for topic=" + topic + " partition=" + 
partition + " not propogated to all brokers");
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/825f225b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
index 70c5063..9b48272 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
@@ -202,4 +202,7 @@ public class KafkaEmbedded {
         zkClient.close();
     }
 
+    public KafkaServer kafkaServer() {
+        return kafka;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/825f225b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java 
b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
index 555e622..73c1b63 100644
--- a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
@@ -66,4 +66,5 @@ public class StreamsTestUtils {
         }
         return results;
     }
+
 }

Reply via email to