Repository: kafka Updated Branches: refs/heads/trunk 228a4fdb6 -> edcefccfd
http://git-wip-us.apache.org/repos/asf/kafka/blob/edcefccf/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java index 554310c..09200f2 100644 --- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java @@ -33,6 +33,7 @@ import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.internals.ProcessorTopology; import org.apache.kafka.streams.processor.internals.RecordCollectorImpl; import org.apache.kafka.streams.state.internals.ThreadCache; +import org.junit.rules.ExternalResource; import java.io.File; import java.lang.reflect.Field; @@ -41,38 +42,38 @@ import java.util.List; import java.util.Map; import java.util.Set; -public class KStreamTestDriver { +public class KStreamTestDriver extends ExternalResource { private static final long DEFAULT_CACHE_SIZE_BYTES = 1 * 1024 * 1024L; - private final ProcessorTopology topology; - private final MockProcessorContext context; - private final ProcessorTopology globalTopology; + private ProcessorTopology topology; + private MockProcessorContext context; + private ProcessorTopology globalTopology; - public KStreamTestDriver(final KStreamBuilder builder) { - this(builder, null, Serdes.ByteArray(), Serdes.ByteArray()); + public void setUp(final KStreamBuilder builder) { + setUp(builder, null, Serdes.ByteArray(), Serdes.ByteArray()); } - public KStreamTestDriver(final KStreamBuilder builder, final File stateDir) { - this(builder, stateDir, Serdes.ByteArray(), Serdes.ByteArray()); + public void setUp(final KStreamBuilder builder, final File stateDir) { + setUp(builder, stateDir, Serdes.ByteArray(), Serdes.ByteArray()); } - public KStreamTestDriver(final KStreamBuilder builder, final File stateDir, final long cacheSize) { - this(builder, stateDir, Serdes.ByteArray(), Serdes.ByteArray(), cacheSize); + public void setUp(final KStreamBuilder builder, final File stateDir, final long cacheSize) { + setUp(builder, stateDir, Serdes.ByteArray(), Serdes.ByteArray(), cacheSize); } - public KStreamTestDriver(final KStreamBuilder builder, - final File stateDir, - final Serde<?> keySerde, - final Serde<?> valSerde) { - this(builder, stateDir, keySerde, valSerde, DEFAULT_CACHE_SIZE_BYTES); + public void setUp(final KStreamBuilder builder, + final File stateDir, + final Serde<?> keySerde, + final Serde<?> valSerde) { + setUp(builder, stateDir, keySerde, valSerde, DEFAULT_CACHE_SIZE_BYTES); } - public KStreamTestDriver(final KStreamBuilder builder, - final File stateDir, - final Serde<?> keySerde, - final Serde<?> valSerde, - final long cacheSize) { + public void setUp(final KStreamBuilder builder, + final File stateDir, + final Serde<?> keySerde, + final Serde<?> valSerde, + final long cacheSize) { builder.setApplicationId("TestDriver"); topology = builder.build(null); globalTopology = builder.buildGlobalStateTopology(); @@ -87,30 +88,30 @@ public class KStreamTestDriver { initTopology(topology, topology.stateStores()); } - public KStreamTestDriver(final StreamsBuilder builder) { - this(builder, null, Serdes.ByteArray(), Serdes.ByteArray()); + public void setUp(final StreamsBuilder builder) { + setUp(builder, null, Serdes.ByteArray(), Serdes.ByteArray()); } - public KStreamTestDriver(final StreamsBuilder builder, final File stateDir) { - this(builder, stateDir, Serdes.ByteArray(), Serdes.ByteArray()); + public void setUp(final StreamsBuilder builder, final File stateDir) { + setUp(builder, stateDir, Serdes.ByteArray(), Serdes.ByteArray()); } - public KStreamTestDriver(final StreamsBuilder builder, final File stateDir, final long cacheSize) { - this(builder, stateDir, Serdes.ByteArray(), Serdes.ByteArray(), cacheSize); + public void setUp(final StreamsBuilder builder, final File stateDir, final long cacheSize) { + setUp(builder, stateDir, Serdes.ByteArray(), Serdes.ByteArray(), cacheSize); } - public KStreamTestDriver(final StreamsBuilder builder, - final File stateDir, - final Serde<?> keySerde, - final Serde<?> valSerde) { - this(builder, stateDir, keySerde, valSerde, DEFAULT_CACHE_SIZE_BYTES); + public void setUp(final StreamsBuilder builder, + final File stateDir, + final Serde<?> keySerde, + final Serde<?> valSerde) { + setUp(builder, stateDir, keySerde, valSerde, DEFAULT_CACHE_SIZE_BYTES); } - public KStreamTestDriver(final StreamsBuilder builder, - final File stateDir, - final Serde<?> keySerde, - final Serde<?> valSerde, - final long cacheSize) { + public void setUp(final StreamsBuilder builder, + final File stateDir, + final Serde<?> keySerde, + final Serde<?> valSerde, + final long cacheSize) { // TODO: we should refactor this to avoid usage of reflection final InternalTopologyBuilder internalTopologyBuilder; try { @@ -138,6 +139,13 @@ public class KStreamTestDriver { } initTopology(topology, topology.stateStores()); } + + @Override + protected void after() { + if (topology != null) { + close(); + } + } private void initTopology(final ProcessorTopology topology, final List<StateStore> stores) { for (final StateStore store : stores) { @@ -217,8 +225,8 @@ public class KStreamTestDriver { } } - context.close(); closeState(); + context.close(); } public Set<String> allProcessorNames() {
