Repository: kafka Updated Branches: refs/heads/0.10.2 eb62e5695 -> e38794e02
KAFKA-3502; move RocksDB options construction to init() In RocksDBStore, options / wOptions / fOptions are constructed in the constructor and need to be disposed in the close() call; however, in some tests the generated topology is not initialized at all, and hence the corresponding state stores are not expected to be closed either, since their `init` function is never called. This could cause the above option objects to never be released. This patch fixes it by moving the logic out of the constructor and into the `init` functions, so that no RocksDB objects are created in the constructor. Also some minor cleanups: 1. In KStreamTestDriver.close(), we had lost the logic to close the state stores and only called `flush`; it is now changed back to call both. 2. Moved the forwarding logic from KStreamTestDriver to MockProcessorContext to remove the mutual dependency: these functions should really be in ProcessorContext, not the test driver. Author: Guozhang Wang <wangg...@gmail.com> Reviewers: Damian Guy <damian....@gmail.com>, Matthias J. 
Sax <matth...@confluent.io>, Jason Gustafson <ja...@confluent.io> Closes #2381 from guozhangwang/K3502-pure-virtual-function-unit-tests (cherry picked from commit 1974e1b0e54abe5fdebd8ff3338df864b7ab60f3) Signed-off-by: Jason Gustafson <ja...@confluent.io> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e38794e0 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e38794e0 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e38794e0 Branch: refs/heads/0.10.2 Commit: e38794e020951adec5a5d0bbfe42c57294bf67bd Parents: eb62e56 Author: Guozhang Wang <wangg...@gmail.com> Authored: Tue Jan 17 20:29:55 2017 -0800 Committer: Jason Gustafson <ja...@confluent.io> Committed: Tue Jan 17 20:37:00 2017 -0800 ---------------------------------------------------------------------- .../streams/state/internals/RocksDBStore.java | 21 +--- .../streams/kstream/KStreamBuilderTest.java | 58 +++++----- .../internals/KStreamKTableLeftJoinTest.java | 2 - ...reamSessionWindowAggregateProcessorTest.java | 4 +- .../streams/state/KeyValueStoreTestDriver.java | 2 +- .../internals/CachingKeyValueStoreTest.java | 2 +- .../internals/CachingSessionStoreTest.java | 2 +- .../state/internals/CachingWindowStoreTest.java | 2 +- .../ChangeLoggingKeyValueBytesStoreTest.java | 3 +- .../ChangeLoggingKeyValueStoreTest.java | 3 +- .../ChangeLoggingSegmentedBytesStoreTest.java | 3 +- .../MeteredSegmentedBytesStoreTest.java | 3 +- .../RocksDBKeyValueStoreSupplierTest.java | 9 +- .../RocksDBSegmentedBytesStoreTest.java | 3 +- .../RocksDBSessionStoreSupplierTest.java | 9 +- .../internals/RocksDBSessionStoreTest.java | 3 +- .../RocksDBWindowStoreSupplierTest.java | 9 +- .../state/internals/RocksDBWindowStoreTest.java | 24 ++-- .../state/internals/SegmentIteratorTest.java | 27 +++-- .../streams/state/internals/SegmentsTest.java | 3 +- .../apache/kafka/test/KStreamTestDriver.java | 113 +++++-------------- 
.../apache/kafka/test/MockProcessorContext.java | 57 +++++++--- 22 files changed, 153 insertions(+), 209 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/e38794e0/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index 3f8d509..55c1bb8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -94,25 +94,21 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { protected volatile boolean open = false; - - public RocksDBStore(final String name, - final Serde<K> keySerde, - final Serde<V> valueSerde) { + RocksDBStore(String name, Serde<K> keySerde, Serde<V> valueSerde) { this(name, DB_FILE_DIR, keySerde, valueSerde); } - - public RocksDBStore(final String name, - final String parentDir, - final Serde<K> keySerde, - final Serde<V> valueSerde) { + RocksDBStore(String name, String parentDir, Serde<K> keySerde, Serde<V> valueSerde) { this.name = name; this.parentDir = parentDir; this.keySerde = keySerde; this.valueSerde = valueSerde; + } + @SuppressWarnings("unchecked") + public void openDB(ProcessorContext context) { // initialize the default rocksdb options - BlockBasedTableConfig tableConfig = new BlockBasedTableConfig(); + final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig(); tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE); tableConfig.setBlockSize(BLOCK_SIZE); @@ -125,16 +121,12 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { options.setCreateIfMissing(true); options.setErrorIfExists(false); - wOptions = new WriteOptions(); 
wOptions.setDisableWAL(true); fOptions = new FlushOptions(); fOptions.setWaitForFlush(true); - } - @SuppressWarnings("unchecked") - public void openDB(ProcessorContext context) { final Map<String, Object> configs = context.appConfigs(); final Class<RocksDBConfigSetter> configSetterClass = (Class<RocksDBConfigSetter>) configs.get(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG); if (configSetterClass != null) { @@ -464,5 +456,4 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { return super.hasNext() && comparator.compare(super.peekRawKey(), this.rawToKey) <= 0; } } - } http://git-wip-us.apache.org/repos/asf/kafka/blob/e38794e0/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java index c32082c..5f126c3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java @@ -28,6 +28,7 @@ import org.apache.kafka.test.MockKeyValueMapper; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockValueJoiner; import org.junit.After; +import org.junit.Before; import org.junit.Test; import java.util.HashSet; @@ -41,8 +42,17 @@ import static org.junit.Assert.assertTrue; public class KStreamBuilderTest { + private static final String APP_ID = "app-id"; + + private final KStreamBuilder builder = new KStreamBuilder(); + private KStreamTestDriver driver = null; + @Before + public void setUp() { + builder.setApplicationId(APP_ID); + } + @After public void cleanup() { if (driver != null) { @@ -53,8 +63,6 @@ public class KStreamBuilderTest { @Test(expected = TopologyBuilderException.class) public void testFrom() { - final KStreamBuilder builder = new 
KStreamBuilder(); - builder.stream("topic-1", "topic-2"); builder.addSource(KStreamImpl.SOURCE_NAME + "0000000000", "topic-3"); @@ -62,17 +70,15 @@ public class KStreamBuilderTest { @Test public void testNewName() { - KStreamBuilder builder = new KStreamBuilder(); - assertEquals("X-0000000000", builder.newName("X-")); assertEquals("Y-0000000001", builder.newName("Y-")); assertEquals("Z-0000000002", builder.newName("Z-")); - builder = new KStreamBuilder(); + KStreamBuilder newBuilder = new KStreamBuilder(); - assertEquals("X-0000000000", builder.newName("X-")); - assertEquals("Y-0000000001", builder.newName("Y-")); - assertEquals("Z-0000000002", builder.newName("Z-")); + assertEquals("X-0000000000", newBuilder.newName("X-")); + assertEquals("Y-0000000001", newBuilder.newName("Y-")); + assertEquals("Z-0000000002", newBuilder.newName("Z-")); } @Test @@ -80,8 +86,6 @@ public class KStreamBuilderTest { String topic1 = "topic-1"; String topic2 = "topic-2"; - KStreamBuilder builder = new KStreamBuilder(); - KStream<String, String> source1 = builder.stream(topic1); KStream<String, String> source2 = builder.stream(topic2); KStream<String, String> merged = builder.merge(source1, source2); @@ -105,7 +109,6 @@ public class KStreamBuilderTest { final String topic1 = "topic-1"; final String topic2 = "topic-2"; final String topic3 = "topic-3"; - final KStreamBuilder builder = new KStreamBuilder(); final KStream<String, String> source1 = builder.stream(topic1); final KStream<String, String> source2 = builder.stream(topic2); final KStream<String, String> source3 = builder.stream(topic3); @@ -131,28 +134,26 @@ public class KStreamBuilderTest { final KStream<String, String> merged = builder.merge(processedSource1, processedSource2, source3); merged.groupByKey().count("my-table"); final Map<String, Set<String>> actual = builder.stateStoreNameToSourceTopics(); + assertEquals(Utils.mkSet("topic-1", "topic-2", "topic-3"), actual.get("my-table")); } @Test(expected = 
TopologyBuilderException.class) public void shouldThrowExceptionWhenNoTopicPresent() throws Exception { - new KStreamBuilder().stream(); + builder.stream(); } @Test(expected = NullPointerException.class) public void shouldThrowExceptionWhenTopicNamesAreNull() throws Exception { - new KStreamBuilder().stream(Serdes.String(), Serdes.String(), null, null); + builder.stream(Serdes.String(), Serdes.String(), null, null); } @Test public void shouldNotMaterializeSourceKTableIfStateNameNotSpecified() throws Exception { - final KStreamBuilder builder = new KStreamBuilder(); - builder.setApplicationId("app-id"); - builder.table("topic1", "table1"); builder.table("topic2", null); - ProcessorTopology topology = builder.build(null); + final ProcessorTopology topology = builder.build(null); assertEquals(1, topology.stateStores().size()); assertEquals("table1", topology.stateStores().get(0).name()); @@ -162,23 +163,26 @@ public class KStreamBuilderTest { @Test public void shouldBuildSimpleGlobalTableTopology() throws Exception { - final KStreamBuilder builder = new KStreamBuilder(); builder.globalTable("table", "globalTable"); + final ProcessorTopology topology = builder.buildGlobalStateTopology(); final List<StateStore> stateStores = topology.globalStateStores(); - final StateStore store = stateStores.iterator().next(); + assertEquals(1, stateStores.size()); - assertEquals("globalTable", store.name()); + assertEquals("globalTable", stateStores.get(0).name()); } @Test public void shouldBuildGlobalTopologyWithAllGlobalTables() throws Exception { - final KStreamBuilder builder = new KStreamBuilder(); builder.globalTable("table", "globalTable"); builder.globalTable("table2", "globalTable2"); + final ProcessorTopology topology = builder.buildGlobalStateTopology(); + final List<StateStore> stateStores = topology.globalStateStores(); - assertEquals(Utils.mkSet("table", "table2"), topology.sourceTopics()); + final Set<String> sourceTopics = topology.sourceTopics(); + + 
assertEquals(Utils.mkSet("table", "table2"), sourceTopics); assertEquals(2, stateStores.size()); } @@ -186,7 +190,6 @@ public class KStreamBuilderTest { public void shouldAddGlobalTablesToEachGroup() throws Exception { final String one = "globalTable"; final String two = "globalTable2"; - final KStreamBuilder builder = new KStreamBuilder(); final GlobalKTable<String, String> globalTable = builder.globalTable("table", one); final GlobalKTable<String, String> globalTable2 = builder.globalTable("table2", two); @@ -203,7 +206,7 @@ public class KStreamBuilderTest { stream.leftJoin(globalTable, kvMapper, MockValueJoiner.TOSTRING_JOINER); final KStream<String, String> stream2 = builder.stream("t2"); stream2.leftJoin(globalTable2, kvMapper, MockValueJoiner.TOSTRING_JOINER); - builder.setApplicationId("app-id"); + final Map<Integer, Set<String>> nodeGroups = builder.nodeGroups(); for (Integer groupId : nodeGroups.keySet()) { final ProcessorTopology topology = builder.build(groupId); @@ -212,6 +215,7 @@ public class KStreamBuilderTest { for (StateStore stateStore : stateStores) { names.add(stateStore.name()); } + assertEquals(2, stateStores.size()); assertTrue(names.contains(one)); assertTrue(names.contains(two)); @@ -220,9 +224,6 @@ public class KStreamBuilderTest { @Test public void shouldMapStateStoresToCorrectSourceTopics() throws Exception { - final KStreamBuilder builder = new KStreamBuilder(); - builder.setApplicationId("app-id"); - final KStream<String, String> playEvents = builder.stream("events"); final KTable<String, String> table = builder.table("table-topic", "table-store"); @@ -230,7 +231,8 @@ public class KStreamBuilderTest { final KStream<String, String> mapped = playEvents.map(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper()); mapped.leftJoin(table, MockValueJoiner.TOSTRING_JOINER).groupByKey().count("count"); + assertEquals(Collections.singleton("table-topic"), builder.stateStoreNameToSourceTopics().get("table-store")); - 
assertEquals(Collections.singleton("app-id-KSTREAM-MAP-0000000003-repartition"), builder.stateStoreNameToSourceTopics().get("count")); + assertEquals(Collections.singleton(APP_ID + "-KSTREAM-MAP-0000000003-repartition"), builder.stateStoreNameToSourceTopics().get("count")); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/e38794e0/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java index 569ea5a..b6988e3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java @@ -141,6 +141,4 @@ public class KStreamKTableLeftJoinTest { processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3"); } - - } http://git-wip-us.apache.org/repos/asf/kafka/blob/e38794e0/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java index c3368a1..2e5b201 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java @@ -22,7 +22,6 @@ import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KeyValue; import 
org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.Initializer; -import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.Merger; import org.apache.kafka.streams.kstream.SessionWindows; import org.apache.kafka.streams.kstream.Windowed; @@ -32,7 +31,6 @@ import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.internals.RocksDBSessionStoreSupplier; import org.apache.kafka.streams.state.SessionStore; import org.apache.kafka.streams.state.internals.ThreadCache; -import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.MockProcessorContext; import org.apache.kafka.test.NoOpRecordCollector; import org.apache.kafka.test.TestUtils; @@ -91,7 +89,7 @@ public class KStreamSessionWindowAggregateProcessorTest { @Before public void initializeStore() { final File stateDir = TestUtils.tempDirectory(); - context = new MockProcessorContext(new KStreamTestDriver(new KStreamBuilder(), stateDir), stateDir, + context = new MockProcessorContext(stateDir, Serdes.String(), Serdes.String(), new NoOpRecordCollector(), new ThreadCache("testCache", 100000, new MockStreamsMetrics(new Metrics()))) { @Override public <K, V> void forward(final K key, final V value) { http://git-wip-us.apache.org/repos/asf/kafka/blob/e38794e0/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java index c3df49d..efa0e0e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java @@ -231,7 +231,7 @@ public class KeyValueStoreTestDriver<K, V> { - this.context = new MockProcessorContext(null, 
this.stateDir, serdes.keySerde(), serdes.valueSerde(), recordCollector, null) { + this.context = new MockProcessorContext(this.stateDir, serdes.keySerde(), serdes.valueSerde(), recordCollector, null) { @Override public TaskId taskId() { return new TaskId(0, 1); http://git-wip-us.apache.org/repos/asf/kafka/blob/e38794e0/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java index 8746a86..a00526f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java @@ -63,7 +63,7 @@ public class CachingKeyValueStoreTest { store = new CachingKeyValueStore<>(underlyingStore, Serdes.String(), Serdes.String()); store.setFlushListener(cacheFlushListener); cache = new ThreadCache("testCache", maxCacheSizeBytes, new MockStreamsMetrics(new Metrics())); - final MockProcessorContext context = new MockProcessorContext(null, null, null, null, (RecordCollector) null, cache); + final MockProcessorContext context = new MockProcessorContext(null, null, null, (RecordCollector) null, cache); topic = "topic"; context.setRecordContext(new ProcessorRecordContext(10, 0, 0, topic)); store.init(context, null); http://git-wip-us.apache.org/repos/asf/kafka/blob/e38794e0/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java index 5035f70..65a249e 100644 
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java @@ -64,7 +64,7 @@ public class CachingSessionStoreTest { Serdes.String(), Serdes.Long()); cache = new ThreadCache("testCache", MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics())); - final MockProcessorContext context = new MockProcessorContext(null, TestUtils.tempDirectory(), null, null, (RecordCollector) null, cache); + final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(), null, null, (RecordCollector) null, cache); context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, "topic")); cachingStore.init(context, cachingStore); } http://git-wip-us.apache.org/repos/asf/kafka/blob/e38794e0/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java index 1de1002..2728aa0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java @@ -69,7 +69,7 @@ public class CachingWindowStoreTest { cachingStore.setFlushListener(cacheListener); cache = new ThreadCache("testCache", MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics())); topic = "topic"; - final MockProcessorContext context = new MockProcessorContext(null, TestUtils.tempDirectory(), null, null, (RecordCollector) null, cache); + final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(), null, null, (RecordCollector) null, cache); context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 
0, 0, topic)); cachingStore.init(context, cachingStore); } http://git-wip-us.apache.org/repos/asf/kafka/blob/e38794e0/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java index 82fb831..99b1347 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java @@ -57,8 +57,7 @@ public class ChangeLoggingKeyValueBytesStoreTest { sent.put(record.key(), record.value()); } }; - final MockProcessorContext context = new MockProcessorContext(null, - TestUtils.tempDirectory(), + final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(), Serdes.String(), Serdes.Long(), collector, http://git-wip-us.apache.org/repos/asf/kafka/blob/e38794e0/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStoreTest.java index 8815c5a..442602c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStoreTest.java @@ -65,8 +65,7 @@ public class ChangeLoggingKeyValueStoreTest { sent.put(record.key(), record.value()); } }; - final MockProcessorContext context = new MockProcessorContext(null, - 
TestUtils.tempDirectory(), + final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(), Serdes.String(), Serdes.Long(), collector, http://git-wip-us.apache.org/repos/asf/kafka/blob/e38794e0/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStoreTest.java index 621feb3..51f31bf 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStoreTest.java @@ -52,8 +52,7 @@ public class ChangeLoggingSegmentedBytesStoreTest { sent.put(record.key(), record.value()); } }; - final MockProcessorContext context = new MockProcessorContext(null, - TestUtils.tempDirectory(), + final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(), Serdes.String(), Serdes.Long(), collector, http://git-wip-us.apache.org/repos/asf/kafka/blob/e38794e0/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStoreTest.java index 6306512..9160a73 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStoreTest.java @@ -93,8 +93,7 @@ public class MeteredSegmentedBytesStoreTest { }; - final MockProcessorContext 
context = new MockProcessorContext(null, - TestUtils.tempDirectory(), + final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(), Serdes.String(), Serdes.Long(), new NoOpRecordCollector(), http://git-wip-us.apache.org/repos/asf/kafka/blob/e38794e0/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java index 3d9a56c..c510089 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java @@ -44,8 +44,7 @@ public class RocksDBKeyValueStoreSupplierTest { private static final String STORE_NAME = "name"; private final ThreadCache cache = new ThreadCache("test", 1024, new MockStreamsMetrics(new Metrics())); - private final MockProcessorContext context = new MockProcessorContext(null, - TestUtils.tempDirectory(), + private final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(), Serdes.String(), Serdes.String(), new NoOpRecordCollector(), @@ -67,8 +66,7 @@ public class RocksDBKeyValueStoreSupplierTest { logged.add(record); } }; - final MockProcessorContext context = new MockProcessorContext(null, - TestUtils.tempDirectory(), + final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(), Serdes.String(), Serdes.String(), collector, @@ -89,8 +87,7 @@ public class RocksDBKeyValueStoreSupplierTest { logged.add(record); } }; - final MockProcessorContext context = new MockProcessorContext(null, - TestUtils.tempDirectory(), + final MockProcessorContext context = new 
MockProcessorContext(TestUtils.tempDirectory(), Serdes.String(), Serdes.String(), collector, http://git-wip-us.apache.org/repos/asf/kafka/blob/e38794e0/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java index 7fe490c..3763290 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java @@ -62,8 +62,7 @@ public class RocksDBSegmentedBytesStoreTest { new SessionKeySchema()); stateDir = TestUtils.tempDirectory(); - final MockProcessorContext context = new MockProcessorContext(null, - stateDir, + final MockProcessorContext context = new MockProcessorContext(stateDir, Serdes.String(), Serdes.Long(), new NoOpRecordCollector(), http://git-wip-us.apache.org/repos/asf/kafka/blob/e38794e0/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java index 28196a2..6677624 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java @@ -46,8 +46,7 @@ public class RocksDBSessionStoreSupplierTest { private static final String STORE_NAME = "name"; private final ThreadCache cache = new ThreadCache("test", 1024, new 
MockStreamsMetrics(new Metrics())); - private final MockProcessorContext context = new MockProcessorContext(null, - TestUtils.tempDirectory(), + private final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(), Serdes.String(), Serdes.String(), new NoOpRecordCollector(), @@ -70,8 +69,7 @@ public class RocksDBSessionStoreSupplierTest { logged.add(record); } }; - final MockProcessorContext context = new MockProcessorContext(null, - TestUtils.tempDirectory(), + final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(), Serdes.String(), Serdes.String(), collector, @@ -92,8 +90,7 @@ public class RocksDBSessionStoreSupplierTest { logged.add(record); } }; - final MockProcessorContext context = new MockProcessorContext(null, - TestUtils.tempDirectory(), + final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(), Serdes.String(), Serdes.String(), collector, http://git-wip-us.apache.org/repos/asf/kafka/blob/e38794e0/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java index 5a23a1c..e1801b8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java @@ -53,8 +53,7 @@ public class RocksDBSessionStoreTest { Serdes.String(), Serdes.Long()); - final MockProcessorContext context = new MockProcessorContext(null, - TestUtils.tempDirectory(), + final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(), Serdes.String(), Serdes.Long(), new NoOpRecordCollector(), 
http://git-wip-us.apache.org/repos/asf/kafka/blob/e38794e0/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java index 897ec62..d9a0d4f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java @@ -45,8 +45,7 @@ public class RocksDBWindowStoreSupplierTest { private static final String STORE_NAME = "name"; private WindowStore<String, String> store; private final ThreadCache cache = new ThreadCache("test", 1024, new MockStreamsMetrics(new Metrics())); - private final MockProcessorContext context = new MockProcessorContext(null, - TestUtils.tempDirectory(), + private final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(), Serdes.String(), Serdes.String(), new NoOpRecordCollector(), @@ -67,8 +66,7 @@ public class RocksDBWindowStoreSupplierTest { logged.add(record); } }; - final MockProcessorContext context = new MockProcessorContext(null, - TestUtils.tempDirectory(), + final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(), Serdes.String(), Serdes.String(), collector, @@ -89,8 +87,7 @@ public class RocksDBWindowStoreSupplierTest { logged.add(record); } }; - final MockProcessorContext context = new MockProcessorContext(null, - TestUtils.tempDirectory(), + final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(), Serdes.String(), Serdes.String(), collector, 
http://git-wip-us.apache.org/repos/asf/kafka/blob/e38794e0/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java index 79223de..ee846f7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java @@ -100,7 +100,7 @@ public class RocksDBWindowStoreTest { }; MockProcessorContext context = new MockProcessorContext( - null, baseDir, + baseDir, byteArraySerde, byteArraySerde, recordCollector, cache); @@ -151,7 +151,7 @@ public class RocksDBWindowStoreTest { }; MockProcessorContext context = new MockProcessorContext( - null, baseDir, + baseDir, byteArraySerde, byteArraySerde, recordCollector, cache); @@ -225,7 +225,7 @@ public class RocksDBWindowStoreTest { }; MockProcessorContext context = new MockProcessorContext( - null, baseDir, + baseDir, byteArraySerde, byteArraySerde, recordCollector, cache); @@ -314,7 +314,7 @@ public class RocksDBWindowStoreTest { }; MockProcessorContext context = new MockProcessorContext( - null, baseDir, + baseDir, byteArraySerde, byteArraySerde, recordCollector, cache); @@ -401,7 +401,7 @@ public class RocksDBWindowStoreTest { }; MockProcessorContext context = new MockProcessorContext( - null, baseDir, + baseDir, byteArraySerde, byteArraySerde, recordCollector, cache); @@ -457,7 +457,7 @@ public class RocksDBWindowStoreTest { }; MockProcessorContext context = new MockProcessorContext( - null, baseDir, + baseDir, byteArraySerde, byteArraySerde, recordCollector, cache); @@ -485,7 +485,7 @@ public class RocksDBWindowStoreTest { }; MockProcessorContext context = new MockProcessorContext( - null, baseDir, + 
baseDir, byteArraySerde, byteArraySerde, recordCollector, cache); @@ -613,7 +613,7 @@ public class RocksDBWindowStoreTest { }; MockProcessorContext context = new MockProcessorContext( - null, baseDir, + baseDir, byteArraySerde, byteArraySerde, recordCollector, cache); @@ -662,7 +662,7 @@ public class RocksDBWindowStoreTest { }; MockProcessorContext context = new MockProcessorContext( - null, baseDir, + baseDir, byteArraySerde, byteArraySerde, recordCollector, cache); @@ -710,7 +710,7 @@ public class RocksDBWindowStoreTest { }; MockProcessorContext context = new MockProcessorContext( - null, baseDir, + baseDir, byteArraySerde, byteArraySerde, recordCollector, cache); @@ -810,7 +810,7 @@ public class RocksDBWindowStoreTest { }; MockProcessorContext context = new MockProcessorContext( - null, baseDir, + baseDir, byteArraySerde, byteArraySerde, recordCollector, cache); @@ -870,7 +870,7 @@ public class RocksDBWindowStoreTest { }; MockProcessorContext context = new MockProcessorContext( - null, baseDir, + baseDir, byteArraySerde, byteArraySerde, recordCollector, cache); http://git-wip-us.apache.org/repos/asf/kafka/blob/e38794e0/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java index 3d2da31..ae6fb5a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java @@ -48,14 +48,16 @@ public class SegmentIteratorTest { } }; + private SegmentIterator iterator = null; + @Before public void before() { - final MockProcessorContext context = new MockProcessorContext(null, - TestUtils.tempDirectory(), - Serdes.String(), - Serdes.String(), - new 
NoOpRecordCollector(), - new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics()))); + final MockProcessorContext context = new MockProcessorContext( + TestUtils.tempDirectory(), + Serdes.String(), + Serdes.String(), + new NoOpRecordCollector(), + new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics()))); segmentOne.openDB(context); segmentTwo.openDB(context); segmentOne.put(Bytes.wrap("a".getBytes()), "1".getBytes()); @@ -67,13 +69,17 @@ public class SegmentIteratorTest { @After public void closeSegments() { + if (iterator != null) { + iterator.close(); + iterator = null; + } segmentOne.close(); segmentTwo.close(); } @Test public void shouldIterateOverAllSegments() throws Exception { - final SegmentIterator iterator = new SegmentIterator( + iterator = new SegmentIterator( Arrays.asList(segmentOne, segmentTwo).iterator(), hasNextCondition, @@ -101,7 +107,7 @@ public class SegmentIteratorTest { @Test public void shouldOnlyIterateOverSegmentsInRange() throws Exception { - final SegmentIterator iterator = new SegmentIterator( + iterator = new SegmentIterator( Arrays.asList(segmentOne, segmentTwo).iterator(), hasNextCondition, @@ -121,7 +127,7 @@ public class SegmentIteratorTest { @Test(expected = NoSuchElementException.class) public void shouldThrowNoSuchElementOnPeekNextKeyIfNoNext() throws Exception { - final SegmentIterator iterator = new SegmentIterator( + iterator = new SegmentIterator( Arrays.asList(segmentOne, segmentTwo).iterator(), hasNextCondition, @@ -133,7 +139,7 @@ public class SegmentIteratorTest { @Test(expected = NoSuchElementException.class) public void shouldThrowNoSuchElementOnNextIfNoNext() throws Exception { - final SegmentIterator iterator = new SegmentIterator( + iterator = new SegmentIterator( Arrays.asList(segmentOne, segmentTwo).iterator(), hasNextCondition, @@ -146,5 +152,4 @@ public class SegmentIteratorTest { private KeyValue<String, String> toStringKeyValue(final KeyValue<Bytes, byte[]> binaryKv) { return 
KeyValue.pair(new String(binaryKv.key.get()), new String(binaryKv.value)); } - } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/e38794e0/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java index 47207ec..9e34e63 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java @@ -43,8 +43,7 @@ public class SegmentsTest { @Before public void createContext() { - context = new MockProcessorContext(null, - TestUtils.tempDirectory(), + context = new MockProcessorContext(TestUtils.tempDirectory(), Serdes.String(), Serdes.Long(), new NoOpRecordCollector(), http://git-wip-us.apache.org/repos/asf/kafka/blob/e38794e0/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java index d51384c..207705c 100644 --- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java @@ -42,14 +42,11 @@ import java.util.Set; public class KStreamTestDriver { + private static final long DEFAULT_CACHE_SIZE_BYTES = 1 * 1024 * 1024L; + private final ProcessorTopology topology; private final MockProcessorContext context; private final ProcessorTopology globalTopology; - private ThreadCache cache; - private static final long DEFAULT_CACHE_SIZE_BYTES = 1 * 1024 * 1024L; - public final File stateDir; - - private ProcessorNode currNode; public KStreamTestDriver(KStreamBuilder 
builder) { this(builder, null, Serdes.ByteArray(), Serdes.ByteArray()); @@ -78,9 +75,8 @@ public class KStreamTestDriver { builder.setApplicationId("TestDriver"); this.topology = builder.build(null); this.globalTopology = builder.buildGlobalStateTopology(); - this.stateDir = stateDir; - this.cache = new ThreadCache("testCache", cacheSize, new MockStreamsMetrics(new Metrics())); - this.context = new MockProcessorContext(this, stateDir, keySerde, valSerde, new MockRecordCollector(), cache); + ThreadCache cache = new ThreadCache("testCache", cacheSize, new MockStreamsMetrics(new Metrics())); + this.context = new MockProcessorContext(stateDir, keySerde, valSerde, new MockRecordCollector(), cache); this.context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "topic")); // init global topology first as it will add stores to the // store map that are required for joins etc. @@ -88,7 +84,6 @@ public class KStreamTestDriver { initTopology(globalTopology, globalTopology.globalStateStores()); } initTopology(topology, topology.stateStores()); - } private void initTopology(final ProcessorTopology topology, final List<StateStore> stores) { @@ -106,14 +101,17 @@ public class KStreamTestDriver { } } + public ProcessorTopology topology() { + return topology; + } public ProcessorContext context() { return context; } public void process(String topicName, Object key, Object value) { - final ProcessorNode previous = currNode; - currNode = topology.source(topicName); + final ProcessorNode prevNode = context.currentNode(); + ProcessorNode currNode = topology.source(topicName); if (currNode == null && globalTopology != null) { currNode = globalTopology.source(topicName); } @@ -121,33 +119,27 @@ public class KStreamTestDriver { // if currNode is null, check if this topic is a changelog topic; // if yes, skip if (topicName.endsWith(ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX)) { - currNode = previous; return; } context.setRecordContext(createRecordContext(context.timestamp())); 
context.setCurrentNode(currNode); try { - forward(key, value); + context.forward(key, value); } finally { - currNode = null; - context.setCurrentNode(null); + context.setCurrentNode(prevNode); } } - private ProcessorRecordContext createRecordContext(long timestamp) { - return new ProcessorRecordContext(timestamp, -1, -1, "topic"); - } - - public void punctuate(long timestamp) { + final ProcessorNode prevNode = context.currentNode(); for (ProcessorNode processor : topology.processors()) { if (processor.processor() != null) { - currNode = processor; + context.setRecordContext(createRecordContext(timestamp)); + context.setCurrentNode(processor); try { - context.setRecordContext(createRecordContext(timestamp)); processor.processor().punctuate(timestamp); } finally { - currNode = null; + context.setCurrentNode(prevNode); } } } @@ -157,59 +149,18 @@ public class KStreamTestDriver { context.setTime(timestamp); } - @SuppressWarnings("unchecked") - public <K, V> void forward(K key, V value) { - ProcessorNode thisNode = currNode; - for (ProcessorNode childNode : (List<ProcessorNode<K, V>>) currNode.children()) { - currNode = childNode; - try { - childNode.process(key, value); - } finally { - currNode = thisNode; - } - } - } - - @SuppressWarnings("unchecked") - public <K, V> void forward(K key, V value, int childIndex) { - ProcessorNode thisNode = currNode; - ProcessorNode childNode = (ProcessorNode<K, V>) thisNode.children().get(childIndex); - currNode = childNode; - try { - childNode.process(key, value); - } finally { - currNode = thisNode; - } - } - - @SuppressWarnings("unchecked") - public <K, V> void forward(K key, V value, String childName) { - ProcessorNode thisNode = currNode; - for (ProcessorNode childNode : (List<ProcessorNode<K, V>>) thisNode.children()) { - if (childNode.name().equals(childName)) { - currNode = childNode; - try { - childNode.process(key, value); - } finally { - currNode = thisNode; - } - break; - } - } - } - public void close() { // close all 
processors for (ProcessorNode node : topology.processors()) { - currNode = node; + context.setCurrentNode(node); try { node.close(); } finally { - currNode = null; + context.setCurrentNode(null); } } - flushState(); + closeState(); } public Set<String> allProcessorNames() { @@ -245,24 +196,19 @@ public class KStreamTestDriver { } } - public void setCurrentNode(final ProcessorNode currentNode) { - currNode = currentNode; - } - - public StateStore globalStateStore(final String storeName) { - if (globalTopology != null) { - for (final StateStore store : globalTopology.globalStateStores()) { - if (store.name().equals(storeName)) { - return store; - } - } + private void closeState() { + for (StateStore stateStore : context.allStateStores().values()) { + stateStore.flush(); + stateStore.close(); } - return null; } + private ProcessorRecordContext createRecordContext(long timestamp) { + return new ProcessorRecordContext(timestamp, -1, -1, "topic"); + } private class MockRecordCollector extends RecordCollectorImpl { - public MockRecordCollector() { + MockRecordCollector() { super(null, "KStreamTestDriver"); } @@ -280,12 +226,9 @@ public class KStreamTestDriver { } @Override - public void flush() { - } + public void flush() {} @Override - public void close() { - } + public void close() {} } - } http://git-wip-us.apache.org/repos/asf/kafka/blob/e38794e0/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java index 5ae7112..93f0f42 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java @@ -18,7 +18,6 @@ package org.apache.kafka.test; import java.io.File; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; 
import java.util.List; @@ -48,7 +47,6 @@ import java.util.LinkedHashMap; public class MockProcessorContext implements InternalProcessorContext, RecordCollector.Supplier { - private final KStreamTestDriver driver; private final Serde<?> keySerde; private final Serde<?> valSerde; private final RecordCollector.Supplier recordCollectorSupplier; @@ -67,34 +65,34 @@ public class MockProcessorContext implements InternalProcessorContext, RecordCol private ProcessorNode currentNode; public MockProcessorContext(StateSerdes<?, ?> serdes, RecordCollector collector) { - this(null, null, serdes.keySerde(), serdes.valueSerde(), collector, null); + this(null, serdes.keySerde(), serdes.valueSerde(), collector, null); } - public MockProcessorContext(KStreamTestDriver driver, File stateDir, + public MockProcessorContext(File stateDir, Serde<?> keySerde, Serde<?> valSerde, final RecordCollector collector, final ThreadCache cache) { - this(driver, stateDir, keySerde, valSerde, + this(stateDir, keySerde, valSerde, new RecordCollector.Supplier() { @Override public RecordCollector recordCollector() { return collector; } - }, cache); + }, + cache); } - public MockProcessorContext(KStreamTestDriver driver, File stateDir, - Serde<?> keySerde, - Serde<?> valSerde, - RecordCollector.Supplier collectorSupplier, + public MockProcessorContext(final File stateDir, + final Serde<?> keySerde, + final Serde<?> valSerde, + final RecordCollector.Supplier collectorSupplier, final ThreadCache cache) { - this.driver = driver; this.stateDir = stateDir; this.keySerde = keySerde; this.valSerde = valSerde; this.recordCollectorSupplier = collectorSupplier; - this.metrics = new Metrics(config, Arrays.asList((MetricsReporter) new JmxReporter()), time, true); + this.metrics = new Metrics(config, Collections.singletonList((MetricsReporter) new JmxReporter()), time, true); this.cache = cache; this.streamsMetrics = new MockStreamsMetrics(metrics); } @@ -182,19 +180,45 @@ public class MockProcessorContext implements 
InternalProcessorContext, RecordCol @Override @SuppressWarnings("unchecked") public <K, V> void forward(K key, V value) { - driver.forward(key, value); + ProcessorNode thisNode = currentNode; + for (ProcessorNode childNode : (List<ProcessorNode<K, V>>) thisNode.children()) { + currentNode = childNode; + try { + childNode.process(key, value); + } finally { + currentNode = thisNode; + } + } } @Override @SuppressWarnings("unchecked") public <K, V> void forward(K key, V value, int childIndex) { - driver.forward(key, value, childIndex); + ProcessorNode thisNode = currentNode; + ProcessorNode childNode = (ProcessorNode<K, V>) thisNode.children().get(childIndex); + currentNode = childNode; + try { + childNode.process(key, value); + } finally { + currentNode = thisNode; + } } @Override @SuppressWarnings("unchecked") public <K, V> void forward(K key, V value, String childName) { - driver.forward(key, value, childName); + ProcessorNode thisNode = currentNode; + for (ProcessorNode childNode : (List<ProcessorNode<K, V>>) thisNode.children()) { + if (childNode.name().equals(childName)) { + currentNode = childNode; + try { + childNode.process(key, value); + } finally { + currentNode = thisNode; + } + break; + } + } } @@ -268,8 +292,7 @@ public class MockProcessorContext implements InternalProcessorContext, RecordCol @Override public void setCurrentNode(final ProcessorNode currentNode) { - this.currentNode = currentNode; - driver.setCurrentNode(currentNode); + this.currentNode = currentNode; } @Override