Repository: kafka Updated Branches: refs/heads/trunk 24fd025d4 -> a2bac70a6
MINOR: Refactor TopologyBuilder with ApplicationID Prefix Author: Guozhang Wang <[email protected]> Reviewers: Damian Guy <[email protected]>, Ewen Cheslack-Postava <[email protected]> Closes #1736 from guozhangwang/Kminor-topology-applicationID Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a2bac70a Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a2bac70a Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a2bac70a Branch: refs/heads/trunk Commit: a2bac70a6634e9a78734d23158fb7e45f290ea26 Parents: 24fd025 Author: Guozhang Wang <[email protected]> Authored: Mon Aug 22 21:33:47 2016 -0700 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Mon Aug 22 21:33:47 2016 -0700 ---------------------------------------------------------------------- .../kafka/streams/kstream/KStreamBuilder.java | 1 - .../streams/processor/TopologyBuilder.java | 393 ++++++++++--------- .../processor/internals/StreamThread.java | 4 +- .../kstream/internals/KStreamBranchTest.java | 1 + .../kstream/internals/KStreamImplTest.java | 2 +- .../streams/processor/TopologyBuilderTest.java | 29 +- .../internals/ProcessorTopologyTest.java | 4 +- .../processor/internals/StreamThreadTest.java | 14 +- .../StreamThreadStateStoreProviderTest.java | 3 +- .../apache/kafka/test/KStreamTestDriver.java | 3 +- .../kafka/test/ProcessorTopologyTestDriver.java | 2 +- 11 files changed, 232 insertions(+), 224 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/a2bac70a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java index 08e9842..f9544cc 100644 --- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java @@ -92,7 +92,6 @@ public class KStreamBuilder extends TopologyBuilder { return new KStreamImpl<>(this, name, Collections.singleton(name), false); } - /** * Create a {@link KStream} instance from the specified Pattern. * <p> http://git-wip-us.apache.org/repos/asf/kafka/blob/a2bac70a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java index 7b79236..bcdb54a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java @@ -60,23 +60,45 @@ public class TopologyBuilder { // state factories private final Map<String, StateStoreFactory> stateFactories = new HashMap<>(); + // all topics subscribed from source processors (without application-id prefix for internal topics) private final Set<String> sourceTopicNames = new HashSet<>(); + + // all internal topics auto-created by the topology builder and used in source / sink processors private final Set<String> internalTopicNames = new HashSet<>(); - private final QuickUnion<String> nodeGrouper = new QuickUnion<>(); + + // groups of source processors that need to be copartitioned private final List<Set<String>> copartitionSourceGroups = new ArrayList<>(); + + // map from source processor names to subscribed topics (without application-id prefix for internal topics) private final HashMap<String, String[]> nodeToSourceTopics = new HashMap<>(); + + // map from source processor names to regex subscription patterns private final HashMap<String, Pattern> nodeToSourcePatterns = new LinkedHashMap<>(); - 
private final HashMap<String, Pattern> topicToPatterns = new HashMap<>(); + + // map from sink processor names to the topic they write to (without application-id prefix for internal topics) private final HashMap<String, String> nodeToSinkTopic = new HashMap<>(); + + // map from topics to their matched regex patterns, this is used to detect a topic being matched by more than + // one regex pattern, which is currently rejected with a TopologyBuilderException + private final HashMap<String, Pattern> topicToPatterns = new HashMap<>(); + + // map from state store names to all the topics subscribed from source processors that + // are connected to these state stores private final Map<String, Set<String>> stateStoreNameToSourceTopics = new HashMap<>(); + + // map from state store names that are directly associated with source processors to their subscribed topics, + // this is used in the extended KStreamBuilder. private final HashMap<String, String> sourceStoreToSourceTopic = new HashMap<>(); + + private final QuickUnion<String> nodeGrouper = new QuickUnion<>(); + private SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates(); - private String applicationId; - private Map<Integer, Set<String>> nodeGroups = null; - private Pattern topicPattern; + private String applicationId = null; + private Pattern topicPattern = null; + private Map<Integer, Set<String>> nodeGroups = null; private static class StateStoreFactory { public final Set<String> users; @@ -98,7 +120,7 @@ public class TopologyBuilder { this.name = name; } - public abstract ProcessorNode build(String applicationId); + public abstract ProcessorNode build(); } private static class ProcessorNodeFactory extends NodeFactory { @@ -118,7 +140,7 @@ public class TopologyBuilder { @SuppressWarnings("unchecked") @Override - public ProcessorNode build(String applicationId) { + public ProcessorNode build() { return new ProcessorNode(name, supplier.get(), stateStoreNames); } } @@ -146,9 +168,12 @@ public class TopologyBuilder { for (String
update : subscribedTopics) { if (this.pattern == topicToPatterns.get(update)) { matchedTopics.add(update); - //not same pattern instance,but still matches not allowed } else if (topicToPatterns.containsKey(update) && isMatch(update)) { - throw new TopologyBuilderException("Topic " + update + " already matched check for overlapping regex patterns"); + // the same topic cannot be matched to more than one pattern + // TODO: we should lift this requirement in the future + throw new TopologyBuilderException("Topic " + update + + " is already matched for another regex pattern " + topicToPatterns.get(update) + + " and hence cannot be matched to this regex pattern " + pattern + " any more."); } else if (isMatch(update)) { topicToPatterns.put(update, this.pattern); matchedTopics.add(update); @@ -159,7 +184,7 @@ public class TopologyBuilder { @SuppressWarnings("unchecked") @Override - public ProcessorNode build(String applicationId) { + public ProcessorNode build() { return new SourceNode(name, nodeToSourceTopics.get(name), keyDeserializer, valDeserializer); } @@ -186,10 +211,10 @@ public class TopologyBuilder { @SuppressWarnings("unchecked") @Override - public ProcessorNode build(String applicationId) { + public ProcessorNode build() { if (internalTopicNames.contains(topic)) { // prefix the internal topic name with the application id - return new SinkNode(name, applicationId + "-" + topic, keySerializer, valSerializer, partitioner); + return new SinkNode(name, decorateTopic(topic), keySerializer, valSerializer, partitioner); } else { return new SinkNode(name, topic, keySerializer, valSerializer, partitioner); } @@ -232,6 +257,22 @@ public class TopologyBuilder { public TopologyBuilder() {} /** + * Set the applicationId to be used for auto-generated internal topics. + * + * This is required before calling {@link #sourceTopics}, {@link #topicGroups}, + * {@link #copartitionGroups}, {@link #stateStoreNameToSourceTopics} and {@link #build(Integer)}.
+ * + * @param applicationId the streams applicationId. Should be the same as set by + * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG} + */ + public synchronized final TopologyBuilder setApplicationId(String applicationId) { + Objects.requireNonNull(applicationId, "applicationId can't be null"); + this.applicationId = applicationId; + + return this; + } + + /** * Add a new source that consumes the named topics and forward the records to child processor and/or sink nodes. * The source will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key deserializer} and * {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the @@ -342,8 +383,8 @@ public class TopologyBuilder { } } - nodeToSourcePatterns.put(name, topicPattern); nodeFactories.put(name, new SourceNodeFactory(name, null, topicPattern, keyDeserializer, valDeserializer)); + nodeToSourcePatterns.put(name, topicPattern); nodeGrouper.add(name); return this; @@ -549,14 +590,10 @@ public class TopologyBuilder { } protected synchronized final TopologyBuilder connectSourceStoreAndTopic(String sourceStoreName, String topic) { - if (sourceStoreToSourceTopic != null) { - if (sourceStoreToSourceTopic.containsKey(sourceStoreName)) { - throw new TopologyBuilderException("Source store " + sourceStoreName + " is already added."); - } - sourceStoreToSourceTopic.put(sourceStoreName, topic); - } else { - throw new TopologyBuilderException("sourceStoreToSourceTopic is null"); + if (sourceStoreToSourceTopic.containsKey(sourceStoreName)) { + throw new TopologyBuilderException("Source store " + sourceStoreName + " is already added."); } + sourceStoreToSourceTopic.put(sourceStoreName, topic); return this; } @@ -601,6 +638,17 @@ public class TopologyBuilder { return this; } + /** + * Asserts that the streams of the specified source nodes must be copartitioned. 
+ * + * @param sourceNodes a set of source node names + * @return this builder instance so methods can be chained together; never null + */ + public synchronized final TopologyBuilder copartitionSources(Collection<String> sourceNodes) { + copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes))); + return this; + } + private void connectProcessorAndStateStore(String processorName, String stateStoreName) { if (!stateFactories.containsKey(stateStoreName)) throw new TopologyBuilderException("StateStore " + stateStoreName + " is not added yet."); @@ -625,7 +673,6 @@ public class TopologyBuilder { } } - private Set<String> findSourceTopicsForProcessorParents(String [] parents) { final Set<String> sourceTopics = new HashSet<>(); for (String parent : parents) { @@ -652,85 +699,6 @@ public class TopologyBuilder { } /** - * Returns the map of topic groups keyed by the group id. - * A topic group is a group of topics in the same task. - * - * @return groups of topic names - */ - public synchronized Map<Integer, TopicsInfo> topicGroups() { - Map<Integer, TopicsInfo> topicGroups = new LinkedHashMap<>(); - - - if (subscriptionUpdates.hasUpdates()) { - for (Map.Entry<String, Pattern> stringPatternEntry : nodeToSourcePatterns.entrySet()) { - SourceNodeFactory sourceNode = (SourceNodeFactory) nodeFactories.get(stringPatternEntry.getKey()); - //need to update nodeToSourceTopics with topics matched from given regex - nodeToSourceTopics.put(stringPatternEntry.getKey(), sourceNode.getTopics(subscriptionUpdates.getUpdates())); - } - } - - if (nodeGroups == null) - nodeGroups = makeNodeGroups(); - - - for (Map.Entry<Integer, Set<String>> entry : nodeGroups.entrySet()) { - Set<String> sinkTopics = new HashSet<>(); - Set<String> sourceTopics = new HashSet<>(); - Set<String> internalSourceTopics = new HashSet<>(); - Set<String> stateChangelogTopics = new HashSet<>(); - for (String node : entry.getValue()) { - // if the node is a source node, add to the source 
topics - String[] topics = nodeToSourceTopics.get(node); - if (topics != null) { - // if some of the topics are internal, add them to the internal topics - for (String topic : topics) { - if (this.internalTopicNames.contains(topic)) { - if (applicationId == null) { - throw new TopologyBuilderException("There are internal topics and" - + " applicationId hasn't been " - + "set. Call setApplicationId " - + "first"); - } - // prefix the internal topic name with the application id - String internalTopic = applicationId + "-" + topic; - internalSourceTopics.add(internalTopic); - sourceTopics.add(internalTopic); - } else { - sourceTopics.add(topic); - } - } - } - - // if the node is a sink node, add to the sink topics - String topic = nodeToSinkTopic.get(node); - if (topic != null) { - if (internalTopicNames.contains(topic)) { - // prefix the change log topic name with the application id - sinkTopics.add(applicationId + "-" + topic); - } else { - sinkTopics.add(topic); - } - } - - // if the node is connected to a state, add to the state topics - for (StateStoreFactory stateFactory : stateFactories.values()) { - if (stateFactory.isInternal && stateFactory.users.contains(node)) { - // prefix the change log topic name with the application id - stateChangelogTopics.add(applicationId + "-" + stateFactory.supplier.name() + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX); - } - } - } - topicGroups.put(entry.getKey(), new TopicsInfo( - Collections.unmodifiableSet(sinkTopics), - Collections.unmodifiableSet(sourceTopics), - Collections.unmodifiableSet(internalSourceTopics), - Collections.unmodifiableSet(stateChangelogTopics))); - } - - return Collections.unmodifiableMap(topicGroups); - } - - /** * Returns the map of node groups keyed by the topic group id. * * @return groups of node names @@ -778,62 +746,12 @@ public class TopologyBuilder { } /** - * Asserts that the streams of the specified source nodes must be copartitioned. 
- * - * @param sourceNodes a set of source node names - * @return this builder instance so methods can be chained together; never null - */ - public synchronized final TopologyBuilder copartitionSources(Collection<String> sourceNodes) { - copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes))); - return this; - } - - /** - * Returns the copartition groups. - * A copartition group is a group of source topics that are required to be copartitioned. - * - * @return groups of topic names - */ - public synchronized Collection<Set<String>> copartitionGroups() { - List<Set<String>> list = new ArrayList<>(copartitionSourceGroups.size()); - for (Set<String> nodeNames : copartitionSourceGroups) { - Set<String> copartitionGroup = new HashSet<>(); - for (String node : nodeNames) { - String[] topics = nodeToSourceTopics.get(node); - if (topics != null) - copartitionGroup.addAll(convertInternalTopicNames(topics)); - } - list.add(Collections.unmodifiableSet(copartitionGroup)); - } - return Collections.unmodifiableList(list); - } - - private List<String> convertInternalTopicNames(String...topics) { - final List<String> topicNames = new ArrayList<>(); - for (String topic : topics) { - if (internalTopicNames.contains(topic)) { - if (applicationId == null) { - throw new TopologyBuilderException("there are internal topics " - + "and applicationId hasn't been set. Call " - + "setApplicationId first"); - } - topicNames.add(applicationId + "-" + topic); - } else { - topicNames.add(topic); - } - } - return topicNames; - } - - - /** * Build the topology for the specified topic group. This is called automatically when passing this builder into the * {@link org.apache.kafka.streams.KafkaStreams#KafkaStreams(TopologyBuilder, org.apache.kafka.streams.StreamsConfig)} constructor. 
* * @see org.apache.kafka.streams.KafkaStreams#KafkaStreams(TopologyBuilder, org.apache.kafka.streams.StreamsConfig) */ - public synchronized ProcessorTopology build(String applicationId, Integer topicGroupId) { - Objects.requireNonNull(applicationId, "applicationId can't be null"); + public synchronized ProcessorTopology build(Integer topicGroupId) { Set<String> nodeGroup; if (topicGroupId != null) { nodeGroup = nodeGroups().get(topicGroupId); @@ -841,11 +759,11 @@ public class TopologyBuilder { // when nodeGroup is null, we build the full topology. this is used in some tests. nodeGroup = null; } - return build(applicationId, nodeGroup); + return build(nodeGroup); } @SuppressWarnings("unchecked") - private ProcessorTopology build(String applicationId, Set<String> nodeGroup) { + private ProcessorTopology build(Set<String> nodeGroup) { List<ProcessorNode> processorNodes = new ArrayList<>(nodeFactories.size()); Map<String, ProcessorNode> processorMap = new HashMap<>(); Map<String, SourceNode> topicSourceMap = new HashMap<>(); @@ -855,7 +773,7 @@ public class TopologyBuilder { // create processor nodes in a topological order ("nodeFactories" is already topologically sorted) for (NodeFactory factory : nodeFactories.values()) { if (nodeGroup == null || nodeGroup.contains(factory.name)) { - ProcessorNode node = factory.build(applicationId); + ProcessorNode node = factory.build(); processorNodes.add(node); processorMap.put(node.name(), node); @@ -870,11 +788,13 @@ public class TopologyBuilder { } } else if (factory instanceof SourceNodeFactory) { SourceNodeFactory sourceNodeFactory = (SourceNodeFactory) factory; - String[] topics = (sourceNodeFactory.pattern != null) ? sourceNodeFactory.getTopics(subscriptionUpdates.getUpdates()) : sourceNodeFactory.getTopics(); + String[] topics = (sourceNodeFactory.pattern != null) ? 
+ sourceNodeFactory.getTopics(subscriptionUpdates.getUpdates()) : + sourceNodeFactory.getTopics(); for (String topic : topics) { if (internalTopicNames.contains(topic)) { // prefix the internal topic name with the application id - topicSourceMap.put(applicationId + "-" + topic, (SourceNode) node); + topicSourceMap.put(decorateTopic(topic), (SourceNode) node); } else { topicSourceMap.put(topic, (SourceNode) node); } @@ -885,7 +805,7 @@ public class TopologyBuilder { processorMap.get(parent).addChild(node); if (internalTopicNames.contains(sinkNodeFactory.topic)) { // prefix the internal topic name with the application id - topicSinkMap.put(applicationId + "-" + sinkNodeFactory.topic, (SinkNode) node); + topicSinkMap.put(decorateTopic(sinkNodeFactory.topic), (SinkNode) node); } else { topicSinkMap.put(sinkNodeFactory.topic, (SinkNode) node); } @@ -900,6 +820,78 @@ public class TopologyBuilder { } /** + * Returns the map of topic groups keyed by the group id. + * A topic group is a group of topics in the same task. 
+ * + * @return groups of topic names + */ + public synchronized Map<Integer, TopicsInfo> topicGroups() { + Map<Integer, TopicsInfo> topicGroups = new LinkedHashMap<>(); + + if (subscriptionUpdates.hasUpdates()) { + for (Map.Entry<String, Pattern> stringPatternEntry : nodeToSourcePatterns.entrySet()) { + SourceNodeFactory sourceNode = (SourceNodeFactory) nodeFactories.get(stringPatternEntry.getKey()); + //need to update nodeToSourceTopics with topics matched from given regex + nodeToSourceTopics.put(stringPatternEntry.getKey(), sourceNode.getTopics(subscriptionUpdates.getUpdates())); + } + } + + if (nodeGroups == null) + nodeGroups = makeNodeGroups(); + + for (Map.Entry<Integer, Set<String>> entry : nodeGroups.entrySet()) { + Set<String> sinkTopics = new HashSet<>(); + Set<String> sourceTopics = new HashSet<>(); + Set<String> internalSourceTopics = new HashSet<>(); + Set<String> stateChangelogTopics = new HashSet<>(); + for (String node : entry.getValue()) { + // if the node is a source node, add to the source topics + String[] topics = nodeToSourceTopics.get(node); + if (topics != null) { + // if some of the topics are internal, add them to the internal topics + for (String topic : topics) { + if (this.internalTopicNames.contains(topic)) { + // prefix the internal topic name with the application id + String internalTopic = decorateTopic(topic); + internalSourceTopics.add(internalTopic); + sourceTopics.add(internalTopic); + } else { + sourceTopics.add(topic); + } + } + } + + // if the node is a sink node, add to the sink topics + String topic = nodeToSinkTopic.get(node); + if (topic != null) { + if (internalTopicNames.contains(topic)) { + // prefix the internal topic name with the application id + sinkTopics.add(decorateTopic(topic)); + } else { + sinkTopics.add(topic); + } + } + + // if the node is connected to a state, add to the state topics + for (StateStoreFactory stateFactory : stateFactories.values()) { + if (stateFactory.isInternal &&
stateFactory.users.contains(node)) { + // prefix the change log topic name with the application id + stateChangelogTopics.add(ProcessorStateManager.storeChangelogTopic(applicationId, stateFactory.supplier.name())); + } + } + } + topicGroups.put(entry.getKey(), new TopicsInfo( + Collections.unmodifiableSet(sinkTopics), + Collections.unmodifiableSet(sourceTopics), + Collections.unmodifiableSet(internalSourceTopics), + Collections.unmodifiableSet(stateChangelogTopics))); + } + + return Collections.unmodifiableMap(topicGroups); + } + + + /** * Get the names of topics that are to be consumed by the source nodes created by this builder. * @return the unmodifiable set of topic names used by source nodes, which changes as new sources are added; never null */ @@ -908,21 +900,62 @@ public class TopologyBuilder { return Collections.unmodifiableSet(topics); } - private Set<String> maybeDecorateInternalSourceTopics(final Set<String> sourceTopicNames) { - Set<String> topics = new HashSet<>(); - for (String topic : sourceTopicNames) { + /** + * @return a mapping from state store name to a Set of source Topics. + */ + public Map<String, Set<String>> stateStoreNameToSourceTopics() { + final Map<String, Set<String>> results = new HashMap<>(); + for (Map.Entry<String, Set<String>> entry : stateStoreNameToSourceTopics.entrySet()) { + results.put(entry.getKey(), maybeDecorateInternalSourceTopics(entry.getValue())); + + } + return results; + } + + /** + * Returns the copartition groups. + * A copartition group is a group of source topics that are required to be copartitioned. 
+ * + * @return groups of topic names + */ + public synchronized Collection<Set<String>> copartitionGroups() { + List<Set<String>> list = new ArrayList<>(copartitionSourceGroups.size()); + for (Set<String> nodeNames : copartitionSourceGroups) { + Set<String> copartitionGroup = new HashSet<>(); + for (String node : nodeNames) { + String[] topics = nodeToSourceTopics.get(node); + if (topics != null) + copartitionGroup.addAll(maybeDecorateInternalSourceTopics(topics)); + } + list.add(Collections.unmodifiableSet(copartitionGroup)); + } + return Collections.unmodifiableList(list); + } + + private Set<String> maybeDecorateInternalSourceTopics(final Set<String> sourceTopics) { + return maybeDecorateInternalSourceTopics(sourceTopics.toArray(new String[sourceTopics.size()])); + } + + private Set<String> maybeDecorateInternalSourceTopics(String ... sourceTopics) { + final Set<String> decoratedTopics = new HashSet<>(); + for (String topic : sourceTopics) { if (internalTopicNames.contains(topic)) { - if (applicationId == null) { - throw new TopologyBuilderException("there are internal topics and " - + "applicationId is null. Call " - + "setApplicationId first"); - } - topics.add(applicationId + "-" + topic); + decoratedTopics.add(decorateTopic(topic)); } else { - topics.add(topic); + decoratedTopics.add(topic); } } - return topics; + return decoratedTopics; + } + + private String decorateTopic(String topic) { + if (applicationId == null) { + throw new TopologyBuilderException("there are internal topics and " + + "applicationId hasn't been set. Call " + + "setApplicationId first"); + } + + return applicationId + "-" + topic; } public synchronized Pattern sourceTopicPattern() { @@ -948,28 +981,4 @@ public class TopologyBuilder { public synchronized void updateSubscriptions(SubscriptionUpdates subscriptionUpdates) { this.subscriptionUpdates = subscriptionUpdates; } - - /** - * Set the applicationId. 
This is required before calling - * {@link #sourceTopics}, {@link #topicGroups}, {@link #copartitionSources}, and - * {@link #stateStoreNameToSourceTopics} - * @param applicationId the streams applicationId. Should be the same as set by - * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG} - */ - public synchronized void setApplicationId(String applicationId) { - Objects.requireNonNull(applicationId, "applicationId can't be null"); - this.applicationId = applicationId; - } - - /** - * @return a mapping from state store name to a Set of source Topics. - */ - public Map<String, Set<String>> stateStoreNameToSourceTopics() { - final Map<String, Set<String>> results = new HashMap<>(); - for (Map.Entry<String, Set<String>> entry : stateStoreNameToSourceTopics.entrySet()) { - results.put(entry.getKey(), maybeDecorateInternalSourceTopics(entry.getValue())); - - } - return results; - } } http://git-wip-us.apache.org/repos/asf/kafka/blob/a2bac70a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 50d77c3..c0e54b9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -542,7 +542,7 @@ public class StreamThread extends Thread { protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) { sensors.taskCreationSensor.record(); - ProcessorTopology topology = builder.build(applicationId, id.topicGroupId); + ProcessorTopology topology = builder.build(id.topicGroupId); return new StreamTask(id, applicationId, partitions, topology, consumer, producer, restoreConsumer, config, sensors, stateDirectory); } @@ -612,7 
+612,7 @@ public class StreamThread extends Thread { protected StandbyTask createStandbyTask(TaskId id, Collection<TopicPartition> partitions) { sensors.taskCreationSensor.record(); - ProcessorTopology topology = builder.build(applicationId, id.topicGroupId); + ProcessorTopology topology = builder.build(id.topicGroupId); if (!topology.stateStoreSuppliers().isEmpty()) { return new StandbyTask(id, applicationId, partitions, topology, consumer, restoreConsumer, config, sensors, stateDirectory); http://git-wip-us.apache.org/repos/asf/kafka/blob/a2bac70a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java index 0650b95..fb19c0f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java @@ -48,6 +48,7 @@ public class KStreamBranchTest { @Test public void testKStreamBranch() { KStreamBuilder builder = new KStreamBuilder(); + builder.setApplicationId("X"); Predicate<Integer, String> isEven = new Predicate<Integer, String>() { @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/a2bac70a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java index 9cbc156..fb2afec 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java @@ 
-143,7 +143,7 @@ public class KStreamImplTest { 1 + // to 2 + // through 1, // process - builder.build("X", null).processors().size()); + builder.setApplicationId("X").build(null).processors().size()); } @Test http://git-wip-us.apache.org/repos/asf/kafka/blob/a2bac70a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java index 6f047b0..fe66acb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java @@ -227,17 +227,18 @@ public class TopologyBuilderTest { @Test public void testAddStateStore() { final TopologyBuilder builder = new TopologyBuilder(); - List<StateStoreSupplier> suppliers; StateStoreSupplier supplier = new MockStateStoreSupplier("store-1", false); builder.addStateStore(supplier); - suppliers = builder.build("X", null).stateStoreSuppliers(); - assertEquals(0, suppliers.size()); + builder.setApplicationId("X"); + + assertEquals(0, builder.build(null).stateStoreSuppliers().size()); builder.addSource("source-1", "topic-1"); builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1"); builder.connectProcessorAndStateStores("processor-1", "store-1"); - suppliers = builder.build("X", null).stateStoreSuppliers(); + + List<StateStoreSupplier> suppliers = builder.build(null).stateStoreSuppliers(); assertEquals(1, suppliers.size()); assertEquals(supplier.name(), suppliers.get(0).name()); } @@ -245,7 +246,8 @@ public class TopologyBuilderTest { @Test public void testTopicGroups() { final TopologyBuilder builder = new TopologyBuilder(); - + builder.setApplicationId("X"); + builder.addInternalTopic("topic-1x"); builder.addSource("source-1", "topic-1", 
"topic-1x"); builder.addSource("source-2", "topic-2"); builder.addSource("source-3", "topic-3"); @@ -262,7 +264,7 @@ public class TopologyBuilderTest { Map<Integer, TopicsInfo> topicGroups = builder.topicGroups(); Map<Integer, TopicsInfo> expectedTopicGroups = new HashMap<>(); - expectedTopicGroups.put(0, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1", "topic-1x", "topic-2"), Collections.<String>emptySet(), Collections.<String>emptySet())); + expectedTopicGroups.put(0, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1", "X-topic-1x", "topic-2"), Collections.<String>emptySet(), Collections.<String>emptySet())); expectedTopicGroups.put(1, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-3", "topic-4"), Collections.<String>emptySet(), Collections.<String>emptySet())); expectedTopicGroups.put(2, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-5"), Collections.<String>emptySet(), Collections.<String>emptySet())); @@ -271,7 +273,7 @@ public class TopologyBuilderTest { Collection<Set<String>> copartitionGroups = builder.copartitionGroups(); - assertEquals(mkSet(mkSet("topic-1", "topic-1x", "topic-2")), new HashSet<>(copartitionGroups)); + assertEquals(mkSet(mkSet("topic-1", "X-topic-1x", "topic-2")), new HashSet<>(copartitionGroups)); } @Test @@ -322,9 +324,10 @@ public class TopologyBuilderTest { builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-2", "processor-1"); builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3", "source-4"); - ProcessorTopology topology0 = builder.build("X", 0); - ProcessorTopology topology1 = builder.build("X", 1); - ProcessorTopology topology2 = builder.build("X", 2); + builder.setApplicationId("X"); + ProcessorTopology topology0 = builder.build(0); + ProcessorTopology topology1 = builder.build(1); + ProcessorTopology topology2 = builder.build(2); assertEquals(mkSet("source-1", "source-2", "processor-1", "processor-2"), 
nodeNames(topology0.processors())); assertEquals(mkSet("source-3", "source-4", "processor-3"), nodeNames(topology1.processors())); @@ -379,12 +382,6 @@ public class TopologyBuilderTest { } @Test(expected = NullPointerException.class) - public void shouldNotAllowNullApplicationIdOnBuild() throws Exception { - final TopologyBuilder builder = new TopologyBuilder(); - builder.build(null, 1); - } - - @Test(expected = NullPointerException.class) public void shouldNotSetApplicationIdToNull() throws Exception { final TopologyBuilder builder = new TopologyBuilder(); builder.setApplicationId(null); http://git-wip-us.apache.org/repos/asf/kafka/blob/a2bac70a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java index d08780b..f7ef7f7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java @@ -87,7 +87,7 @@ public class ProcessorTopologyTest { @Test public void testTopologyMetadata() { - final TopologyBuilder builder = new TopologyBuilder(); + final TopologyBuilder builder = new TopologyBuilder().setApplicationId("X"); builder.addSource("source-1", "topic-1"); builder.addSource("source-2", "topic-2", "topic-3"); @@ -96,7 +96,7 @@ public class ProcessorTopologyTest { builder.addSink("sink-1", "topic-3", "processor-1"); builder.addSink("sink-2", "topic-4", "processor-1", "processor-2"); - final ProcessorTopology topology = builder.build("X", null); + final ProcessorTopology topology = builder.build(null); assertEquals(6, topology.processors().size()); 
http://git-wip-us.apache.org/repos/asf/kafka/blob/a2bac70a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 1a66d32..1da7592 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -153,7 +153,7 @@ public class StreamThreadTest { public void testPartitionAssignmentChange() throws Exception { StreamsConfig config = new StreamsConfig(configProps()); - TopologyBuilder builder = new TopologyBuilder(); + TopologyBuilder builder = new TopologyBuilder().setApplicationId("X"); builder.addSource("source1", "topic1"); builder.addSource("source2", "topic2"); builder.addSource("source3", "topic3"); @@ -162,7 +162,7 @@ public class StreamThreadTest { StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), applicationId, clientId, processId, new Metrics(), new SystemTime(), new StreamsMetadataState(builder)) { @Override protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) { - ProcessorTopology topology = builder.build("X", id.topicGroupId); + ProcessorTopology topology = builder.build(id.topicGroupId); return new TestStreamTask(id, applicationId, partitionsForTask, topology, consumer, producer, restoreConsumer, config, stateDirectory); } }; @@ -275,7 +275,7 @@ public class StreamThreadTest { MockTime mockTime = new MockTime(); - TopologyBuilder builder = new TopologyBuilder(); + TopologyBuilder builder = new TopologyBuilder().setApplicationId("X"); builder.addSource("source1", "topic1"); StreamThread thread = new StreamThread(builder, config, new 
MockClientSupplier(), applicationId, clientId, processId, new Metrics(), mockTime, new StreamsMetadataState(builder)) { @@ -286,7 +286,7 @@ public class StreamThreadTest { @Override protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) { - ProcessorTopology topology = builder.build("X", id.topicGroupId); + ProcessorTopology topology = builder.build(id.topicGroupId); return new TestStreamTask(id, applicationId, partitionsForTask, topology, consumer, producer, restoreConsumer, config, stateDirectory); } }; @@ -394,7 +394,7 @@ public class StreamThreadTest { MockTime mockTime = new MockTime(); - TopologyBuilder builder = new TopologyBuilder(); + TopologyBuilder builder = new TopologyBuilder().setApplicationId("X"); builder.addSource("source1", "topic1"); StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), applicationId, clientId, processId, new Metrics(), mockTime, new StreamsMetadataState(builder)) { @@ -405,7 +405,7 @@ public class StreamThreadTest { @Override protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) { - ProcessorTopology topology = builder.build("X", id.topicGroupId); + ProcessorTopology topology = builder.build(id.topicGroupId); return new TestStreamTask(id, applicationId, partitionsForTask, topology, consumer, producer, restoreConsumer, config, stateDirectory); } }; @@ -465,7 +465,7 @@ public class StreamThreadTest { @Test public void testInjectClients() { - TopologyBuilder builder = new TopologyBuilder(); + TopologyBuilder builder = new TopologyBuilder().setApplicationId("X"); StreamsConfig config = new StreamsConfig(configProps()); MockClientSupplier clientSupplier = new MockClientSupplier(); StreamThread thread = new StreamThread(builder, config, clientSupplier, applicationId, http://git-wip-us.apache.org/repos/asf/kafka/blob/a2bac70a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java 
---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java index 1baedbb..795d7da 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java @@ -92,7 +92,8 @@ public class StreamThreadStateStoreProviderTest { configureRestoreConsumer(clientSupplier, "applicationId-kv-store-changelog"); configureRestoreConsumer(clientSupplier, "applicationId-window-store-changelog"); - final ProcessorTopology topology = builder.build("X", null); + builder.setApplicationId(applicationId); + final ProcessorTopology topology = builder.build(null); final Map<TaskId, StreamTask> tasks = new HashMap<>(); stateDirectory = new StateDirectory(applicationId, stateConfigDir); taskOne = createStreamsTask(applicationId, streamsConfig, clientSupplier, topology, http://git-wip-us.apache.org/repos/asf/kafka/blob/a2bac70a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java index 7316804..3901d3a 100644 --- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java @@ -57,7 +57,8 @@ public class KStreamTestDriver { File stateDir, Serde<?> keySerde, Serde<?> valSerde) { - this.topology = builder.build("X", null); + builder.setApplicationId("TestDriver"); + this.topology = builder.build(null); this.stateDir = stateDir; this.context = new MockProcessorContext(this, stateDir, keySerde, valSerde, new 
MockRecordCollector()); this.context.setTime(0L); http://git-wip-us.apache.org/repos/asf/kafka/blob/a2bac70a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java index d2d9668..6b8d969 100644 --- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java @@ -148,7 +148,7 @@ public class ProcessorTopologyTestDriver { */ public ProcessorTopologyTestDriver(StreamsConfig config, TopologyBuilder builder, String... storeNames) { id = new TaskId(0, 0); - topology = builder.build("X", null); + topology = builder.setApplicationId("ProcessorTopologyTestDriver").build(null); // Set up the consumer and producer ... consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
