Repository: kafka Updated Branches: refs/heads/trunk 933a7506e -> 1ccab26a3
KAFKA-3911: KTable source materialization Author: Eno Thereska <[email protected]> Reviewers: Damian Guy, Guozhang Wang Closes #1638 from enothereska/KAFKA-3911-ktable-materialization Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1ccab26a Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1ccab26a Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1ccab26a Branch: refs/heads/trunk Commit: 1ccab26a325e6ee23396049a24a5b6eb4b7a1c8e Parents: 933a750 Author: Eno Thereska <[email protected]> Authored: Thu Jul 21 14:44:59 2016 -0700 Committer: Guozhang Wang <[email protected]> Committed: Thu Jul 21 14:44:59 2016 -0700 ---------------------------------------------------------------------- .../kafka/streams/kstream/KStreamBuilder.java | 14 +++++++---- .../streams/kstream/internals/KTableImpl.java | 11 ++++++--- .../streams/processor/TopologyBuilder.java | 16 ++++++++++++- .../processor/internals/AbstractTask.java | 3 ++- .../internals/ProcessorStateManager.java | 25 ++++++++++++++------ .../processor/internals/ProcessorTopology.java | 19 +++++++-------- .../kstream/internals/KTableFilterTest.java | 2 +- .../kstream/internals/KTableForeachTest.java | 13 +++++++++- .../kstream/internals/KTableImplTest.java | 6 ++--- .../kstream/internals/KTableMapKeysTest.java | 12 +++++++++- .../kstream/internals/KTableSourceTest.java | 2 +- .../internals/ProcessorStateManagerTest.java | 12 +++++----- .../processor/internals/StandbyTaskTest.java | 10 ++++++-- .../processor/internals/StreamTaskTest.java | 3 ++- 14 files changed, 104 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/1ccab26a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java index 2df1bcb..08e9842 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java @@ -144,14 +144,18 @@ public class KStreamBuilder extends TopologyBuilder { * @return a {@link KTable} for the specified topics */ public <K, V> KTable<K, V> table(Serde<K> keySerde, Serde<V> valSerde, String topic, final String storeName) { - String source = newName(KStreamImpl.SOURCE_NAME); - String name = newName(KTableImpl.SOURCE_NAME); + final String source = newName(KStreamImpl.SOURCE_NAME); + final String name = newName(KTableImpl.SOURCE_NAME); + final ProcessorSupplier<K, V> processorSupplier = new KTableSource<>(storeName); addSource(source, keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topic); - - ProcessorSupplier<K, V> processorSupplier = new KTableSource<>(storeName); addProcessor(name, processorSupplier, source); - return new KTableImpl<>(this, name, processorSupplier, Collections.singleton(source), keySerde, valSerde, storeName); + + final KTableImpl kTable = new KTableImpl<>(this, name, processorSupplier, Collections.singleton(source), keySerde, valSerde, storeName); + kTable.materialize((KTableSource) processorSupplier); + connectSourceStoreAndTopic(storeName, topic); + + return kTable; } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/1ccab26a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 17f4716..f4d4855 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -32,6 +32,7 @@ import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.processor.StreamPartitioner; +import org.apache.kafka.streams.errors.StreamsException; import java.io.FileNotFoundException; import java.io.FileOutputStream; @@ -364,7 +365,9 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, KTableValueGetterSupplier<K, V> valueGetterSupplier() { if (processorSupplier instanceof KTableSource) { KTableSource<K, V> source = (KTableSource<K, V>) processorSupplier; - materialize(source); + if (!source.isMaterialized()) { + throw new StreamsException("Source is not materialized"); + } return new KTableSourceValueGetterSupplier<>(source.storeName); } else if (processorSupplier instanceof KStreamAggProcessorSupplier) { return ((KStreamAggProcessorSupplier<?, K, S, V>) processorSupplier).view(); @@ -378,7 +381,9 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, if (!sendOldValues) { if (processorSupplier instanceof KTableSource) { KTableSource<K, ?> source = (KTableSource<K, V>) processorSupplier; - materialize(source); + if (!source.isMaterialized()) { + throw new StreamsException("Source is not materialized"); + } source.enableSendingOldValues(); } else if (processorSupplier instanceof KStreamAggProcessorSupplier) { ((KStreamAggProcessorSupplier<?, K, S, V>) processorSupplier).enableSendingOldValues(); @@ -393,7 +398,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, return sendOldValues; } - private void materialize(KTableSource<K, ?> source) { + public void materialize(KTableSource<K, ?> source) { synchronized (source) { if (!source.isMaterialized()) { StateStoreSupplier storeSupplier = http://git-wip-us.apache.org/repos/asf/kafka/blob/1ccab26a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java index a28b270..b8851b4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java @@ -67,6 +67,7 @@ public class TopologyBuilder { private final HashMap<String, Pattern> nodeToSourcePatterns = new LinkedHashMap<>(); private final HashMap<String, Pattern> topicToPatterns = new HashMap<>(); private final HashMap<String, String> nodeToSinkTopic = new HashMap<>(); + private final HashMap<String, String> sourceStoreToSourceTopic = new HashMap<>(); private SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates(); private String applicationId; @@ -536,6 +537,19 @@ public class TopologyBuilder { return this; } + protected synchronized final TopologyBuilder connectSourceStoreAndTopic(String sourceStoreName, String topic) { + if (sourceStoreToSourceTopic != null) { + if (sourceStoreToSourceTopic.containsKey(sourceStoreName)) { + throw new TopologyBuilderException("Source store " + sourceStoreName + " is already added."); + } + sourceStoreToSourceTopic.put(sourceStoreName, topic); + } else { + throw new TopologyBuilderException("sourceStoreToSourceTopic is null"); + } + + return this; + } + /** * Connects a list of processors. * @@ -841,7 +855,7 @@ public class TopologyBuilder { } } - return new ProcessorTopology(processorNodes, topicSourceMap, topicSinkMap, new ArrayList<>(stateStoreMap.values())); + return new ProcessorTopology(processorNodes, topicSourceMap, topicSinkMap, new ArrayList<>(stateStoreMap.values()), sourceStoreToSourceTopic); } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/1ccab26a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index 70070a9..8a45dd6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -61,7 +61,8 @@ public abstract class AbstractTask { // create the processor state manager try { - this.stateMgr = new ProcessorStateManager(applicationId, id, partitions, restoreConsumer, isStandby, stateDirectory); + this.stateMgr = new ProcessorStateManager(applicationId, id, partitions, restoreConsumer, isStandby, stateDirectory, topology.sourceStoreToSourceTopic()); + } catch (IOException e) { throw new ProcessorStateException("Error while creating the state manager", e); } http://git-wip-us.apache.org/repos/asf/kafka/blob/1ccab26a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index 883959e..11c61a9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -61,13 +61,15 @@ public class ProcessorStateManager { private final Map<TopicPartition, Long> offsetLimits; private final boolean isStandby; private final Map<String, StateRestoreCallback> restoreCallbacks; // used for standby tasks, keyed by state topic name + private final Map<String, String> sourceStoreToSourceTopic; private final TaskId taskId; private final StateDirectory stateDirectory; /** * @throws IOException if any error happens while creating or locking the state directory */ - public ProcessorStateManager(String applicationId, TaskId taskId, Collection<TopicPartition> sources, Consumer<byte[], byte[]> restoreConsumer, boolean isStandby, StateDirectory stateDirectory) throws IOException { + public ProcessorStateManager(String applicationId, TaskId taskId, Collection<TopicPartition> sources, Consumer<byte[], byte[]> restoreConsumer, boolean isStandby, + StateDirectory stateDirectory, final Map<String, String> sourceStoreToSourceTopic) throws IOException { this.applicationId = applicationId; this.defaultPartition = taskId.partition; this.taskId = taskId; @@ -84,6 +86,7 @@ public class ProcessorStateManager { this.restoreCallbacks = isStandby ? new HashMap<String, StateRestoreCallback>() : null; this.offsetLimits = new HashMap<>(); this.baseDir = stateDirectory.directoryForTask(taskId); + this.sourceStoreToSourceTopic = sourceStoreToSourceTopic; if (!stateDirectory.lock(taskId, 5)) { throw new IOException("Failed to lock the state directory: " + baseDir.getCanonicalPath()); @@ -112,20 +115,28 @@ public class ProcessorStateManager { * @throws StreamsException if the store's change log does not contain the partition */ public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback) { - if (store.name().equals(CHECKPOINT_FILE_NAME)) + + if (store.name().equals(CHECKPOINT_FILE_NAME)) { throw new IllegalArgumentException("Illegal store name: " + CHECKPOINT_FILE_NAME); + } - if (this.stores.containsKey(store.name())) + if (this.stores.containsKey(store.name())) { throw new IllegalArgumentException("Store " + store.name() + " has already been registered."); + } - if (loggingEnabled) + if (loggingEnabled) { this.loggingEnabled.add(store.name()); - + } + // check that the underlying change log topic exist or not String topic; - if (loggingEnabled) + if (loggingEnabled) { topic = storeChangelogTopic(this.applicationId, store.name()); - else topic = store.name(); + } else if (sourceStoreToSourceTopic != null && sourceStoreToSourceTopic.containsKey(store.name())) { + topic = sourceStoreToSourceTopic.get(store.name()); + } else { + throw new IllegalArgumentException("Store is neither built from source topic, nor has a changelog."); + } // block until the partition is ready for this state changelog topic or time has elapsed int partition = getPartition(topic); http://git-wip-us.apache.org/repos/asf/kafka/blob/1ccab26a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java index 0316446..221d152 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java @@ -22,7 +22,6 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.HashMap; import java.util.Set; public class ProcessorTopology { @@ -30,23 +29,17 @@ public class ProcessorTopology { private final Map<String, SourceNode> sourceByTopics; private final Map<String, SinkNode> sinkByTopics; private final List<StateStoreSupplier> stateStoreSuppliers; - private final Map<String, String> sinkNameToTopic; - + private final Map<String, String> sourceStoreToSourceTopic; public ProcessorTopology(List<ProcessorNode> processorNodes, Map<String, SourceNode> sourceByTopics, Map<String, SinkNode> sinkByTopics, - List<StateStoreSupplier> stateStoreSuppliers) { + List<StateStoreSupplier> stateStoreSuppliers, + Map<String, String> sourceStoreToSourceTopic) { this.processorNodes = Collections.unmodifiableList(processorNodes); this.sourceByTopics = Collections.unmodifiableMap(sourceByTopics); this.sinkByTopics = Collections.unmodifiableMap(sinkByTopics); this.stateStoreSuppliers = Collections.unmodifiableList(stateStoreSuppliers); - - // pre-process sink nodes to get reverse mapping - sinkNameToTopic = new HashMap<>(); - for (String topic : sinkByTopics.keySet()) { - SinkNode sink = sinkByTopics.get(topic); - sinkNameToTopic.put(sink.name(), topic); - } + this.sourceStoreToSourceTopic = sourceStoreToSourceTopic; } public Set<String> sourceTopics() { @@ -81,6 +74,10 @@ public class ProcessorTopology { return stateStoreSuppliers; } + public Map<String, String> sourceStoreToSourceTopic() { + return sourceStoreToSourceTopic; + } + private String childrenToString(List<ProcessorNode<?, ?>> children) { if (children == null || children.isEmpty()) { return ""; http://git-wip-us.apache.org/repos/asf/kafka/blob/1ccab26a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java index d8dee30..6837c56 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java @@ -84,7 +84,7 @@ public class KTableFilterTest { table2.toStream().process(proc2); table3.toStream().process(proc3); - driver = new KStreamTestDriver(builder); + driver = new KStreamTestDriver(builder, stateDir); driver.process(topic1, "A", 1); driver.process(topic1, "B", 2); http://git-wip-us.apache.org/repos/asf/kafka/blob/1ccab26a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java index e0cb190..6fbce82 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java @@ -24,23 +24,34 @@ import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.test.KStreamTestDriver; +import org.apache.kafka.test.TestUtils; import org.junit.After; +import org.junit.Before; import org.junit.Test; import java.util.List; import java.util.Locale; import java.util.ArrayList; import java.util.Arrays; +import java.io.File; +import java.io.IOException; + import static org.junit.Assert.assertEquals; public class KTableForeachTest { final private String topicName = "topic"; + private File stateDir = null; final private Serde<Integer> intSerde = Serdes.Integer(); final private Serde<String> stringSerde = Serdes.String(); private KStreamTestDriver driver; + @Before + public void setUp() throws IOException { + stateDir = TestUtils.tempDirectory("kafka-test"); + } + @After public void cleanup() { if (driver != null) { @@ -81,7 +92,7 @@ public class KTableForeachTest { table.foreach(action); // Then - driver = new KStreamTestDriver(builder); + driver = new KStreamTestDriver(builder, stateDir); for (KeyValue<Integer, String> record: inputRecords) { driver.process(topicName, record.key, record.value); } http://git-wip-us.apache.org/repos/asf/kafka/blob/1ccab26a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java index 6794bb4..617a2a1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java @@ -105,7 +105,7 @@ public class KTableImplTest { MockProcessorSupplier<String, String> proc4 = new MockProcessorSupplier<>(); table4.toStream().process(proc4); - driver = new KStreamTestDriver(builder); + driver = new KStreamTestDriver(builder, stateDir); driver.process(topic1, "A", "01"); driver.process(topic1, "B", "02"); @@ -273,8 +273,8 @@ public class KTableImplTest { driver = new KStreamTestDriver(builder, stateDir, null, null); driver.setTime(0L); - // no state stores should be created - assertEquals(0, driver.allStateStores().size()); + // two state stores should be created + assertEquals(2, driver.allStateStores().size()); } @Test http://git-wip-us.apache.org/repos/asf/kafka/blob/1ccab26a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java index 7666438..78cff18 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java @@ -26,10 +26,14 @@ import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.MockProcessorSupplier; +import org.apache.kafka.test.TestUtils; import org.junit.After; +import org.junit.Before; import org.junit.Test; +import java.io.File; +import java.io.IOException; import java.util.HashMap; import java.util.Map; @@ -39,6 +43,7 @@ public class KTableMapKeysTest { final private Serde<String> stringSerde = new Serdes.StringSerde(); final private Serde<Integer> integerSerde = new Serdes.IntegerSerde(); + private File stateDir = null; private KStreamTestDriver driver = null; @@ -50,6 +55,11 @@ public class KTableMapKeysTest { driver = null; } + @Before + public void setUp() throws IOException { + stateDir = TestUtils.tempDirectory("kafka-test"); + } + @Test public void testMapKeysConvertingToStream() { final KStreamBuilder builder = new KStreamBuilder(); @@ -82,7 +92,7 @@ public class KTableMapKeysTest { convertedStream.process(processor); - driver = new KStreamTestDriver(builder); + driver = new KStreamTestDriver(builder, stateDir); for (int i = 0; i < originalKeys.length; i++) { driver.process(topic1, originalKeys[i], values[i]); http://git-wip-us.apache.org/repos/asf/kafka/blob/1ccab26a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java index ad3f02c..cd1262b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java @@ -68,7 +68,7 @@ public class KTableSourceTest { MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>(); table1.toStream().process(proc1); - driver = new KStreamTestDriver(builder); + driver = new KStreamTestDriver(builder, stateDir); driver.process(topic1, "A", 1); driver.process(topic1, "B", 2); http://git-wip-us.apache.org/repos/asf/kafka/blob/1ccab26a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java index fbc92d2..32dce6d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java @@ -209,7 +209,7 @@ public class ProcessorStateManagerTest { public void testNoTopic() throws IOException { MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); - ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 1), noPartitions, new MockRestoreConsumer(), false, stateDirectory); + ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 1), noPartitions, new MockRestoreConsumer(), false, stateDirectory, null); try { stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback); } finally { @@ -237,7 +237,7 @@ public class ProcessorStateManagerTest { MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore("persistentStore", true); // persistent store - ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, noPartitions, restoreConsumer, false, stateDirectory); + ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, noPartitions, restoreConsumer, false, stateDirectory, null); try { restoreConsumer.reset(); @@ -286,7 +286,7 @@ public class ProcessorStateManagerTest { MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); // non persistent store - ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 2), noPartitions, restoreConsumer, false, stateDirectory); + ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 2), noPartitions, restoreConsumer, false, stateDirectory, null); try { restoreConsumer.reset(); @@ -359,7 +359,7 @@ public class ProcessorStateManagerTest { // if there is an source partition, inherit the partition id Set<TopicPartition> sourcePartitions = Utils.mkSet(new TopicPartition(storeTopicName3, 1)); - ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, sourcePartitions, restoreConsumer, true, stateDirectory); // standby + ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, sourcePartitions, restoreConsumer, true, stateDirectory, null); // standby try { restoreConsumer.reset(); @@ -393,7 +393,7 @@ public class ProcessorStateManagerTest { MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); - ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 1), noPartitions, restoreConsumer, false, stateDirectory); + ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 1), noPartitions, restoreConsumer, false, stateDirectory, null); try { stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback); @@ -431,7 +431,7 @@ public class ProcessorStateManagerTest { MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true); MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); - ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, noPartitions, restoreConsumer, false, stateDirectory); + ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, noPartitions, restoreConsumer, false, stateDirectory, null); try { // make sure the checkpoint file is deleted assertFalse(checkpointFile.exists()); http://git-wip-us.apache.org/repos/asf/kafka/blob/1ccab26a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index f3339a8..11058c2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -77,7 +77,8 @@ public class StandbyTaskTest { Utils.<StateStoreSupplier>mkList( new MockStateStoreSupplier(storeName1, false), new MockStateStoreSupplier(storeName2, true) - ) + ), + Collections.<String, String>emptyMap() ); private final TopicPartition ktable = new TopicPartition("ktable1", 0); @@ -88,7 +89,12 @@ public class StandbyTaskTest { Collections.<String, SinkNode>emptyMap(), Utils.<StateStoreSupplier>mkList( new MockStateStoreSupplier(ktable.topic(), true, false) - ) + ), + new HashMap<String, String>() { + { + put("ktable1", ktable.topic()); + } + } ); private File baseDir; private StateDirectory stateDirectory; http://git-wip-us.apache.org/repos/asf/kafka/blob/1ccab26a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index fdcf6b8..32d6aa4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -75,7 +75,8 @@ public class StreamTaskTest { } }, Collections.<String, SinkNode>emptyMap(), - Collections.<StateStoreSupplier>emptyList() + Collections.<StateStoreSupplier>emptyList(), + Collections.<String, String>emptyMap() ); private File baseDir; private StateDirectory stateDirectory;
