Repository: kafka
Updated Branches:
  refs/heads/trunk 3e85f131e -> da70316a5


KAFKA-4647: Improve test coverage of GlobalStreamThread

Add a test to ensure a `StreamsException` is thrown when an exception other 
than `StreamsException` is caught

Author: Damian Guy <damian....@gmail.com>

Reviewers: Matthias J. Sax, Guozhang Wang

Closes #2450 from dguy/KAFKA-4647


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/da70316a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/da70316a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/da70316a

Branch: refs/heads/trunk
Commit: da70316a588172585a3960f2b0edb0e2d4ba5461
Parents: 3e85f13
Author: Damian Guy <damian....@gmail.com>
Authored: Wed Feb 1 20:20:31 2017 -0800
Committer: Guozhang Wang <wangg...@gmail.com>
Committed: Wed Feb 1 20:20:31 2017 -0800

----------------------------------------------------------------------
 .../internals/GlobalStreamThreadTest.java       | 36 ++++++++++++++++++--
 1 file changed, 34 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/da70316a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
index 67138f7..e0c4882 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
@@ -34,7 +34,11 @@ import org.junit.Test;
 
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -43,6 +47,7 @@ public class GlobalStreamThreadTest {
     private final KStreamBuilder builder = new KStreamBuilder();
     private final MockConsumer<byte[], byte[]> mockConsumer = new 
MockConsumer<>(OffsetResetStrategy.EARLIEST);
     private GlobalStreamThread globalStreamThread;
+    private StreamsConfig config;
 
     @Before
     public void before() {
@@ -50,7 +55,7 @@ public class GlobalStreamThreadTest {
         final HashMap<String, Object> properties = new HashMap<>();
         properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "blah");
         properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "blah");
-        final StreamsConfig config = new StreamsConfig(properties);
+        config = new StreamsConfig(properties);
         globalStreamThread = new 
GlobalStreamThread(builder.buildGlobalStateTopology(),
                                                     config,
                                                     mockConsumer,
@@ -61,7 +66,7 @@ public class GlobalStreamThreadTest {
     }
 
     @Test
-    public void shouldThrowStreamsExceptionOnStartupIfThereIsAnException() 
throws Exception {
+    public void 
shouldThrowStreamsExceptionOnStartupIfThereIsAStreamsException() throws 
Exception {
         // should throw as the MockConsumer hasn't been configured and there 
are no
         // partitions available
         try {
@@ -73,6 +78,33 @@ public class GlobalStreamThreadTest {
         assertFalse(globalStreamThread.stillRunning());
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldThrowStreamsExceptionOnStartupIfExceptionOccurred() 
throws Exception {
+        final MockConsumer<byte[], byte[]> mockConsumer = new 
MockConsumer(OffsetResetStrategy.EARLIEST) {
+            @Override
+            public List<PartitionInfo> partitionsFor(final String topic) {
+                throw new RuntimeException("KABOOM!");
+            }
+        };
+        globalStreamThread = new 
GlobalStreamThread(builder.buildGlobalStateTopology(),
+                                                    config,
+                                                    mockConsumer,
+                                                    new 
StateDirectory("appId", TestUtils.tempDirectory().getPath()),
+                                                    new Metrics(),
+                                                    new MockTime(),
+                                                    "client");
+
+        try {
+            globalStreamThread.start();
+            fail("Should have thrown StreamsException if start up failed");
+        } catch (StreamsException e) {
+            assertThat(e.getCause(), instanceOf(RuntimeException.class));
+            assertThat(e.getCause().getMessage(), equalTo("KABOOM!"));
+        }
+        assertFalse(globalStreamThread.stillRunning());
+    }
+
 
     @Test
     public void shouldBeRunningAfterSuccesulStart() throws Exception {

Reply via email to