KAFKA-3856 (KIP-120) step two: extract internal functions from public facing TopologyBuilder class
- extract InternalTopologyBuilder from TopologyBuilder - deprecate all "leaking" methods from public TopologyBuilder API - changed TopologyDescription and all nested classes into interfaces Author: Matthias J. Sax <[email protected]> Reviewers: Eno Thereska <[email protected]>, Bill Bejeck <[email protected]>, Guozhang Wang <[email protected]> Closes #3536 from mjsax/kafka-3856-extract-internal-topology-builder Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5d798511 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5d798511 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5d798511 Branch: refs/heads/trunk Commit: 5d798511b12c5ef7555e4234fdd99a360176e435 Parents: fc93fb4 Author: Matthias J. Sax <[email protected]> Authored: Mon Jul 24 11:03:27 2017 -0700 Committer: Guozhang Wang <[email protected]> Committed: Mon Jul 24 11:03:27 2017 -0700 ---------------------------------------------------------------------- .../org/apache/kafka/streams/KafkaStreams.java | 4 +- .../kafka/streams/TopologyDescription.java | 152 ++ .../streams/processor/TopologyBuilder.java | 1265 +++------------ .../streams/processor/TopologyDescription.java | 476 ------ .../internals/InternalTopologyBuilder.java | 1491 ++++++++++++++++++ .../internals/StreamPartitionAssignor.java | 9 +- .../processor/internals/StreamThread.java | 7 +- .../internals/StreamsMetadataState.java | 5 +- .../apache/kafka/streams/KafkaStreamsTest.java | 31 +- .../integration/RegexSourceIntegrationTest.java | 49 +- .../streams/processor/TopologyBuilderTest.java | 10 +- .../kafka/streams/processor/TopologyTest.java | 87 +- .../internals/InternalTopologyBuilderTest.java | 709 +++++++++ .../internals/ProcessorTopologyTest.java | 34 +- .../internals/StreamPartitionAssignorTest.java | 289 +++- .../processor/internals/StreamThreadTest.java | 151 +- .../internals/StreamsMetadataStateTest.java | 8 +- .../StreamThreadStateStoreProviderTest.java | 4
+- .../kafka/test/ProcessorTopologyTestDriver.java | 7 +- 19 files changed, 3016 insertions(+), 1772 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/5d798511/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 028713b..c7c67d5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -452,7 +452,7 @@ public class KafkaStreams { GlobalStreamThread.State globalThreadState = null; final ArrayList<StateStoreProvider> storeProviders = new ArrayList<>(); - streamsMetadataState = new StreamsMetadataState(builder, parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG))); + streamsMetadataState = new StreamsMetadataState(builder.internalTopologyBuilder, parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG))); final ProcessorTopology globalTaskTopology = builder.buildGlobalStateTopology(); @@ -476,7 +476,7 @@ public class KafkaStreams { } for (int i = 0; i < threads.length; i++) { - threads[i] = new StreamThread(builder, + threads[i] = new StreamThread(builder.internalTopologyBuilder, config, clientSupplier, applicationId, http://git-wip-us.apache.org/repos/asf/kafka/blob/5d798511/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java new file mode 100644 index 0000000..dd481ff --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java @@ -0,0 +1,152 @@ +/* + 
* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams; + +import org.apache.kafka.streams.processor.internals.StreamTask; + +import java.util.Set; + +/** + * A meta representation of a {@link Topology topology}. + * <p> + * The nodes of a topology are grouped into {@link Subtopology sub-topologies} if they are connected. + * In contrast, two sub-topologies are not connected but can be linked to each other via topics, i.e., if one + * sub-topology {@link Topology#addSink(String, String, String...) writes} into a topic and another sub-topology + * {@link Topology#addSource(String, String...) reads} from the same topic. + * <p> + * When {@link KafkaStreams#start()} is called, different sub-topologies will be constructed and executed as independent + * {@link StreamTask tasks}. + */ +public interface TopologyDescription { + /** + * A connected sub-graph of a {@link Topology}. + * <p> + * Nodes of a {@code Subtopology} are connected {@link Topology#addProcessor(String, org.apache.kafka.streams.processor.ProcessorSupplier, String...) + * directly} or indirectly via {@link Topology#connectProcessorAndStateStores(String, String...) state stores} + * (i.e., if multiple processors share the same state).
+ */ + interface Subtopology { + /** + * Internally assigned unique ID. + * @return the ID of the sub-topology + */ + int id(); + + /** + * All nodes of this sub-topology. + * @return set of all nodes within the sub-topology + */ + Set<Node> nodes(); + } + + /** + * Represents a {@link Topology#addGlobalStore(org.apache.kafka.streams.processor.StateStoreSupplier, String, + * org.apache.kafka.common.serialization.Deserializer, org.apache.kafka.common.serialization.Deserializer, String, + * String, org.apache.kafka.streams.processor.ProcessorSupplier) global store}. + * Adding a global store results in adding a source node and one stateful processor node. + * Note that all added global stores form a single unit (similar to a {@link Subtopology}) even if different + * global stores are not connected to each other. + * Furthermore, global stores are available to all processors without connecting them explicitly, and thus global + * stores will never be part of any {@link Subtopology}. + */ + interface GlobalStore { + /** + * The source node reading from a "global" topic. + * @return the "global" source node + */ + Source source(); + + /** + * The processor node maintaining the global store. + * @return the "global" processor node + */ + Processor processor(); + } + + /** + * A node of a topology. Can be a source, sink, or processor node. + */ + interface Node { + /** + * The name of the node. Will never be {@code null}. + * @return the name of the node + */ + String name(); + /** + * The predecessors of this node within a sub-topology. + * Note that sources do not have any predecessors. + * Will never be {@code null}. + * @return set of all predecessors + */ + Set<Node> predecessors(); + /** + * The successors of this node within a sub-topology. + * Note that sinks do not have any successors. + * Will never be {@code null}. + * @return set of all successors + */ + Set<Node> successors(); + } + + + /** + * A source node of a topology. + */ + interface Source extends Node { + /** + * The topic names this source node is reading from.
+ * @return comma-separated list of topic names or pattern (as String) + */ + String topics(); + } + + /** + * A processor node of a topology. + */ + interface Processor extends Node { + /** + * The names of all connected stores. + * @return set of store names + */ + Set<String> stores(); + } + + /** + * A sink node of a topology. + */ + interface Sink extends Node { + /** + * The topic name this sink node is writing to. + * @return a topic name + */ + String topic(); + } + + /** + * All sub-topologies of the represented topology. + * @return set of all sub-topologies + */ + Set<Subtopology> subtopologies(); + + /** + * All global stores of the represented topology. + * @return set of all global stores + */ + Set<GlobalStore> globalStores(); + +} +
