Repository: kafka Updated Branches: refs/heads/trunk 0744449ea -> 114945cb6
KAFKA-3855: Guard race conditions in TopologyBuilder Mark all public `TopologyBuilder` methods as synchronized as they can modify data-structures and these methods could be called from multiple threads Author: Damian Guy <[email protected]> Reviewers: Guozhang Wang <[email protected]> Closes #1633 from dguy/kafka-3855 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/114945cb Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/114945cb Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/114945cb Branch: refs/heads/trunk Commit: 114945cb6297f909a7b1272bd61e17b662d42d24 Parents: 0744449 Author: Damian Guy <[email protected]> Authored: Tue Jul 19 08:44:48 2016 -0700 Committer: Guozhang Wang <[email protected]> Committed: Tue Jul 19 08:44:48 2016 -0700 ---------------------------------------------------------------------- .../streams/processor/TopologyBuilder.java | 46 ++++++++++---------- 1 file changed, 23 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/114945cb/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java index 19440e4..2c02b0c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java @@ -239,7 +239,7 @@ public class TopologyBuilder { * @param topics the name of one or more Kafka topics that this source is to consume * @return this builder instance so methods can be chained together; never null */ - public final TopologyBuilder addSource(String name, String... topics) { + public synchronized final TopologyBuilder addSource(String name, String... topics) { return addSource(name, (Deserializer) null, (Deserializer) null, topics); } @@ -256,7 +256,7 @@ public class TopologyBuilder { * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume * @return this builder instance so methods can be chained together; never null */ - public final TopologyBuilder addSource(String name, Pattern topicPattern) { + public synchronized final TopologyBuilder addSource(String name, Pattern topicPattern) { return addSource(name, (Deserializer) null, (Deserializer) null, topicPattern); } @@ -276,7 +276,7 @@ public class TopologyBuilder { * @return this builder instance so methods can be chained together; never null * @throws TopologyBuilderException if processor is already added or if topics have already been registered by another source */ - public final TopologyBuilder addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, String... topics) { + public synchronized final TopologyBuilder addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, String... topics) { if (nodeFactories.containsKey(name)) throw new TopologyBuilderException("Processor " + name + " is already added."); @@ -320,7 +320,7 @@ public class TopologyBuilder { * @throws TopologyBuilderException if processor is already added or if topics have already been registered by name */ - public final TopologyBuilder addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, Pattern topicPattern) { + public synchronized final TopologyBuilder addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, Pattern topicPattern) { if (topicPattern == null) { throw new TopologyBuilderException("Pattern can't be null"); @@ -358,7 +358,7 @@ public class TopologyBuilder { * @see #addSink(String, String, Serializer, Serializer, String...) * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...) */ - public final TopologyBuilder addSink(String name, String topic, String... parentNames) { + public synchronized final TopologyBuilder addSink(String name, String topic, String... parentNames) { return addSink(name, topic, (Serializer) null, (Serializer) null, parentNames); } @@ -385,7 +385,7 @@ public class TopologyBuilder { * @see #addSink(String, String, Serializer, Serializer, String...) * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...) */ - public final TopologyBuilder addSink(String name, String topic, StreamPartitioner partitioner, String... parentNames) { + public synchronized final TopologyBuilder addSink(String name, String topic, StreamPartitioner partitioner, String... parentNames) { return addSink(name, topic, (Serializer) null, (Serializer) null, partitioner, parentNames); } @@ -408,7 +408,7 @@ public class TopologyBuilder { * @see #addSink(String, String, StreamPartitioner, String...) * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...) */ - public final TopologyBuilder addSink(String name, String topic, Serializer keySerializer, Serializer valSerializer, String... parentNames) { + public synchronized final TopologyBuilder addSink(String name, String topic, Serializer keySerializer, Serializer valSerializer, String... parentNames) { return addSink(name, topic, keySerializer, valSerializer, (StreamPartitioner) null, parentNames); } @@ -433,7 +433,7 @@ public class TopologyBuilder { * @see #addSink(String, String, Serializer, Serializer, String...) * @throws TopologyBuilderException if parent processor is not added yet, or if this processor's name is equal to the parent's name */ - public final <K, V> TopologyBuilder addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<K, V> partitioner, String... parentNames) { + public synchronized final <K, V> TopologyBuilder addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<K, V> partitioner, String... parentNames) { if (nodeFactories.containsKey(name)) throw new TopologyBuilderException("Processor " + name + " is already added."); @@ -465,7 +465,7 @@ public class TopologyBuilder { * @return this builder instance so methods can be chained together; never null * @throws TopologyBuilderException if parent processor is not added yet, or if this processor's name is equal to the parent's name */ - public final TopologyBuilder addProcessor(String name, ProcessorSupplier supplier, String... parentNames) { + public synchronized final TopologyBuilder addProcessor(String name, ProcessorSupplier supplier, String... parentNames) { if (nodeFactories.containsKey(name)) throw new TopologyBuilderException("Processor " + name + " is already added."); @@ -493,7 +493,7 @@ public class TopologyBuilder { * @return this builder instance so methods can be chained together; never null * @throws TopologyBuilderException if state store supplier is already added */ - public final TopologyBuilder addStateStore(StateStoreSupplier supplier, boolean isInternal, String... processorNames) { + public synchronized final TopologyBuilder addStateStore(StateStoreSupplier supplier, boolean isInternal, String... processorNames) { if (stateFactories.containsKey(supplier.name())) { throw new TopologyBuilderException("StateStore " + supplier.name() + " is already added."); } @@ -515,7 +515,7 @@ public class TopologyBuilder { * @param supplier the supplier used to obtain this state store {@link StateStore} instance * @return this builder instance so methods can be chained together; never null */ - public final TopologyBuilder addStateStore(StateStoreSupplier supplier, String... processorNames) { + public synchronized final TopologyBuilder addStateStore(StateStoreSupplier supplier, String... processorNames) { return this.addStateStore(supplier, true, processorNames); } @@ -526,7 +526,7 @@ public class TopologyBuilder { * @param stateStoreNames the names of state stores that the processor uses * @return this builder instance so methods can be chained together; never null */ - public final TopologyBuilder connectProcessorAndStateStores(String processorName, String... stateStoreNames) { + public synchronized final TopologyBuilder connectProcessorAndStateStores(String processorName, String... stateStoreNames) { if (stateStoreNames != null) { for (String stateStoreName : stateStoreNames) { connectProcessorAndStateStore(processorName, stateStoreName); @@ -546,7 +546,7 @@ public class TopologyBuilder { * @return this builder instance so methods can be chained together; never null * @throws TopologyBuilderException if less than two processors are specified, or if one of the processors is not added yet */ - public final TopologyBuilder connectProcessors(String... processorNames) { + public synchronized final TopologyBuilder connectProcessors(String... processorNames) { if (processorNames.length < 2) throw new TopologyBuilderException("At least two processors need to participate in the connection."); @@ -569,7 +569,7 @@ public class TopologyBuilder { * @param topicName the name of the topic * @return this builder instance so methods can be chained together; never null */ - public final TopologyBuilder addInternalTopic(String topicName) { + public synchronized final TopologyBuilder addInternalTopic(String topicName) { this.internalTopicNames.add(topicName); return this; @@ -603,7 +603,7 @@ public class TopologyBuilder { * * @return groups of topic names */ - public Map<Integer, TopicsInfo> topicGroups() { + public synchronized Map<Integer, TopicsInfo> topicGroups() { Map<Integer, TopicsInfo> topicGroups = new LinkedHashMap<>(); @@ -681,7 +681,7 @@ public class TopologyBuilder { * * @return groups of node names */ - public Map<Integer, Set<String>> nodeGroups() { + public synchronized Map<Integer, Set<String>> nodeGroups() { if (nodeGroups == null) nodeGroups = makeNodeGroups(); @@ -729,7 +729,7 @@ public class TopologyBuilder { * @param sourceNodes a set of source node names * @return this builder instance so methods can be chained together; never null */ - public final TopologyBuilder copartitionSources(Collection<String> sourceNodes) { + public synchronized final TopologyBuilder copartitionSources(Collection<String> sourceNodes) { copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes))); return this; } @@ -740,7 +740,7 @@ public class TopologyBuilder { * * @return groups of topic names */ - public Collection<Set<String>> copartitionGroups() { + public synchronized Collection<Set<String>> copartitionGroups() { List<Set<String>> list = new ArrayList<>(copartitionSourceGroups.size()); for (Set<String> nodeNames : copartitionSourceGroups) { Set<String> copartitionGroup = new HashSet<>(); @@ -777,7 +777,7 @@ public class TopologyBuilder { * * @see org.apache.kafka.streams.KafkaStreams#KafkaStreams(TopologyBuilder, org.apache.kafka.streams.StreamsConfig) */ - public ProcessorTopology build(String applicationId, Integer topicGroupId) { + public synchronized ProcessorTopology build(String applicationId, Integer topicGroupId) { Set<String> nodeGroup; if (topicGroupId != null) { nodeGroup = nodeGroups().get(topicGroupId); @@ -839,7 +839,7 @@ public class TopologyBuilder { * Get the names of topics that are to be consumed by the source nodes created by this builder. * @return the unmodifiable set of topic names used by source nodes, which changes as new sources are added; never null */ - public Set<String> sourceTopics() { + public synchronized Set<String> sourceTopics() { Set<String> topics = new HashSet<>(); for (String topic : sourceTopicNames) { if (internalTopicNames.contains(topic)) { @@ -856,7 +856,7 @@ public class TopologyBuilder { return Collections.unmodifiableSet(topics); } - public Pattern sourceTopicPattern() { + public synchronized Pattern sourceTopicPattern() { if (this.topicPattern == null && !nodeToSourcePatterns.isEmpty()) { StringBuilder builder = new StringBuilder(); for (Pattern pattern : nodeToSourcePatterns.values()) { @@ -876,7 +876,7 @@ public class TopologyBuilder { return this.topicPattern; } - public void updateSubscriptions(SubscriptionUpdates subscriptionUpdates) { + public synchronized void updateSubscriptions(SubscriptionUpdates subscriptionUpdates) { this.subscriptionUpdates = subscriptionUpdates; } @@ -886,7 +886,7 @@ public class TopologyBuilder { * @param applicationId the streams applicationId. Should be the same as set by * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG} */ - public void setApplicationId(String applicationId) { + public synchronized void setApplicationId(String applicationId) { this.applicationId = applicationId; } }
