Repository: kafka Updated Branches: refs/heads/trunk 3e85f131e -> da70316a5
KAFKA-4647: Improve test coverage of GlobalStreamThread Add a test to ensure a `StreamsException` is thrown when an exception other than `StreamsException` is caught Author: Damian Guy <damian....@gmail.com> Reviewers: Matthias J. Sax, Guozhang Wang Closes #2450 from dguy/KAFKA-4647 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/da70316a Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/da70316a Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/da70316a Branch: refs/heads/trunk Commit: da70316a588172585a3960f2b0edb0e2d4ba5461 Parents: 3e85f13 Author: Damian Guy <damian....@gmail.com> Authored: Wed Feb 1 20:20:31 2017 -0800 Committer: Guozhang Wang <wangg...@gmail.com> Committed: Wed Feb 1 20:20:31 2017 -0800 ---------------------------------------------------------------------- .../internals/GlobalStreamThreadTest.java | 36 ++++++++++++++++++-- 1 file changed, 34 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/da70316a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java index 67138f7..e0c4882 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java @@ -34,7 +34,11 @@ import org.junit.Test; import java.util.Collections; import java.util.HashMap; +import java.util.List; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -43,6 +47,7 @@ public class GlobalStreamThreadTest { private final KStreamBuilder builder = new KStreamBuilder(); private final MockConsumer<byte[], byte[]> mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); private GlobalStreamThread globalStreamThread; + private StreamsConfig config; @Before public void before() { @@ -50,7 +55,7 @@ public class GlobalStreamThreadTest { final HashMap<String, Object> properties = new HashMap<>(); properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "blah"); properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "blah"); - final StreamsConfig config = new StreamsConfig(properties); + config = new StreamsConfig(properties); globalStreamThread = new GlobalStreamThread(builder.buildGlobalStateTopology(), config, mockConsumer, @@ -61,7 +66,7 @@ public class GlobalStreamThreadTest { } @Test - public void shouldThrowStreamsExceptionOnStartupIfThereIsAnException() throws Exception { + public void shouldThrowStreamsExceptionOnStartupIfThereIsAStreamsException() throws Exception { // should throw as the MockConsumer hasn't been configured and there are no // partitions available try { @@ -73,6 +78,33 @@ public class GlobalStreamThreadTest { assertFalse(globalStreamThread.stillRunning()); } + @SuppressWarnings("unchecked") + @Test + public void shouldThrowStreamsExceptionOnStartupIfExceptionOccurred() throws Exception { + final MockConsumer<byte[], byte[]> mockConsumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { + @Override + public List<PartitionInfo> partitionsFor(final String topic) { + throw new RuntimeException("KABOOM!"); + } + }; + globalStreamThread = new GlobalStreamThread(builder.buildGlobalStateTopology(), + config, + mockConsumer, + new StateDirectory("appId", TestUtils.tempDirectory().getPath()), + new Metrics(), + new MockTime(), + "client"); + + try { + globalStreamThread.start(); + fail("Should have thrown StreamsException if start up failed"); + } catch (StreamsException e) { + assertThat(e.getCause(), instanceOf(RuntimeException.class)); + assertThat(e.getCause().getMessage(), equalTo("KABOOM!")); + } + assertFalse(globalStreamThread.stillRunning()); + } + @Test public void shouldBeRunningAfterSuccesulStart() throws Exception {