This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 369d89f  MINOR: Move KTable source topic for changelog to optimization framework (#6500)
369d89f is described below

commit 369d89f2080586773b5a3cc432cbcf655aa1f625
Author: Bill Bejeck <bbej...@gmail.com>
AuthorDate: Fri Mar 29 17:16:56 2019 -0400

    MINOR: Move KTable source topic for changelog to optimization framework (#6500)

    Since we've added Kafka Streams optimizations in 2.1 we need to move the optimization for source KTable nodes (use source topic as changelog) to the optimization framework.

    Reviewers: Guozhang Wang <wangg...@gmail.com>
---
 .../kstream/internals/InternalStreamsBuilder.java  |  9 ++++++
 .../kstream/internals/graph/TableSourceNode.java   | 12 +++++++-
 .../internals/InternalTopologyBuilder.java         | 36 +---------------------
 .../apache/kafka/streams/StreamsBuilderTest.java   |  2 +-
 .../integration/RestoreIntegrationTest.java        |  2 +-
 5 files changed, 23 insertions(+), 38 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index c06b988..920f213 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -61,6 +61,7 @@ public class InternalStreamsBuilder implements InternalNameProvider {
     private final AtomicInteger buildPriorityIndex = new AtomicInteger(0);
     private final LinkedHashMap<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode>> keyChangingOperationsToOptimizableRepartitionNodes = new LinkedHashMap<>();
     private final LinkedHashSet<StreamsGraphNode> mergeNodes = new LinkedHashSet<>();
+    private final LinkedHashSet<StreamsGraphNode> tableSourceNodes = new LinkedHashSet<>();
 
     private static final String TOPOLOGY_ROOT = "root";
     private static final Logger LOG = LoggerFactory.getLogger(InternalStreamsBuilder.class);
@@ -254,6 +255,8 @@ public class InternalStreamsBuilder implements InternalNameProvider {
             }
         } else if (node.isMergeNode()) {
             mergeNodes.add(node);
+        } else if (node instanceof TableSourceNode) {
+            tableSourceNodes.add(node);
         }
     }
 
@@ -292,10 +295,16 @@ public class InternalStreamsBuilder implements InternalNameProvider {
 
         if (props != null && StreamsConfig.OPTIMIZE.equals(props.getProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION))) {
             LOG.debug("Optimizing the Kafka Streams graph for repartition nodes");
+            optimizeKTableSourceTopics();
             maybeOptimizeRepartitionOperations();
         }
     }
 
