Repository: kafka
Updated Branches:
  refs/heads/0.10.0 f2405a73e -> ce34614a4


HOTFIX: Start embedded kafka in KafkaStreamsTest to avoid hanging

The KafkaStreamsTest can occasionally hang if the test doesn't run fast enough.
This happens because no broker is available at the bootstrap servers address
(localhost:9999) provided to the StreamsConfig. The KafkaConsumer does a poll
and blocks, causing the test to never complete.

Author: Damian Guy <[email protected]>

Reviewers: Ismael Juma <[email protected]>

Closes #1693 from dguy/kafka-streams-test


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ce34614a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ce34614a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ce34614a

Branch: refs/heads/0.10.0
Commit: ce34614a43fb1f43ef6b5660fb37f7a0598d177a
Parents: f2405a7
Author: Damian Guy <[email protected]>
Authored: Tue Aug 2 12:41:20 2016 +0100
Committer: Ismael Juma <[email protected]>
Committed: Tue Aug 2 12:41:20 2016 +0100

----------------------------------------------------------------------
 .../apache/kafka/streams/KafkaStreamsTest.java  | 21 +++++++++++++-------
 1 file changed, 14 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ce34614a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java 
b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index af7e681..f8293b8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -17,10 +17,12 @@
 
 package org.apache.kafka.streams;
 
+import 
org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.test.MockMetricsReporter;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Assert;
+import org.junit.ClassRule;
 import org.junit.Test;
 
 import java.io.File;
@@ -31,11 +33,16 @@ import static org.junit.Assert.assertTrue;
 
 public class KafkaStreamsTest {
 
+    // We need this to avoid the KafkaConsumer hanging on poll (this may occur 
if the test doesn't complete
+    // quickly enough).
+    @ClassRule
+    public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new 
EmbeddedSingleNodeKafkaCluster();
+
     @Test
     public void testStartAndClose() throws Exception {
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, 
"testStartAndClose");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9999");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
         props.setProperty(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, 
MockMetricsReporter.class.getName());
 
         final int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
@@ -58,7 +65,7 @@ public class KafkaStreamsTest {
     public void testCloseIsIdempotent() throws Exception {
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, 
"testCloseIsIdempotent");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9999");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
         props.setProperty(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, 
MockMetricsReporter.class.getName());
 
         final KStreamBuilder builder = new KStreamBuilder();
@@ -75,7 +82,7 @@ public class KafkaStreamsTest {
     public void testCannotStartOnceClosed() throws Exception {
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, 
"testCannotStartOnceClosed");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9999");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
 
         final KStreamBuilder builder = new KStreamBuilder();
         final KafkaStreams streams = new KafkaStreams(builder, props);
@@ -95,7 +102,7 @@ public class KafkaStreamsTest {
     public void testCannotStartTwice() throws Exception {
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, 
"testCannotStartTwice");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9999");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
 
         final KStreamBuilder builder = new KStreamBuilder();
         final KafkaStreams streams = new KafkaStreams(builder, props);
@@ -115,7 +122,7 @@ public class KafkaStreamsTest {
     public void testCleanup() throws Exception {
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, 
"testLocalCleanup");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9999");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
 
         final KStreamBuilder builder = new KStreamBuilder();
         final KafkaStreams streams = new KafkaStreams(builder, props);
@@ -137,7 +144,7 @@ public class KafkaStreamsTest {
         final File stateDirApp2 = new File(stateDir + File.separator + appId2);
 
         final Properties props = new Properties();
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9999");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
         props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir);
 
         assertFalse(stateDirApp1.exists());
@@ -164,7 +171,7 @@ public class KafkaStreamsTest {
     public void testCannotCleanupWhileRunning() throws Exception {
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, 
"testCannotCleanupWhileRunning");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9999");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
 
         final KStreamBuilder builder = new KStreamBuilder();
         final KafkaStreams streams = new KafkaStreams(builder, props);

Reply via email to