Repository: kafka Updated Branches: refs/heads/trunk 999108667 -> 933a7506e
KAFKA-3858: Add functions to print stream topologies Author: Eno Thereska <[email protected]> Reviewers: Roger Hoover, Matthias J. Sax, Guozhang Wang Closes #1619 from enothereska/KAFKA-3858-print-topology Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/933a7506 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/933a7506 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/933a7506 Branch: refs/heads/trunk Commit: 933a7506efb57add31a1e9dbd984c1230128b855 Parents: 9991086 Author: Eno Thereska <[email protected]> Authored: Thu Jul 21 13:11:53 2016 -0700 Committer: Guozhang Wang <[email protected]> Committed: Thu Jul 21 13:11:53 2016 -0700 ---------------------------------------------------------------------- .../org/apache/kafka/streams/KafkaStreams.java | 16 +++++ .../streams/processor/TopologyBuilder.java | 15 ++++- .../processor/internals/AbstractTask.java | 27 +++++++++ .../processor/internals/ProcessorNode.java | 17 ++++++ .../processor/internals/ProcessorTopology.java | 62 +++++++++++++++++++- .../streams/processor/internals/SinkNode.java | 9 +++ .../streams/processor/internals/SourceNode.java | 19 +++++- .../processor/internals/StandbyTask.java | 9 +++ .../streams/processor/internals/StreamTask.java | 8 +++ .../processor/internals/StreamThread.java | 33 +++++++++++ .../processor/internals/PartitionGroupTest.java | 9 +-- .../processor/internals/RecordQueueTest.java | 3 +- .../processor/internals/StandbyTaskTest.java | 2 + .../processor/internals/StreamTaskTest.java | 16 ++--- .../org/apache/kafka/test/MockSourceNode.java | 4 +- 15 files changed, 228 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/933a7506/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 0ed0b6c..8f8cfa7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -219,6 +219,22 @@ public class KafkaStreams { } /** + * Produces a string representation contain useful information about Kafka Streams + * Such as thread IDs, task IDs and a representation of the topology. This is useful + * in debugging scenarios. + * @return A string representation of the Kafka Streams instance. + */ + public String toString() { + StringBuilder sb = new StringBuilder("KafkaStreams processID:" + this.processId + "\n"); + for (int i = 0; i < this.threads.length; i++) { + sb.append("\t" + this.threads[i].toString()); + } + sb.append("\n"); + + return sb.toString(); + } + + /** * Sets the handler invoked when a stream thread abruptly terminates due to an uncaught exception. * * @param eh the object to use as this thread's uncaught exception handler. If null then this thread has no explicit handler. http://git-wip-us.apache.org/repos/asf/kafka/blob/933a7506/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java index 2c02b0c..a28b270 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java @@ -157,7 +157,7 @@ public class TopologyBuilder { @SuppressWarnings("unchecked") @Override public ProcessorNode build(String applicationId) { - return new SourceNode(name, keyDeserializer, valDeserializer); + return new SourceNode(name, nodeToSourceTopics.get(name), keyDeserializer, valDeserializer); } private boolean isMatch(String topic) { @@ -771,6 +771,7 @@ public class TopologyBuilder { return topicNames; } + /** * Build the topology for the specified topic group. This is called automatically when passing this builder into the * {@link org.apache.kafka.streams.KafkaStreams#KafkaStreams(TopologyBuilder, org.apache.kafka.streams.StreamsConfig)} constructor. @@ -793,6 +794,7 @@ public class TopologyBuilder { List<ProcessorNode> processorNodes = new ArrayList<>(nodeFactories.size()); Map<String, ProcessorNode> processorMap = new HashMap<>(); Map<String, SourceNode> topicSourceMap = new HashMap<>(); + Map<String, SinkNode> topicSinkMap = new HashMap<>(); Map<String, StateStoreSupplier> stateStoreMap = new HashMap<>(); // create processor nodes in a topological order ("nodeFactories" is already topologically sorted) @@ -823,8 +825,15 @@ public class TopologyBuilder { } } } else if (factory instanceof SinkNodeFactory) { - for (String parent : ((SinkNodeFactory) factory).parents) { + SinkNodeFactory sinkNodeFactory = (SinkNodeFactory) factory; + for (String parent : sinkNodeFactory.parents) { processorMap.get(parent).addChild(node); + if (internalTopicNames.contains(sinkNodeFactory.topic)) { + // prefix the internal topic name with the application id + topicSinkMap.put(applicationId + "-" + sinkNodeFactory.topic, (SinkNode) node); + } else { + topicSinkMap.put(sinkNodeFactory.topic, (SinkNode) node); + } } } else { throw new TopologyBuilderException("Unknown definition class: " + factory.getClass().getName()); @@ -832,7 +841,7 @@ public class TopologyBuilder { } } - return new ProcessorTopology(processorNodes, topicSourceMap, new ArrayList<>(stateStoreMap.values())); + return new ProcessorTopology(processorNodes, topicSourceMap, topicSinkMap, new ArrayList<>(stateStoreMap.values())); } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/933a7506/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index 488fc0f..70070a9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -124,4 +124,31 @@ public abstract class AbstractTask { public StateStore getStore(final String name) { return stateMgr.getStore(name); } + + /** + * Produces a string representation contain useful information about a StreamTask. + * This is useful in debugging scenarios. + * @return A string representation of the StreamTask instance. + */ + public String toString() { + StringBuilder sb = new StringBuilder("StreamsTask taskId:" + this.id() + "\n"); + + // print topology + if (topology != null) { + sb.append("\t\t\t" + topology.toString()); + } + + // print assigned partitions + if (partitions != null && !partitions.isEmpty()) { + sb.append("\t\t\tPartitions ["); + for (TopicPartition topicPartition : partitions) { + sb.append(topicPartition.toString() + ","); + } + sb.setLength(sb.length() - 1); + sb.append("]"); + } + + sb.append("\n"); + return sb.toString(); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/933a7506/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java index 50e3a0b..64ca032 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java @@ -71,4 +71,21 @@ public class ProcessorNode<K, V> { public void close() { processor.close(); } + + /** + * @return a string representation of this node, useful for debugging. + */ + public String toString() { + StringBuilder sb = new StringBuilder(""); + sb.append(name + ": "); + if (stateStores != null && !stateStores.isEmpty()) { + sb.append("stateStores ["); + for (String store : (Set<String>) stateStores) { + sb.append(store + ","); + } + sb.setLength(sb.length() - 1); + sb.append("] "); + } + return sb.toString(); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/933a7506/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java index a70aa70..0316446 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java @@ -16,27 +16,37 @@ */ package org.apache.kafka.streams.processor.internals; - import org.apache.kafka.streams.processor.StateStoreSupplier; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.HashMap; import java.util.Set; - public class ProcessorTopology { private final List<ProcessorNode> processorNodes; private final Map<String, SourceNode> sourceByTopics; + private final Map<String, SinkNode> sinkByTopics; private final List<StateStoreSupplier> stateStoreSuppliers; + private final Map<String, String> sinkNameToTopic; public ProcessorTopology(List<ProcessorNode> processorNodes, Map<String, SourceNode> sourceByTopics, + Map<String, SinkNode> sinkByTopics, List<StateStoreSupplier> stateStoreSuppliers) { this.processorNodes = Collections.unmodifiableList(processorNodes); this.sourceByTopics = Collections.unmodifiableMap(sourceByTopics); + this.sinkByTopics = Collections.unmodifiableMap(sinkByTopics); this.stateStoreSuppliers = Collections.unmodifiableList(stateStoreSuppliers); + + // pre-process sink nodes to get reverse mapping + sinkNameToTopic = new HashMap<>(); + for (String topic : sinkByTopics.keySet()) { + SinkNode sink = sinkByTopics.get(topic); + sinkNameToTopic.put(sink.name(), topic); + } } public Set<String> sourceTopics() { @@ -51,6 +61,18 @@ public class ProcessorTopology { return new HashSet<>(sourceByTopics.values()); } + public Set<String> sinkTopics() { + return sinkByTopics.keySet(); + } + + public SinkNode sink(String topic) { + return sinkByTopics.get(topic); + } + + public Set<SinkNode> sinks() { + return new HashSet<>(sinkByTopics.values()); + } + public List<ProcessorNode> processors() { return processorNodes; } @@ -59,4 +81,40 @@ public class ProcessorTopology { return stateStoreSuppliers; } + private String childrenToString(List<ProcessorNode<?, ?>> children) { + if (children == null || children.isEmpty()) { + return ""; + } + + StringBuilder sb = new StringBuilder("children ["); + for (ProcessorNode child : children) { + sb.append(child.name() + ","); + } + sb.setLength(sb.length() - 1); + sb.append("]\n"); + + // recursively print children + for (ProcessorNode child : children) { + sb.append("\t\t\t\t" + child.toString()); + sb.append(childrenToString(child.children())); + } + return sb.toString(); + } + + /** + * Produces a string representation contain useful information this topology. + * This is useful in debugging scenarios. + * @return A string representation of this instance. + */ + public String toString() { + StringBuilder sb = new StringBuilder("ProcessorTopology:\n"); + + // start from sources + for (SourceNode source : sourceByTopics.values()) { + sb.append("\t\t\t\t" + source.toString()); + sb.append(childrenToString(source.children())); + sb.append("\n"); + } + return sb.toString(); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/933a7506/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java index 3795916..6907858 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java @@ -81,4 +81,13 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> { public Serializer<V> valueSerializer() { return valSerializer; } + + /** + * @return a string representation of this node, useful for debugging. + */ + public String toString() { + StringBuilder sb = new StringBuilder(super.toString()); + sb.append("topic:" + topic); + return sb.toString(); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/933a7506/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java index a550344..90da1de 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java @@ -26,10 +26,11 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> { private Deserializer<K> keyDeserializer; private Deserializer<V> valDeserializer; private ProcessorContext context; + private String[] topics; - public SourceNode(String name, Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer) { + public SourceNode(String name, String[] topics, Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer) { super(name); - + this.topics = topics; this.keyDeserializer = keyDeserializer; this.valDeserializer = valDeserializer; } @@ -73,4 +74,18 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> { public Deserializer<V> valueDeserializer() { return valDeserializer; } + + /** + * @return a string representation of this node, useful for debugging. + */ + public String toString() { + StringBuilder sb = new StringBuilder(super.toString()); + sb.append("topics: ["); + for (String topic : topics) { + sb.append(topic + ","); + } + sb.setLength(sb.length() - 1); + sb.append("] "); + return sb.toString(); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/933a7506/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index 830fab6..08b4f07 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -90,4 +90,13 @@ public class StandbyTask extends AbstractTask { // reinitialize offset limits initializeOffsetLimits(); } + + /** + * Produces a string representation contain useful information about a StreamTask. + * This is useful in debugging scenarios. + * @return A string representation of the StreamTask instance. + */ + public String toString() { + return super.toString(); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/933a7506/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 402c8fd..3126dd4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -376,4 +376,12 @@ public class StreamTask extends AbstractTask implements Punctuator { } } + /** + * Produces a string representation contain useful information about a StreamTask. + * This is useful in debugging scenarios. + * @return A string representation of the StreamTask instance. + */ + public String toString() { + return super.toString(); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/933a7506/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index f982efa..c84cae0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -621,6 +621,39 @@ public class StreamThread extends Thread { } + /** + * Produces a string representation contain useful information about a StreamThread. + * This is useful in debugging scenarios. + * @return A string representation of the StreamThread instance. + */ + @Override + public String toString() { + StringBuilder sb = new StringBuilder("StreamsThread appId:" + this.applicationId + "\n"); + sb.append("\tStreamsThread clientId:" + clientId + "\n"); + + // iterate and print active tasks + if (activeTasks != null) { + sb.append("\tActive tasks:\n"); + for (TaskId tId : activeTasks.keySet()) { + StreamTask task = activeTasks.get(tId); + sb.append("\t\t" + task.toString()); + } + } + + // iterate and print standby tasks + if (standbyTasks != null) { + sb.append("\tStandby tasks:\n"); + for (TaskId tId : standbyTasks.keySet()) { + StandbyTask task = standbyTasks.get(tId); + sb.append("\t\t" + task.toString()); + } + sb.append("\n"); + } + + return sb.toString(); + } + + private void removeStandbyTasks() { try { for (StandbyTask task : standbyTasks.values()) { http://git-wip-us.apache.org/repos/asf/kafka/blob/933a7506/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java index a1c07af..f8c080c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java @@ -38,10 +38,11 @@ public class PartitionGroupTest { private final Serializer<Integer> intSerializer = new IntegerSerializer(); private final Deserializer<Integer> intDeserializer = new IntegerDeserializer(); private final TimestampExtractor timestampExtractor = new MockTimestampExtractor(); - private final TopicPartition partition1 = new TopicPartition("topic", 1); - private final TopicPartition partition2 = new TopicPartition("topic", 2); - private final RecordQueue queue1 = new RecordQueue(partition1, new MockSourceNode<>(intDeserializer, intDeserializer)); - private final RecordQueue queue2 = new RecordQueue(partition2, new MockSourceNode<>(intDeserializer, intDeserializer)); + private final String[] topics = {"topic"}; + private final TopicPartition partition1 = new TopicPartition(topics[0], 1); + private final TopicPartition partition2 = new TopicPartition(topics[0], 2); + private final RecordQueue queue1 = new RecordQueue(partition1, new MockSourceNode<>(topics, intDeserializer, intDeserializer)); + private final RecordQueue queue2 = new RecordQueue(partition2, new MockSourceNode<>(topics, intDeserializer, intDeserializer)); private final byte[] recordValue = intSerializer.serialize(null, 10); private final byte[] recordKey = intSerializer.serialize(null, 1); http://git-wip-us.apache.org/repos/asf/kafka/blob/933a7506/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java index 8d9c91c..7870611 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java @@ -39,7 +39,8 @@ public class RecordQueueTest { private final Serializer<Integer> intSerializer = new IntegerSerializer(); private final Deserializer<Integer> intDeserializer = new IntegerDeserializer(); private final TimestampExtractor timestampExtractor = new MockTimestampExtractor(); - private final RecordQueue queue = new RecordQueue(new TopicPartition("topic", 1), new MockSourceNode<>(intDeserializer, intDeserializer)); + private final String[] topics = {"topic"}; + private final RecordQueue queue = new RecordQueue(new TopicPartition(topics[0], 1), new MockSourceNode<>(topics, intDeserializer, intDeserializer)); private final byte[] recordValue = intSerializer.serialize(null, 10); private final byte[] recordKey = intSerializer.serialize(null, 1); http://git-wip-us.apache.org/repos/asf/kafka/blob/933a7506/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index 39cb0a0..f3339a8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -73,6 +73,7 @@ public class StandbyTaskTest { private final ProcessorTopology topology = new ProcessorTopology( Collections.<ProcessorNode>emptyList(), Collections.<String, SourceNode>emptyMap(), + Collections.<String, SinkNode>emptyMap(), Utils.<StateStoreSupplier>mkList( new MockStateStoreSupplier(storeName1, false), new MockStateStoreSupplier(storeName2, true) @@ -84,6 +85,7 @@ public class StandbyTaskTest { private final ProcessorTopology ktableTopology = new ProcessorTopology( Collections.<ProcessorNode>emptyList(), Collections.<String, SourceNode>emptyMap(), + Collections.<String, SinkNode>emptyMap(), Utils.<StateStoreSupplier>mkList( new MockStateStoreSupplier(ktable.topic(), true, false) ) http://git-wip-us.apache.org/repos/asf/kafka/blob/933a7506/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index e3f00ff..fdcf6b8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -56,23 +56,25 @@ public class StreamTaskTest { private final Serializer<Integer> intSerializer = new IntegerSerializer(); private final Deserializer<Integer> intDeserializer = new IntegerDeserializer(); private final Serializer<byte[]> bytesSerializer = new ByteArraySerializer(); - - private final TopicPartition partition1 = new TopicPartition("topic1", 1); - private final TopicPartition partition2 = new TopicPartition("topic2", 1); + private final String[] topic1 = {"topic1"}; + private final String[] topic2 = {"topic2"}; + private final TopicPartition partition1 = new TopicPartition(topic1[0], 1); + private final TopicPartition partition2 = new TopicPartition(topic2[0], 1); private final Set<TopicPartition> partitions = Utils.mkSet(partition1, partition2); - private final MockSourceNode<Integer, Integer> source1 = new MockSourceNode<>(intDeserializer, intDeserializer); - private final MockSourceNode<Integer, Integer> source2 = new MockSourceNode<>(intDeserializer, intDeserializer); + private final MockSourceNode<Integer, Integer> source1 = new MockSourceNode<>(topic1, intDeserializer, intDeserializer); + private final MockSourceNode<Integer, Integer> source2 = new MockSourceNode<>(topic2, intDeserializer, intDeserializer); private final MockProcessorNode<Integer, Integer> processor = new MockProcessorNode<>(10L); private final ProcessorTopology topology = new ProcessorTopology( Arrays.asList((ProcessorNode) source1, (ProcessorNode) source2, (ProcessorNode) processor), new HashMap<String, SourceNode>() { { - put("topic1", source1); - put("topic2", source2); + put(topic1[0], source1); + put(topic2[0], source2); } }, + Collections.<String, SinkNode>emptyMap(), Collections.<StateStoreSupplier>emptyList() ); private File baseDir; http://git-wip-us.apache.org/repos/asf/kafka/blob/933a7506/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java b/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java index cf0202e..176501a 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java +++ b/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java @@ -33,8 +33,8 @@ public class MockSourceNode<K, V> extends SourceNode<K, V> { public final ArrayList<K> keys = new ArrayList<>(); public final ArrayList<V> values = new ArrayList<>(); - public MockSourceNode(Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer) { - super(NAME + INDEX.getAndIncrement(), keyDeserializer, valDeserializer); + public MockSourceNode(String[] topics, Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer) { + super(NAME + INDEX.getAndIncrement(), topics, keyDeserializer, valDeserializer); } @Override