+    private void optimizeKTableSourceTopics() {
+        LOG.debug("Marking KTable source nodes to optimize using source topic for changelogs ");
+        tableSourceNodes.forEach(node -> ((TableSourceNode) node).reuseSourceTopicForChangeLog(true));
+    }
+
     @SuppressWarnings("unchecked")
     private void maybeOptimizeRepartitionOperations() {
         maybeUpdateKeyChangingRepartitionNodeMap();
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
index 53061dc..fa979b2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
@@ -38,6 +38,7 @@ public class TableSourceNode<K, V> extends StreamSourceNode<K, V> {
     private final ProcessorParameters<K, V> processorParameters;
     private final String sourceName;
     private final boolean isGlobalKTable;
+    private boolean shouldReuseSourceTopicForChangelog = false;
 
     private TableSourceNode(final String nodeName,
                             final String sourceName,
@@ -57,6 +58,11 @@ public class TableSourceNode<K, V> extends StreamSourceNode<K, V> {
         this.materializedInternal = materializedInternal;
     }
 
+
+    public void reuseSourceTopicForChangeLog(final boolean shouldReuseSourceTopicForChangelog) {
+        this.shouldReuseSourceTopicForChangelog = shouldReuseSourceTopicForChangelog;
+    }
+
     @Override
     public String toString() {
         return "TableSourceNode{" +
@@ -104,7 +110,11 @@ public class TableSourceNode<K, V> extends StreamSourceNode<K, V> {
             final KTableSource<K, V> ktableSource = (KTableSource<K, V>) processorParameters.processorSupplier();
             if (ktableSource.queryableName() != null) {
                 topologyBuilder.addStateStore(storeBuilder, nodeName());
-                topologyBuilder.markSourceStoreAndTopic(storeBuilder, topicName);
+
+                if (shouldReuseSourceTopicForChangelog) {
+                    storeBuilder.withLoggingDisabled();
+                    topologyBuilder.connectSourceStoreAndTopic(storeBuilder.name(), topicName);
+                }
             }
         }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 334adce..792df53 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -123,9 +123,6 @@ public class InternalTopologyBuilder {
 
     private Map<Integer, Set<String>> nodeGroups = null;
 
-    // TODO: this is only temporary for 2.0 and should be removed
-    private final Map<StoreBuilder, String> storeToSourceChangelogTopic = new HashMap<>();
-
     public static class StateStoreFactory {
         private final StoreBuilder builder;
         private final Set<String> users = new HashSet<>();
@@ -359,10 +356,6 @@ public class InternalTopologyBuilder {
             globalStateStores.put(storeBuilder.name(), storeBuilder.build());
         }
 
-        // adjust the topology if optimization is turned on.
-        // TODO: to be removed post 2.0
-        adjust(config);
-
         return this;
     }
 
@@ -606,7 +599,7 @@ public class InternalTopologyBuilder {
         nodeGroups = null;
     }
 
-    private void connectSourceStoreAndTopic(final String sourceStoreName,
+    public void connectSourceStoreAndTopic(final String sourceStoreName,
                                             final String topic) {
         if (storeToChangelogTopic.containsKey(sourceStoreName)) {
             throw new TopologyException("Source store " + sourceStoreName + " is already added.");
@@ -614,14 +607,6 @@ public class InternalTopologyBuilder {
         storeToChangelogTopic.put(sourceStoreName, topic);
     }
 
-    public final void markSourceStoreAndTopic(final StoreBuilder storeBuilder,
-                                              final String topic) {
-        if (storeToSourceChangelogTopic.containsKey(storeBuilder)) {
-            throw new TopologyException("Source store " + storeBuilder.name() + " is already used.");
-        }
-        storeToSourceChangelogTopic.put(storeBuilder, topic);
-    }
-
     public final void addInternalTopic(final String topicName) {
         Objects.requireNonNull(topicName, "topicName can't be null");
         internalTopicNames.add(topicName);
@@ -1071,25 +1056,6 @@ public class InternalTopologyBuilder {
         return Collections.unmodifiableMap(topicGroups);
     }
 
-    // Adjust the generated topology based on the configs.
-    // Not exposed as public API and should be removed post 2.0
-    private void adjust(final StreamsConfig config) {
-        final boolean enableOptimization20 =
-            config.getString(StreamsConfig.TOPOLOGY_OPTIMIZATION).equals(StreamsConfig.OPTIMIZE);
-
-        if (enableOptimization20) {
-            for (final Map.Entry<StoreBuilder, String> entry : storeToSourceChangelogTopic.entrySet()) {
-                final StoreBuilder storeBuilder = entry.getKey();
-                final String topicName = entry.getValue();
-
-                // update store map to disable logging for this store
-                storeBuilder.withLoggingDisabled();
-                addStateStore(storeBuilder, true);
-                connectSourceStoreAndTopic(storeBuilder.name(), topicName);
-            }
-        }
-    }
-
     private void setRegexMatchedTopicsToSourceNodes() {
         if (subscriptionUpdates.hasUpdates()) {
             for (final Map.Entry<String, Pattern> stringPatternEntry : nodeToSourcePatterns.entrySet()) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index b51eac8..f140546 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -349,9 +349,9 @@ public class StreamsBuilderTest {
     public void shouldReuseSourceTopicAsChangelogsWithOptimization20() {
         final String topic = "topic";
         builder.table(topic, Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as("store"));
-        final Topology topology = builder.build();
         final Properties props = StreamsTestUtils.getStreamsConfig();
         props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
+        final Topology topology = builder.build(props);
 
         final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology);
         internalTopologyBuilder.rewriteTopology(new StreamsConfig(props));
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index 830514b..f21dbfc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -146,7 +146,7 @@ public class RestoreIntegrationTest {
                 }
             });
 
-        kafkaStreams = new KafkaStreams(builder.build(), props);
+        kafkaStreams = new KafkaStreams(builder.build(props), props);
         kafkaStreams.setStateListener((newState, oldState) -> {
             if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
                 startupLatch.countDown();