Repository: kafka Updated Branches: refs/heads/0.10.2 80ceb75d2 -> 0f87991d5
http://git-wip-us.apache.org/repos/asf/kafka/blob/0f87991d/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java index b25fcad..81f4302 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java @@ -49,6 +49,7 @@ import java.util.Objects; import java.util.Set; import java.util.regex.Pattern; + /** * A component that is used to build a {@link ProcessorTopology}. A topology contains an acyclic graph of sources, processors, * and sinks. A {@link SourceNode source} is a node in the graph that consumes one or more Kafka topics and forwards them to @@ -81,7 +82,7 @@ public class TopologyBuilder { private final List<Set<String>> copartitionSourceGroups = new ArrayList<>(); // map from source processor names to subscribed topics (without application-id prefix for internal topics) - private final HashMap<String, String[]> nodeToSourceTopics = new HashMap<>(); + private final HashMap<String, List<String>> nodeToSourceTopics = new HashMap<>(); // map from source processor names to regex subscription patterns private final HashMap<String, Pattern> nodeToSourcePatterns = new LinkedHashMap<>(); @@ -146,11 +147,11 @@ public class TopologyBuilder { } private static class ProcessorNodeFactory extends NodeFactory { - public final String[] parents; - private final ProcessorSupplier supplier; + private final String[] parents; + private final ProcessorSupplier<?, ?> supplier; private final Set<String> stateStoreNames = new HashSet<>(); - public ProcessorNodeFactory(String name, String[] parents, ProcessorSupplier supplier) { + ProcessorNodeFactory(String name, String[] parents, ProcessorSupplier<?, ?> supplier) { super(name); this.parents = parents.clone(); this.supplier = supplier; @@ -160,37 +161,32 @@ public class TopologyBuilder { stateStoreNames.add(stateStoreName); } - @SuppressWarnings("unchecked") @Override public ProcessorNode build() { - return new ProcessorNode(name, supplier.get(), stateStoreNames); + return new ProcessorNode<>(name, supplier.get(), stateStoreNames); } } private class SourceNodeFactory extends NodeFactory { - private final String[] topics; - public final Pattern pattern; - private Deserializer keyDeserializer; - private Deserializer valDeserializer; + private final List<String> topics; + private final Pattern pattern; + private final Deserializer<?> keyDeserializer; + private final Deserializer<?> valDeserializer; - private SourceNodeFactory(String name, String[] topics, Pattern pattern, Deserializer keyDeserializer, Deserializer valDeserializer) { + private SourceNodeFactory(String name, String[] topics, Pattern pattern, Deserializer<?> keyDeserializer, Deserializer<?> valDeserializer) { super(name); - this.topics = topics != null ? topics.clone() : null; + this.topics = topics != null ? Arrays.asList(topics) : null; this.pattern = pattern; this.keyDeserializer = keyDeserializer; this.valDeserializer = valDeserializer; } - String[] getTopics() { - return topics; - } - - String[] getTopics(Collection<String> subscribedTopics) { + List<String> getTopics(Collection<String> subscribedTopics) { // if it is subscribed via patterns, it is possible that the topic metadata has not been updated // yet and hence the map from source node to topics is stale, in this case we put the pattern as a place holder; // this should only happen for debugging since during runtime this function should always be called after the metadata has updated. if (subscribedTopics.isEmpty()) - return new String[] {"Pattern[" + pattern + "]"}; + return Collections.singletonList("Pattern[" + pattern + "]"); List<String> matchedTopics = new ArrayList<>(); for (String update : subscribedTopics) { @@ -207,21 +203,20 @@ public class TopologyBuilder { matchedTopics.add(update); } } - return matchedTopics.toArray(new String[matchedTopics.size()]); + return matchedTopics; } - @SuppressWarnings("unchecked") @Override public ProcessorNode build() { - final String[] sourceTopics = nodeToSourceTopics.get(name); + final List<String> sourceTopics = nodeToSourceTopics.get(name); // if it is subscribed via patterns, it is possible that the topic metadata has not been updated // yet and hence the map from source node to topics is stale, in this case we put the pattern as a place holder; // this should only happen for debugging since during runtime this function should always be called after the metadata has updated. if (sourceTopics == null) - return new SourceNode(name, new String[] {"Pattern[" + pattern + "]"}, keyDeserializer, valDeserializer); + return new SourceNode<>(name, Collections.singletonList("Pattern[" + pattern + "]"), keyDeserializer, valDeserializer); else - return new SourceNode(name, maybeDecorateInternalSourceTopics(sourceTopics).toArray(new String[sourceTopics.length]), keyDeserializer, valDeserializer); + return new SourceNode<>(name, maybeDecorateInternalSourceTopics(sourceTopics), keyDeserializer, valDeserializer); } private boolean isMatch(String topic) { @@ -229,14 +224,14 @@ public class TopologyBuilder { } } - private class SinkNodeFactory extends NodeFactory { - public final String[] parents; - public final String topic; - private Serializer keySerializer; - private Serializer valSerializer; - private final StreamPartitioner partitioner; + private class SinkNodeFactory<K, V> extends NodeFactory { + private final String[] parents; + private final String topic; + private final Serializer<K> keySerializer; + private final Serializer<V> valSerializer; + private final StreamPartitioner<? super K, ? super V> partitioner; - private SinkNodeFactory(String name, String[] parents, String topic, Serializer keySerializer, Serializer valSerializer, StreamPartitioner partitioner) { + private SinkNodeFactory(String name, String[] parents, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<? super K, ? super V> partitioner) { super(name); this.parents = parents.clone(); this.topic = topic; @@ -245,14 +240,13 @@ public class TopologyBuilder { this.partitioner = partitioner; } - @SuppressWarnings("unchecked") @Override public ProcessorNode build() { if (internalTopicNames.contains(topic)) { // prefix the internal topic name with the application id - return new SinkNode(name, decorateTopic(topic), keySerializer, valSerializer, partitioner); + return new SinkNode<>(name, decorateTopic(topic), keySerializer, valSerializer, partitioner); } else { - return new SinkNode(name, topic, keySerializer, valSerializer, partitioner); + return new SinkNode<>(name, topic, keySerializer, valSerializer, partitioner); } } } @@ -263,7 +257,7 @@ public class TopologyBuilder { public Map<String, InternalTopicConfig> stateChangelogTopics; public Map<String, InternalTopicConfig> repartitionSourceTopics; - public TopicsInfo(Set<String> sinkTopics, Set<String> sourceTopics, Map<String, InternalTopicConfig> repartitionSourceTopics, Map<String, InternalTopicConfig> stateChangelogTopics) { + TopicsInfo(Set<String> sinkTopics, Set<String> sourceTopics, Map<String, InternalTopicConfig> repartitionSourceTopics, Map<String, InternalTopicConfig> stateChangelogTopics) { this.sinkTopics = sinkTopics; this.sourceTopics = sourceTopics; this.stateChangelogTopics = stateChangelogTopics; @@ -312,8 +306,8 @@ public class TopologyBuilder { /** * Set the applicationId to be used for auto-generated internal topics. * - * This is required before calling {@link #sourceTopics}, {@link #topicGroups}, - * {@link #copartitionSources}, {@link #stateStoreNameToSourceTopics} and {@link #build(Integer)}. + * This is required before calling {@link #topicGroups}, {@link #copartitionSources}, + * {@link #stateStoreNameToSourceTopics} and {@link #build(Integer)}. * * @param applicationId the streams applicationId. Should be the same as set by * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG} @@ -337,7 +331,7 @@ public class TopologyBuilder { * @return this builder instance so methods can be chained together; never null */ public synchronized final TopologyBuilder addSource(String name, String... topics) { - return addSource(null, name, (Deserializer) null, (Deserializer) null, topics); + return addSource(null, name, null, null, topics); } /** @@ -353,7 +347,7 @@ public class TopologyBuilder { * @return this builder instance so methods can be chained together; never null */ public synchronized final TopologyBuilder addSource(AutoOffsetReset offsetReset, String name, String... topics) { - return addSource(offsetReset, name, (Deserializer) null, (Deserializer) null, topics); + return addSource(offsetReset, name, null, null, topics); } @@ -370,7 +364,7 @@ public class TopologyBuilder { * @return this builder instance so methods can be chained together; never null */ public synchronized final TopologyBuilder addSource(String name, Pattern topicPattern) { - return addSource(null, name, (Deserializer) null, (Deserializer) null, topicPattern); + return addSource(null, name, null, null, topicPattern); } /** @@ -387,7 +381,7 @@ public class TopologyBuilder { * @return this builder instance so methods can be chained together; never null */ public synchronized final TopologyBuilder addSource(AutoOffsetReset offsetReset, String name, Pattern topicPattern) { - return addSource(offsetReset, name, (Deserializer) null, (Deserializer) null, topicPattern); + return addSource(offsetReset, name, null, null, topicPattern); } @@ -445,7 +439,7 @@ public class TopologyBuilder { } nodeFactories.put(name, new SourceNodeFactory(name, topics, null, keyDeserializer, valDeserializer)); - nodeToSourceTopics.put(name, topics.clone()); + nodeToSourceTopics.put(name, Arrays.asList(topics)); nodeGrouper.add(name); return this; @@ -470,7 +464,7 @@ public class TopologyBuilder { * @param topic the topic to source the data from * @param processorName the name of the {@link ProcessorSupplier} * @param stateUpdateSupplier the instance of {@link ProcessorSupplier} - * @return + * @return this builder instance so methods can be chained together; never null */ public synchronized TopologyBuilder addGlobalStore(final StateStore store, final String sourceName, @@ -499,7 +493,7 @@ public class TopologyBuilder { globalTopics.add(topic); final String[] topics = {topic}; nodeFactories.put(sourceName, new SourceNodeFactory(sourceName, topics, null, keyDeserializer, valueDeserializer)); - nodeToSourceTopics.put(sourceName, topics.clone()); + nodeToSourceTopics.put(sourceName, Arrays.asList(topics)); nodeGrouper.add(sourceName); final String[] parents = {sourceName}; @@ -612,7 +606,7 @@ public class TopologyBuilder { * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...) */ public synchronized final TopologyBuilder addSink(String name, String topic, String... parentNames) { - return addSink(name, topic, (Serializer) null, (Serializer) null, parentNames); + return addSink(name, topic, null, null, parentNames); } /** @@ -639,7 +633,7 @@ public class TopologyBuilder { * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...) */ public synchronized final TopologyBuilder addSink(String name, String topic, StreamPartitioner partitioner, String... parentNames) { - return addSink(name, topic, (Serializer) null, (Serializer) null, partitioner, parentNames); + return addSink(name, topic, null, null, partitioner, parentNames); } /** @@ -662,7 +656,7 @@ public class TopologyBuilder { * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...) */ public synchronized final TopologyBuilder addSink(String name, String topic, Serializer keySerializer, Serializer valSerializer, String... parentNames) { - return addSink(name, topic, keySerializer, valSerializer, (StreamPartitioner) null, parentNames); + return addSink(name, topic, keySerializer, valSerializer, null, parentNames); } /** @@ -703,7 +697,7 @@ public class TopologyBuilder { } } - nodeFactories.put(name, new SinkNodeFactory(name, parentNames, topic, keySerializer, valSerializer, partitioner)); + nodeFactories.put(name, new SinkNodeFactory<>(name, parentNames, topic, keySerializer, valSerializer, partitioner)); nodeToSinkTopic.put(name, topic); nodeGrouper.add(name); nodeGrouper.unite(name, parentNames); @@ -876,7 +870,7 @@ public class TopologyBuilder { for (String parent : parents) { NodeFactory nodeFactory = nodeFactories.get(parent); if (nodeFactory instanceof SourceNodeFactory) { - sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) nodeFactory).getTopics())); + sourceTopics.addAll(((SourceNodeFactory) nodeFactory).topics); } else if (nodeFactory instanceof ProcessorNodeFactory) { sourceTopics.addAll(findSourceTopicsForProcessorParents(((ProcessorNodeFactory) nodeFactory).parents)); } @@ -1013,8 +1007,8 @@ public class TopologyBuilder { for (String node : nodes) { final NodeFactory nodeFactory = nodeFactories.get(node); if (nodeFactory instanceof SourceNodeFactory) { - final String[] topics = ((SourceNodeFactory) nodeFactory).getTopics(); - if (topics != null && topics.length == 1 && globalTopics.contains(topics[0])) { + final List<String> topics = ((SourceNodeFactory) nodeFactory).topics; + if (topics != null && topics.size() == 1 && globalTopics.contains(topics.get(0))) { globalGroups.addAll(nodes); } } @@ -1023,7 +1017,6 @@ public class TopologyBuilder { return globalGroups; } - @SuppressWarnings("unchecked") private ProcessorTopology build(Set<String> nodeGroup) { List<ProcessorNode> processorNodes = new ArrayList<>(nodeFactories.size()); Map<String, ProcessorNode> processorMap = new HashMap<>(); @@ -1040,7 +1033,8 @@ public class TopologyBuilder { if (factory instanceof ProcessorNodeFactory) { for (String parent : ((ProcessorNodeFactory) factory).parents) { - processorMap.get(parent).addChild(node); + ProcessorNode<?, ?> parentNode = processorMap.get(parent); + parentNode.addChild(node); } for (String stateStoreName : ((ProcessorNodeFactory) factory).stateStoreNames) { if (!stateStoreMap.containsKey(stateStoreName)) { @@ -1063,10 +1057,10 @@ public class TopologyBuilder { } } } else if (factory instanceof SourceNodeFactory) { - SourceNodeFactory sourceNodeFactory = (SourceNodeFactory) factory; - String[] topics = (sourceNodeFactory.pattern != null) ? + final SourceNodeFactory sourceNodeFactory = (SourceNodeFactory) factory; + final List<String> topics = (sourceNodeFactory.pattern != null) ? sourceNodeFactory.getTopics(subscriptionUpdates.getUpdates()) : - sourceNodeFactory.getTopics(); + sourceNodeFactory.topics; for (String topic : topics) { if (internalTopicNames.contains(topic)) { @@ -1077,7 +1071,8 @@ public class TopologyBuilder { } } } else if (factory instanceof SinkNodeFactory) { - SinkNodeFactory sinkNodeFactory = (SinkNodeFactory) factory; + final SinkNodeFactory sinkNodeFactory = (SinkNodeFactory) factory; + for (String parent : sinkNodeFactory.parents) { processorMap.get(parent).addChild(node); if (internalTopicNames.contains(sinkNodeFactory.topic)) { @@ -1105,13 +1100,6 @@ public class TopologyBuilder { return Collections.unmodifiableMap(globalStateStores); } - private StateStore getStateStore(final String stateStoreName) { - if (stateFactories.containsKey(stateStoreName)) { - return stateFactories.get(stateStoreName).supplier.get(); - } - return globalStateStores.get(stateStoreName); - } - /** * Returns the map of topic groups keyed by the group id. * A topic group is a group of topics in the same task. @@ -1131,7 +1119,7 @@ public class TopologyBuilder { Map<String, InternalTopicConfig> stateChangelogTopics = new HashMap<>(); for (String node : entry.getValue()) { // if the node is a source node, add to the source topics - String[] topics = nodeToSourceTopics.get(node); + List<String> topics = nodeToSourceTopics.get(node); if (topics != null) { // if some of the topics are internal, add them to the internal topics for (String topic : topics) { @@ -1196,7 +1184,7 @@ public class TopologyBuilder { } } - private InternalTopicConfig createInternalTopicConfig(final StateStoreSupplier supplier, final String name) { + private InternalTopicConfig createInternalTopicConfig(final StateStoreSupplier<?> supplier, final String name) { if (!(supplier instanceof WindowStoreSupplier)) { return new InternalTopicConfig(name, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), supplier.logConfig()); } @@ -1210,27 +1198,13 @@ public class TopologyBuilder { return config; } - - /** - * Get the names of topics that are to be consumed by the source nodes created by this builder. - * @return the unmodifiable set of topic names used by source nodes, which changes as new sources are added; never null - */ - public synchronized Set<String> sourceTopics() { - Set<String> topics = maybeDecorateInternalSourceTopics(sourceTopicNames); - return Collections.unmodifiableSet(topics); - } - /** * Get the Pattern to match all topics requiring to start reading from earliest available offset * @return the Pattern for matching all topics reading from earliest offset, never null */ public synchronized Pattern earliestResetTopicsPattern() { - Set<String> topics = maybeDecorateInternalSourceTopics(earliestResetTopics); - - String[] sourceTopicNames = topics.toArray(new String[topics.size()]); - Pattern[] sourceTopicPatterns = earliestResetPatterns.toArray(new Pattern[earliestResetPatterns.size()]); - - Pattern earliestPattern = buildPatternForOffsetResetTopics(sourceTopicNames, sourceTopicPatterns); + final List<String> topics = maybeDecorateInternalSourceTopics(earliestResetTopics); + final Pattern earliestPattern = buildPatternForOffsetResetTopics(topics, earliestResetPatterns); ensureNoRegexOverlap(earliestPattern, latestResetPatterns, latestResetTopics); @@ -1242,12 +1216,8 @@ public class TopologyBuilder { * @return the Pattern for matching all topics reading from latest offset, never null */ public synchronized Pattern latestResetTopicsPattern() { - Set<String> topics = maybeDecorateInternalSourceTopics(latestResetTopics); - - String[] sourceTopicNames = topics.toArray(new String[topics.size()]); - Pattern[] sourceTopicPatterns = latestResetPatterns.toArray(new Pattern[latestResetPatterns.size()]); - - Pattern latestPattern = buildPatternForOffsetResetTopics(sourceTopicNames, sourceTopicPatterns); + final List<String> topics = maybeDecorateInternalSourceTopics(latestResetTopics); + final Pattern latestPattern = buildPatternForOffsetResetTopics(topics, latestResetPatterns); ensureNoRegexOverlap(latestPattern, earliestResetPatterns, earliestResetTopics); @@ -1267,10 +1237,8 @@ public class TopologyBuilder { throw new TopologyBuilderException(String.format("Found overlapping regex [%s] matching topic [%s] for a KStream with auto offset resets", builtPattern.pattern(), otherTopic)); } } - } - /** * Builds a composite pattern out of topic names and Pattern object for matching topic names. If the provided * arrays are empty a Pattern.compile("") instance is returned. @@ -1279,7 +1247,7 @@ public class TopologyBuilder { * @param sourcePatterns Patterns for matching source topics to add to a composite pattern * @return a Pattern that is composed of the literal source topic names and any Patterns for matching source topics */ - private static synchronized Pattern buildPatternForOffsetResetTopics(String[] sourceTopics, Pattern[] sourcePatterns) { + private static synchronized Pattern buildPatternForOffsetResetTopics(Collection<String> sourceTopics, Collection<Pattern> sourcePatterns) { StringBuilder builder = new StringBuilder(); for (String topic : sourceTopics) { @@ -1301,11 +1269,10 @@ public class TopologyBuilder { /** * @return a mapping from state store name to a Set of source Topics. */ - public Map<String, Set<String>> stateStoreNameToSourceTopics() { - final Map<String, Set<String>> results = new HashMap<>(); + public Map<String, List<String>> stateStoreNameToSourceTopics() { + final Map<String, List<String>> results = new HashMap<>(); for (Map.Entry<String, Set<String>> entry : stateStoreNameToSourceTopics.entrySet()) { results.put(entry.getKey(), maybeDecorateInternalSourceTopics(entry.getValue())); - } return results; } @@ -1321,7 +1288,7 @@ public class TopologyBuilder { for (Set<String> nodeNames : copartitionSourceGroups) { Set<String> copartitionGroup = new HashSet<>(); for (String node : nodeNames) { - String[] topics = nodeToSourceTopics.get(node); + final List<String> topics = nodeToSourceTopics.get(node); if (topics != null) copartitionGroup.addAll(maybeDecorateInternalSourceTopics(topics)); } @@ -1330,12 +1297,8 @@ public class TopologyBuilder { return Collections.unmodifiableList(list); } - private Set<String> maybeDecorateInternalSourceTopics(final Set<String> sourceTopics) { - return maybeDecorateInternalSourceTopics(sourceTopics.toArray(new String[sourceTopics.size()])); - } - - private Set<String> maybeDecorateInternalSourceTopics(String ... sourceTopics) { - final Set<String> decoratedTopics = new HashSet<>(); + private List<String> maybeDecorateInternalSourceTopics(final Collection<String> sourceTopics) { + final List<String> decoratedTopics = new ArrayList<>(); for (String topic : sourceTopics) { if (internalTopicNames.contains(topic)) { decoratedTopics.add(decorateTopic(topic)); @@ -1357,23 +1320,18 @@ public class TopologyBuilder { } public synchronized Pattern sourceTopicPattern() { - if (this.topicPattern == null && !nodeToSourcePatterns.isEmpty()) { - - List<String> allNodeToSourceTopics = new ArrayList<>(); + if (this.topicPattern == null) { + List<String> allSourceTopics = new ArrayList<>(); if (!nodeToSourceTopics.isEmpty()) { - for (String[] topics : nodeToSourceTopics.values()) { - allNodeToSourceTopics.addAll(Arrays.asList(topics)); - + for (List<String> topics : nodeToSourceTopics.values()) { + allSourceTopics.addAll(maybeDecorateInternalSourceTopics(topics)); } } - int numPatterns = nodeToSourcePatterns.values().size(); - int numTopics = allNodeToSourceTopics.size(); - - Pattern[] patterns = nodeToSourcePatterns.values().toArray(new Pattern[numPatterns]); - String[] allTopics = allNodeToSourceTopics.toArray(new String[numTopics]); + Collections.sort(allSourceTopics); - this.topicPattern = buildPatternForOffsetResetTopics(allTopics, patterns); + this.topicPattern = buildPatternForOffsetResetTopics(allSourceTopics, nodeToSourcePatterns.values()); } + return this.topicPattern; } http://git-wip-us.apache.org/repos/asf/kafka/blob/0f87991d/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java index 0ebfda7..c4db740 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java @@ -29,11 +29,11 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> { private final String topic; private Serializer<K> keySerializer; private Serializer<V> valSerializer; - private final StreamPartitioner<K, V> partitioner; + private final StreamPartitioner<? super K, ? super V> partitioner; private ProcessorContext context; - public SinkNode(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<K, V> partitioner) { + public SinkNode(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<? super K, ? super V> partitioner) { super(name); this.topic = topic; http://git-wip-us.apache.org/repos/asf/kafka/blob/0f87991d/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java index 771f504..3406606 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java @@ -21,14 +21,17 @@ import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.streams.kstream.internals.ChangedDeserializer; import org.apache.kafka.streams.processor.ProcessorContext; +import java.util.List; + public class SourceNode<K, V> extends ProcessorNode<K, V> { + private final List<String> topics; + + private ProcessorContext context; private Deserializer<K> keyDeserializer; private Deserializer<V> valDeserializer; - private ProcessorContext context; - private String[] topics; - public SourceNode(String name, String[] topics, Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer) { + public SourceNode(String name, List<String> topics, Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer) { super(name); this.topics = topics; this.keyDeserializer = keyDeserializer; http://git-wip-us.apache.org/repos/asf/kafka/blob/0f87991d/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 38961f2..b445a51 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -180,14 +180,13 @@ public class StreamThread extends Thread { protected final StreamsConfig config; protected final TopologyBuilder builder; - protected final Set<String> sourceTopics; - protected final Pattern topicPattern; protected final Producer<byte[], byte[]> producer; protected final Consumer<byte[], byte[]> consumer; protected final Consumer<byte[], byte[]> restoreConsumer; private final String logPrefix; private final String threadClientId; + private final Pattern sourceTopicPattern; private final Map<TaskId, StreamTask> activeTasks; private final Map<TaskId, StandbyTask> standbyTasks; private final Map<TopicPartition, StreamTask> activeTasksByPartition; @@ -200,6 +199,7 @@ public class StreamThread extends Thread { private final long cleanTimeMs; private final long commitTimeMs; private final StreamsMetricsThreadImpl streamsMetrics; + // TODO: this is not private only for tests, should be better refactored final StateDirectory stateDirectory; private String originalReset; private StreamPartitionAssignor partitionAssignor = null; @@ -291,8 +291,7 @@ public class StreamThread extends Thread { String threadName = getName(); this.config = config; this.builder = builder; - this.sourceTopics = builder.sourceTopics(); - this.topicPattern = builder.sourceTopicPattern(); + this.sourceTopicPattern = builder.sourceTopicPattern(); this.clientId = clientId; this.processId = processId; this.partitionGrouper = config.getConfiguredInstance(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class); @@ -566,11 +565,7 @@ public class StreamThread extends Thread { boolean requiresPoll = true; boolean polledRecords = false; - if (topicPattern != null) { - consumer.subscribe(topicPattern, rebalanceListener); - } else { - consumer.subscribe(new ArrayList<>(sourceTopics), rebalanceListener); - } + consumer.subscribe(sourceTopicPattern, rebalanceListener); while (stillRunning()) { this.timerStartedMs = time.milliseconds(); http://git-wip-us.apache.org/repos/asf/kafka/blob/0f87991d/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java index a59eb5f..6fb6e06 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java @@ -84,7 +84,7 @@ public class StreamsMetadataState { return allMetadata; } - final Set<String> sourceTopics = builder.stateStoreNameToSourceTopics().get(storeName); + final List<String> sourceTopics = builder.stateStoreNameToSourceTopics().get(storeName); if (sourceTopics == null) { return Collections.emptyList(); } @@ -201,7 +201,7 @@ public class StreamsMetadataState { rebuildMetadata(currentState); } - private boolean hasPartitionsForAnyTopics(final Set<String> topicNames, final Set<TopicPartition> partitionForHost) { + private boolean hasPartitionsForAnyTopics(final List<String> topicNames, final Set<TopicPartition> partitionForHost) { for (TopicPartition topicPartition : partitionForHost) { if (topicNames.contains(topicPartition.topic())) { return true; @@ -215,13 +215,13 @@ public class StreamsMetadataState { if (currentState.isEmpty()) { return; } - final Map<String, Set<String>> stores = builder.stateStoreNameToSourceTopics(); + final Map<String, List<String>> stores = builder.stateStoreNameToSourceTopics(); for (Map.Entry<HostInfo, Set<TopicPartition>> entry : currentState.entrySet()) { final HostInfo key = entry.getKey(); final Set<TopicPartition> partitionsForHost = new HashSet<>(entry.getValue()); final Set<String> storesOnHost = new HashSet<>(); - for (Map.Entry<String, Set<String>> storeTopicEntry : stores.entrySet()) { - final Set<String> topicsForStore = storeTopicEntry.getValue(); + for (Map.Entry<String, List<String>> storeTopicEntry : stores.entrySet()) { + final List<String> topicsForStore = storeTopicEntry.getValue(); if (hasPartitionsForAnyTopics(topicsForStore, partitionsForHost)) { storesOnHost.add(storeTopicEntry.getKey()); } @@ -259,7 +259,7 @@ public class StreamsMetadataState { } private SourceTopicsInfo getSourceTopicsInfo(final String storeName) { - final Set<String> sourceTopics = builder.stateStoreNameToSourceTopics().get(storeName); + final List<String> sourceTopics = builder.stateStoreNameToSourceTopics().get(storeName); if (sourceTopics == null || sourceTopics.isEmpty()) { return null; } @@ -271,11 +271,11 @@ public class StreamsMetadataState { } private class SourceTopicsInfo { - private final Set<String> sourceTopics; + private final List<String> sourceTopics; private int maxPartitions; private String topicWithMostPartitions; - private SourceTopicsInfo(final Set<String> sourceTopics) { + private SourceTopicsInfo(final List<String> sourceTopics) { this.sourceTopics = sourceTopics; for (String topic : sourceTopics) { final List<PartitionInfo> partitions = clusterMetadata.partitionsForTopic(topic); http://git-wip-us.apache.org/repos/asf/kafka/blob/0f87991d/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index 41277c7..5dae8dd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.config.ConfigException; @@ -201,6 +202,7 @@ public class KafkaStreamsTest { final Properties props = new Properties(); props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId"); props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); final KStreamBuilder builder = new KStreamBuilder(); final CountDownLatch latch = new CountDownLatch(1); http://git-wip-us.apache.org/repos/asf/kafka/blob/0f87991d/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java index 6f3c95a..f0fb0a2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java @@ -354,9 +354,9 @@ public class KStreamRepartitionJoinTest { streamOneInput = "stream-one-" + testNo; streamTwoInput = "stream-two-" + testNo; streamFourInput = "stream-four-" + testNo; - CLUSTER.createTopic(streamOneInput); - CLUSTER.createTopic(streamTwoInput); - CLUSTER.createTopic(streamFourInput); + CLUSTER.createTopic(streamOneInput, 2, 1); + CLUSTER.createTopic(streamTwoInput, 2, 1); + CLUSTER.createTopic(streamFourInput, 2, 1); } http://git-wip-us.apache.org/repos/asf/kafka/blob/0f87991d/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java index 619b6b5..fe7bebc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java @@ -68,7 +68,7 @@ public class EmbeddedKafkaCluster extends ExternalResource { putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.DeleteTopicEnableProp(), true); putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.LogCleanerDedupeBufferSizeProp(), 2 * 1024 * 1024L); putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.GroupMinSessionTimeoutMsProp(), 0); - putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), false); + putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), true); for (int i = 0; i < brokers.length; i++) { brokerConfig.put(KafkaConfig$.MODULE$.BrokerIdProp(), i); http://git-wip-us.apache.org/repos/asf/kafka/blob/0f87991d/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java index 875c359..3e7c41b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java @@ -214,7 +214,7 @@ public class IntegrationTestUtils { } }; - final String conditionDetails = "Did not receive " + expectedNumRecords + " number of records"; + final String conditionDetails = "Expecting " + expectedNumRecords + " records while only received " + accumData.size() + ": " + accumData; TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails); @@ -254,7 +254,7 @@ public class IntegrationTestUtils { } }; - final String conditionDetails = "Did not receive " + expectedNumRecords + " number of records"; + final String conditionDetails = "Expecting " + expectedNumRecords + " records while only received " + accumData.size() + ": " + accumData; TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails); http://git-wip-us.apache.org/repos/asf/kafka/blob/0f87991d/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java index 5f126c3..a469f25 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java @@ -133,9 +133,8 @@ public class KStreamBuilderTest { final KStream<String, String> merged = builder.merge(processedSource1, processedSource2, source3); merged.groupByKey().count("my-table"); - final Map<String, Set<String>> actual = builder.stateStoreNameToSourceTopics(); - - assertEquals(Utils.mkSet("topic-1", "topic-2", "topic-3"), actual.get("my-table")); + final Map<String, List<String>> actual = builder.stateStoreNameToSourceTopics(); + assertEquals(Utils.mkList("topic-1", "topic-2", "topic-3"), actual.get("my-table")); } @Test(expected = TopologyBuilderException.class) @@ -227,12 +226,11 @@ public class KStreamBuilderTest { final KStream<String, String> playEvents = builder.stream("events"); final KTable<String, String> table = builder.table("table-topic", "table-store"); - assertEquals(Collections.singleton("table-topic"), builder.stateStoreNameToSourceTopics().get("table-store")); + assertEquals(Collections.singletonList("table-topic"), builder.stateStoreNameToSourceTopics().get("table-store")); final KStream<String, String> mapped = playEvents.map(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper()); mapped.leftJoin(table, MockValueJoiner.TOSTRING_JOINER).groupByKey().count("count"); - - assertEquals(Collections.singleton("table-topic"), builder.stateStoreNameToSourceTopics().get("table-store")); - assertEquals(Collections.singleton(APP_ID + "-KSTREAM-MAP-0000000003-repartition"), builder.stateStoreNameToSourceTopics().get("count")); + assertEquals(Collections.singletonList("table-topic"), builder.stateStoreNameToSourceTopics().get("table-store")); + assertEquals(Collections.singletonList(APP_ID + "-KSTREAM-MAP-0000000003-repartition"), builder.stateStoreNameToSourceTopics().get("count")); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/0f87991d/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java index 4712320..2f3a450 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java @@ -156,12 +156,9 @@ public class TopologyBuilderTest { builder.addSource("source-3", "topic-3"); builder.addInternalTopic("topic-3"); - Set<String> expected = new HashSet<>(); - expected.add("topic-1"); - expected.add("topic-2"); - expected.add("X-topic-3"); + Pattern expectedPattern = Pattern.compile("X-topic-3|topic-1|topic-2"); - assertEquals(expected, builder.sourceTopics()); + assertEquals(expectedPattern.pattern(), builder.sourceTopicPattern().pattern()); } @Test @@ -184,7 +181,7 @@ public class TopologyBuilderTest { @Test public void testSubscribeTopicNameAndPattern() { final TopologyBuilder builder = new TopologyBuilder(); - Pattern expectedPattern = Pattern.compile("topic-foo|topic-bar|.*-\\d"); + Pattern expectedPattern = Pattern.compile("topic-bar|topic-foo|.*-\\d"); builder.addSource("source-1", "topic-foo", "topic-bar"); builder.addSource("source-2", Pattern.compile(".*-\\d")); assertEquals(expectedPattern.pattern(), builder.sourceTopicPattern().pattern()); @@ -441,9 +438,9 @@ public class TopologyBuilderTest { builder.addSource("source", "topic"); builder.addProcessor("processor", new MockProcessorSupplier(), "source"); builder.addStateStore(new MockStateStoreSupplier("store", false), "processor"); - final Map<String, Set<String>> stateStoreNameToSourceTopic = builder.stateStoreNameToSourceTopics(); + final Map<String, List<String>> stateStoreNameToSourceTopic = builder.stateStoreNameToSourceTopics(); assertEquals(1, stateStoreNameToSourceTopic.size()); - assertEquals(Collections.singleton("topic"), stateStoreNameToSourceTopic.get("store")); + assertEquals(Collections.singletonList("topic"), stateStoreNameToSourceTopic.get("store")); } @Test @@ -452,9 +449,9 @@ public class TopologyBuilderTest { builder.addSource("source", "topic"); builder.addProcessor("processor", new MockProcessorSupplier(), "source"); builder.addStateStore(new MockStateStoreSupplier("store", false), "processor"); - final Map<String, Set<String>> stateStoreNameToSourceTopic = builder.stateStoreNameToSourceTopics(); + final Map<String, List<String>> stateStoreNameToSourceTopic = builder.stateStoreNameToSourceTopics(); assertEquals(1, stateStoreNameToSourceTopic.size()); - assertEquals(Collections.singleton("topic"), stateStoreNameToSourceTopic.get("store")); + assertEquals(Collections.singletonList("topic"), stateStoreNameToSourceTopic.get("store")); } @Test @@ -465,9 +462,9 @@ public class TopologyBuilderTest { builder.addSource("source", "internal-topic"); builder.addProcessor("processor", new MockProcessorSupplier(), "source"); builder.addStateStore(new MockStateStoreSupplier("store", false), "processor"); - final Map<String, Set<String>> stateStoreNameToSourceTopic = builder.stateStoreNameToSourceTopics(); + final Map<String, List<String>> stateStoreNameToSourceTopic = builder.stateStoreNameToSourceTopics(); assertEquals(1, stateStoreNameToSourceTopic.size()); - assertEquals(Collections.singleton("appId-internal-topic"), stateStoreNameToSourceTopic.get("store")); + assertEquals(Collections.singletonList("appId-internal-topic"), stateStoreNameToSourceTopic.get("store")); } @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/kafka/blob/0f87991d/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializerTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializerTest.java index fdd9127..cf328ee 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializerTest.java @@ -22,6 +22,8 @@ import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.streams.errors.StreamsException; import org.junit.Test; +import java.util.Collections; + import static org.junit.Assert.assertEquals; public class SourceNodeRecordDeserializerTest { @@ -82,7 +84,7 @@ public class SourceNodeRecordDeserializerTest { final boolean valueThrowsException, final Object key, final Object value) { - super("", new String[0], null, null); + super("", Collections.EMPTY_LIST, null, null); this.keyThrowsException = keyThrowsException; this.valueThrowsException = valueThrowsException; this.key = key; http://git-wip-us.apache.org/repos/asf/kafka/blob/0f87991d/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java b/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java index 096c64a..4e0d21a 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java +++ b/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java @@ -23,6 +23,7 @@ import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.internals.SourceNode; import java.util.ArrayList; +import java.util.Arrays; import java.util.concurrent.atomic.AtomicInteger; public class MockSourceNode<K, V> extends SourceNode<K, V> { @@ -36,7 +37,7 @@ public class MockSourceNode<K, V> extends SourceNode<K, V> { public boolean initialized; public MockSourceNode(String[] topics, Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer) { - super(NAME + INDEX.getAndIncrement(), topics, keyDeserializer, valDeserializer); + super(NAME + INDEX.getAndIncrement(), Arrays.asList(topics), keyDeserializer, valDeserializer); } @Override