[ https://issues.apache.org/jira/browse/KAFKA-6761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16580587#comment-16580587 ]
ASF GitHub Bot commented on KAFKA-6761:
---------------------------------------

guozhangwang closed pull request #5451: KAFKA-6761: Reduce streams footprint part IV add optimization
URL: https://github.com/apache/kafka/pull/5451

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java index 0442e2bbef2..4c9ee932a1d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java @@ -530,8 +530,7 @@ public synchronized Topology build() { * @return the {@link Topology} that represents the specified processing logic */ public synchronized Topology build(final Properties props) { - // the props instance will be used once optimization framework merged - internalStreamsBuilder.buildAndOptimizeTopology(); + internalStreamsBuilder.buildAndOptimizeTopology(props); return topology; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java index 889559144d8..e5cd0663472 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java @@ -16,12 +16,16 @@ */ package org.apache.kafka.streams.kstream.internals; +import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.kstream.GlobalKTable; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Serialized; import org.apache.kafka.streams.kstream.internals.graph.GlobalStoreNode; +import org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode; import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters; import org.apache.kafka.streams.kstream.internals.graph.StateStoreNode; import org.apache.kafka.streams.kstream.internals.graph.StreamSourceNode; @@ -35,13 +39,18 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Serializable; import java.util.Collection; import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; import java.util.Objects; import java.util.PriorityQueue; +import java.util.Properties; +import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Predicate; import java.util.regex.Pattern; public class InternalStreamsBuilder implements InternalNameProvider { @@ -49,8 +58,9 @@ final InternalTopologyBuilder internalTopologyBuilder; private final AtomicInteger index = new AtomicInteger(0); - private final AtomicInteger nodeIdCounter = new AtomicInteger(0); - private final NodeIdComparator nodeIdComparator = new NodeIdComparator(); + private final AtomicInteger buildPriorityIndex = new AtomicInteger(0); + private final Map<StreamsGraphNode, Set<OptimizableRepartitionNode>>
keyChangingOperationsToOptimizableRepartitionNodes = new HashMap<>(); + private final Set<StreamsGraphNode> mergeNodes = new HashSet<>(); private static final String TOPOLOGY_ROOT = "root"; private static final Logger LOG = LoggerFactory.getLogger(InternalStreamsBuilder.class); @@ -205,14 +215,17 @@ public synchronized void addGlobalStore(final StoreBuilder<KeyValueStore> storeB stateUpdateSupplier); } - void addGraphNode(final StreamsGraphNode parent, final StreamsGraphNode child) { + void addGraphNode(final StreamsGraphNode parent, + final StreamsGraphNode child) { Objects.requireNonNull(parent, "parent node can't be null"); Objects.requireNonNull(child, "child node can't be null"); - parent.addChildNode(child); + parent.addChild(child); maybeAddNodeForOptimizationMetadata(child); } - void addGraphNode(final Collection<StreamsGraphNode> parents, final StreamsGraphNode child) { + + void addGraphNode(final Collection<StreamsGraphNode> parents, + final StreamsGraphNode child) { Objects.requireNonNull(parents, "parent node can't be null"); Objects.requireNonNull(child, "child node can't be null"); @@ -225,13 +238,37 @@ void addGraphNode(final Collection<StreamsGraphNode> parents, final StreamsGraph } } - void maybeAddNodeForOptimizationMetadata(final StreamsGraphNode node) { - node.setId(nodeIdCounter.getAndIncrement()); + private void maybeAddNodeForOptimizationMetadata(final StreamsGraphNode node) { + node.setBuildPriority(buildPriorityIndex.getAndIncrement()); + + if (node.parentNodes().isEmpty() && !node.nodeName().equals(TOPOLOGY_ROOT)) { + throw new IllegalStateException( + "Nodes should not have a null parent node. Name: " + node.nodeName() + " Type: " + + node.getClass().getSimpleName()); + } + + if (node.isKeyChangingOperation()) { + keyChangingOperationsToOptimizableRepartitionNodes.put(node, new HashSet<>()); + } else if (node instanceof OptimizableRepartitionNode) { + final StreamsGraphNode parentNode = getKeyChangingParentNode(node); + if (parentNode != null) { + keyChangingOperationsToOptimizableRepartitionNodes.get(parentNode).add((OptimizableRepartitionNode) node); + } + } else if (node.isMergeNode()) { + mergeNodes.add(node); + } } + // use this method for testing only public void buildAndOptimizeTopology() { + buildAndOptimizeTopology(null); + } + + public void buildAndOptimizeTopology(final Properties props) { + + maybePerformOptimizations(props); - final PriorityQueue<StreamsGraphNode> graphNodePriorityQueue = new PriorityQueue<>(5, nodeIdComparator); + final PriorityQueue<StreamsGraphNode> graphNodePriorityQueue = new PriorityQueue<>(5, Comparator.comparing(StreamsGraphNode::buildPriority)); graphNodePriorityQueue.offer(root); @@ -253,17 +290,168 @@ public void buildAndOptimizeTopology() { } } + private void maybePerformOptimizations(final Properties props) { - public StreamsGraphNode root() { - return root; + if (props != null && props.getProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION).equals(StreamsConfig.OPTIMIZE)) { + LOG.debug("Optimizing the Kafka Streams graph for repartition nodes"); + maybeOptimizeRepartitionOperations(); + } } - private static class NodeIdComparator implements Comparator<StreamsGraphNode>, Serializable { + @SuppressWarnings("unchecked") + private void maybeOptimizeRepartitionOperations() { + maybeUpdateKeyChangingRepartitionNodeMap(); - @Override - public int compare(final StreamsGraphNode o1, - final StreamsGraphNode o2) { - return o1.id().compareTo(o2.id()); + for (final Map.Entry<StreamsGraphNode, Set<OptimizableRepartitionNode>> entry : 
keyChangingOperationsToOptimizableRepartitionNodes.entrySet()) { + + final StreamsGraphNode keyChangingNode = entry.getKey(); + + if (entry.getValue().isEmpty()) { + continue; + } + + final SerializedInternal serialized = new SerializedInternal(getRepartitionSerdes(entry.getValue())); + + final StreamsGraphNode optimizedSingleRepartition = createRepartitionNode(keyChangingNode.nodeName(), + serialized.keySerde(), + serialized.valueSerde()); + + // re-use parent buildPriority to make sure the single repartition graph node is evaluated before downstream nodes + optimizedSingleRepartition.setBuildPriority(keyChangingNode.buildPriority()); + + for (final OptimizableRepartitionNode repartitionNodeToBeReplaced : entry.getValue()) { + + final StreamsGraphNode keyChangingNodeChild = findParentNodeMatching(repartitionNodeToBeReplaced, gn -> gn.parentNodes().contains(keyChangingNode)); + + if (keyChangingNodeChild == null) { + throw new StreamsException(String.format("Found a null keyChangingChild node for %s", repartitionNodeToBeReplaced)); + } + + LOG.debug("Found the child node of the key changer {} from the repartition {}.", keyChangingNodeChild, repartitionNodeToBeReplaced); + + // need to add children of key-changing node as children of optimized repartition + // in order to process records from re-partitioning + optimizedSingleRepartition.addChild(keyChangingNodeChild); + + LOG.debug("Removing {} from {} children {}", keyChangingNodeChild, keyChangingNode, keyChangingNode.children()); + // now remove children from key-changing node + keyChangingNode.removeChild(keyChangingNodeChild); + + // now need to get children of repartition node so we can remove repartition node + final Collection<StreamsGraphNode> repartitionNodeToBeReplacedChildren = repartitionNodeToBeReplaced.children(); + final Collection<StreamsGraphNode> parentsOfRepartitionNodeToBeReplaced = repartitionNodeToBeReplaced.parentNodes(); + + for (final StreamsGraphNode repartitionNodeToBeReplacedChild : repartitionNodeToBeReplacedChildren) { + for (final StreamsGraphNode parentNode : parentsOfRepartitionNodeToBeReplaced) { + parentNode.addChild(repartitionNodeToBeReplacedChild); + } + } + + for (final StreamsGraphNode parentNode : parentsOfRepartitionNodeToBeReplaced) { + parentNode.removeChild(repartitionNodeToBeReplaced); + } + repartitionNodeToBeReplaced.clearChildren(); + + LOG.debug("Updated node {} children {}", optimizedSingleRepartition, optimizedSingleRepartition.children()); + } + + keyChangingNode.addChild(optimizedSingleRepartition); + keyChangingOperationsToOptimizableRepartitionNodes.remove(entry.getKey()); + } + } + + private void maybeUpdateKeyChangingRepartitionNodeMap() { + final Map<StreamsGraphNode, Set<StreamsGraphNode>> mergeNodesToKeyChangers = new HashMap<>(); + for (final StreamsGraphNode mergeNode : mergeNodes) { + mergeNodesToKeyChangers.put(mergeNode, new HashSet<>()); + final Collection<StreamsGraphNode> keys = keyChangingOperationsToOptimizableRepartitionNodes.keySet(); + for (final StreamsGraphNode key : keys) { + final StreamsGraphNode maybeParentKey = findParentNodeMatching(mergeNode, node -> node.parentNodes().contains(key)); + if (maybeParentKey != null) { + mergeNodesToKeyChangers.get(mergeNode).add(key); + } + } } + + for (final Map.Entry<StreamsGraphNode, Set<StreamsGraphNode>> entry : mergeNodesToKeyChangers.entrySet()) { + final StreamsGraphNode mergeKey = entry.getKey(); + final Collection<StreamsGraphNode> keyChangingParents = entry.getValue(); + final Set<OptimizableRepartitionNode> 
repartitionNodes = new HashSet<>(); + for (final StreamsGraphNode keyChangingParent : keyChangingParents) { + repartitionNodes.addAll(keyChangingOperationsToOptimizableRepartitionNodes.get(keyChangingParent)); + keyChangingOperationsToOptimizableRepartitionNodes.remove(keyChangingParent); + } + + keyChangingOperationsToOptimizableRepartitionNodes.put(mergeKey, repartitionNodes); + } + } + + @SuppressWarnings("unchecked") + private OptimizableRepartitionNode createRepartitionNode(final String name, + final Serde keySerde, + final Serde valueSerde) { + + final OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder repartitionNodeBuilder = OptimizableRepartitionNode.optimizableRepartitionNodeBuilder(); + KStreamImpl.createRepartitionedSource(this, + keySerde, + valueSerde, + name + "-optimized", + name, + repartitionNodeBuilder); + + return repartitionNodeBuilder.build(); + + } + + private StreamsGraphNode getKeyChangingParentNode(final StreamsGraphNode repartitionNode) { + final StreamsGraphNode shouldBeKeyChangingNode = findParentNodeMatching(repartitionNode, n -> n.isKeyChangingOperation() || n.isValueChangingOperation()); + + final StreamsGraphNode keyChangingNode = findParentNodeMatching(repartitionNode, StreamsGraphNode::isKeyChangingOperation); + if (shouldBeKeyChangingNode != null && shouldBeKeyChangingNode.equals(keyChangingNode)) { + return keyChangingNode; + } + return null; + } + + @SuppressWarnings("unchecked") + private SerializedInternal getRepartitionSerdes(final Collection<OptimizableRepartitionNode> repartitionNodes) { + Serde keySerde = null; + Serde valueSerde = null; + + for (final OptimizableRepartitionNode repartitionNode : repartitionNodes) { + if (keySerde == null && repartitionNode.keySerde() != null) { + keySerde = repartitionNode.keySerde(); + } + + if (valueSerde == null && repartitionNode.valueSerde() != null) { + valueSerde = repartitionNode.valueSerde(); + } + + if (keySerde != null && valueSerde != null) { + break; + } + } + + return new SerializedInternal(Serialized.with(keySerde, valueSerde)); + } + + private StreamsGraphNode findParentNodeMatching(final StreamsGraphNode startSeekingNode, + final Predicate<StreamsGraphNode> parentNodePredicate) { + if (parentNodePredicate.test(startSeekingNode)) { + return startSeekingNode; + } + StreamsGraphNode foundParentNode = null; + + for (final StreamsGraphNode parentNode : startSeekingNode.parentNodes()) { + if (parentNodePredicate.test(parentNode)) { + return parentNode; + } + foundParentNode = findParentNodeMatching(parentNode, parentNodePredicate); + } + return foundParentNode; + } + + public StreamsGraphNode root() { + return root; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index becb03db24c..9b8afcd744b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -219,6 +219,7 @@ final ProcessorGraphNode<? super K, ? super V> mapValuesProcessorNode = new ProcessorGraphNode<>(name, processorParameters, repartitionRequired); + mapValuesProcessorNode.setValueChangingOperation(true); builder.addGraphNode(this.streamsGraphNode, mapValuesProcessorNode); return new KStreamImpl<>(builder, name, sourceNodes, this.repartitionRequired, mapValuesProcessorNode); @@ -274,6 +275,7 @@ public void print(final Printed<K, V> printed) { final ProcessorGraphNode<? 
super K, ? super V> flatMapValuesNode = new ProcessorGraphNode<>(name, processorParameters, repartitionRequired); + flatMapValuesNode.setValueChangingOperation(true); builder.addGraphNode(this.streamsGraphNode, flatMapValuesNode); return new KStreamImpl<>(builder, name, sourceNodes, this.repartitionRequired, flatMapValuesNode); @@ -343,7 +345,7 @@ public void print(final Printed<K, V> printed) { processorParameters, requireRepartitioning); - + mergeNode.setMergeNode(true); builder.addGraphNode(Arrays.asList(this.streamsGraphNode, streamImpl.streamsGraphNode), mergeNode); return new KStreamImpl<>(builder, name, allSourceNodes, requireRepartitioning, mergeNode); } @@ -491,6 +493,7 @@ private void to(final TopicNameExtractor<K, V> topicExtractor, final ProducedInt stateStoreNames, null, repartitionRequired); + transformNode.setValueChangingOperation(true); builder.addGraphNode(this.streamsGraphNode, transformNode); return new KStreamImpl<>(builder, name, sourceNodes, this.repartitionRequired, transformNode); @@ -529,7 +532,7 @@ public void process(final ProcessorSupplier<? super K, ? super V> processorSuppl joiner, windows, joined, - new KStreamImplJoin(false, false, this.streamsGraphNode)); + new KStreamImplJoin(false, false)); } @@ -545,7 +548,7 @@ public void process(final ProcessorSupplier<? super K, ? super V> processorSuppl final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final JoinWindows windows, final Joined<K, V, VO> joined) { - return doJoin(other, joiner, windows, joined, new KStreamImplJoin(true, true, this.streamsGraphNode)); + return doJoin(other, joiner, windows, joined, new KStreamImplJoin(true, true)); } private <V1, R> KStream<K, R> doJoin(final KStream<K, V1> other, @@ -656,7 +659,7 @@ public void process(final ProcessorSupplier<? super K, ? super V> processorSuppl joiner, windows, joined, - new KStreamImplJoin(true, false, this.streamsGraphNode) + new KStreamImplJoin(true, false) ); } @@ -840,14 +843,12 @@ public void process(final ProcessorSupplier<? super K, ? super V> processorSuppl private final boolean leftOuter; private final boolean rightOuter; - private final StreamsGraphNode parentGraphNode; + KStreamImplJoin(final boolean leftOuter, - final boolean rightOuter, - final StreamsGraphNode parentGraphNode) { + final boolean rightOuter) { this.leftOuter = leftOuter; this.rightOuter = rightOuter; - this.parentGraphNode = parentGraphNode; } @SuppressWarnings("unchecked") diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/OptimizableRepartitionNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/OptimizableRepartitionNode.java index 7198df09829..4d81b1f699c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/OptimizableRepartitionNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/OptimizableRepartitionNode.java @@ -46,6 +46,14 @@ } + public Serde<K> keySerde() { + return keySerde; + } + + public Serde<V> valueSerde() { + return valueSerde; + } + @Override Serializer<V> getValueSerializer() { return valueSerde != null ? 
valueSerde.serializer() : null; diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphNode.java index 902a4e97e2a..fac5923f3f0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphNode.java @@ -31,7 +31,9 @@ private final String nodeName; private final boolean repartitionRequired; private boolean keyChangingOperation; - private Integer id; + private boolean valueChangingOperation; + private boolean mergeNode; + private Integer buildPriority; private boolean hasWrittenToTopology = false; public StreamsGraphNode(final String nodeName, @@ -44,7 +46,7 @@ public StreamsGraphNode(final String nodeName, return parentNodes; } - public String[] parentNodeNames() { + String[] parentNodeNames() { final String[] parentNames = new String[parentNodes.size()]; int index = 0; for (final StreamsGraphNode parentNode : parentNodes) { @@ -62,17 +64,24 @@ public boolean allParentsWrittenToTopology() { return true; } - public void addParentNode(final StreamsGraphNode parentNode) { - parentNodes.add(parentNode); - } - public Collection<StreamsGraphNode> children() { return new LinkedHashSet<>(childNodes); } - public void addChildNode(final StreamsGraphNode childNode) { + public void clearChildren() { + for (final StreamsGraphNode childNode : childNodes) { + childNode.parentNodes.remove(this); + } + childNodes.clear(); + } + + public boolean removeChild(final StreamsGraphNode child) { + return childNodes.remove(child) && child.parentNodes.remove(this); + } + + public void addChild(final StreamsGraphNode childNode) { this.childNodes.add(childNode); - childNode.addParentNode(this); + childNode.parentNodes.add(this); } public String nodeName() { @@ -87,16 +96,32 @@ public boolean isKeyChangingOperation() { return keyChangingOperation; } + public boolean isValueChangingOperation() { + return valueChangingOperation; + } + + public boolean isMergeNode() { + return mergeNode; + } + + public void setMergeNode(final boolean mergeNode) { + this.mergeNode = mergeNode; + } + + public void setValueChangingOperation(final boolean valueChangingOperation) { + this.valueChangingOperation = valueChangingOperation; + } + public void keyChangingOperation(final boolean keyChangingOperation) { this.keyChangingOperation = keyChangingOperation; } - public void setId(final int id) { - this.id = id; + public void setBuildPriority(final int buildPriority) { + this.buildPriority = buildPriority; } - public Integer id() { - return this.id; + public Integer buildPriority() { + return this.buildPriority; } public abstract void writeToTopology(final InternalTopologyBuilder topologyBuilder); @@ -114,7 +139,7 @@ public String toString() { final String[] parentNames = parentNodeNames(); return "StreamsGraphNode{" + "nodeName='" + nodeName + '\'' + - ", id=" + id + + ", buildPriority=" + buildPriority + " parentNodes=" + Arrays.toString(parentNames) + '}'; } } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java new file mode 100644 index 00000000000..e192c70b81b --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java @@ -0,0 +1,440 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.integration; + + +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.JoinWindows; +import org.apache.kafka.streams.kstream.Joined; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.kstream.Reducer; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.StreamsTestUtils; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Locale; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import kafka.utils.MockTime; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +@Category({IntegrationTest.class}) +public class RepartitionOptimizingIntegrationTest { + + private static final int NUM_BROKERS = 1; + private static final String INPUT_TOPIC = "input"; + private static final String COUNT_TOPIC = "outputTopic_0"; + private static final String AGGREGATION_TOPIC = "outputTopic_1"; + private static final String REDUCE_TOPIC = "outputTopic_2"; + private static final String JOINED_TOPIC = "joinedOutputTopic"; + + private static final int ONE_REPARTITION_TOPIC = 1; + private static final int FOUR_REPARTITION_TOPICS = 4; + + private final Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition"); + + private Properties streamsConfiguration; + + + @ClassRule + public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(NUM_BROKERS); + private final MockTime mockTime = CLUSTER.time; + + @Before + public void setUp() throws Exception { + final Properties props = new Properties(); + props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 1024 * 10); + props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000); + props.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); + + streamsConfiguration = StreamsTestUtils.getStreamsConfig( + "maybe-optimized-test-app", + CLUSTER.bootstrapServers(), + Serdes.String().getClass().getName(), + Serdes.String().getClass().getName(), + props); + + CLUSTER.createTopics(INPUT_TOPIC, + COUNT_TOPIC, + AGGREGATION_TOPIC, + REDUCE_TOPIC, + JOINED_TOPIC); + + IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); + } + + @After + public void tearDown() throws Exception { + CLUSTER.deleteAllTopicsAndWait(30_000L); + } + + @Test + public void shouldSendCorrectRecords_OPTIMIZED() throws Exception { + runIntegrationTest(StreamsConfig.OPTIMIZE, + ONE_REPARTITION_TOPIC); + } + + @Test + public void shouldSendCorrectResults_NO_OPTIMIZATION() throws Exception { + runIntegrationTest(StreamsConfig.NO_OPTIMIZATION, + FOUR_REPARTITION_TOPICS); + } + + + private void runIntegrationTest(final String optimizationConfig, + final int expectedNumberRepartitionTopics) throws Exception { + + final Initializer<Integer> initializer = () -> 0; + final Aggregator<String, String, Integer> aggregator = (k, v, agg) -> agg + v.length(); + + final Reducer<String> reducer = (v1, v2) -> v1 + ":" + v2; + + final List<String> processorValueCollector = new ArrayList<>(); + + final StreamsBuilder builder = new StreamsBuilder(); + + final KStream<String, String> sourceStream = builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String())); + + final KStream<String, String> mappedStream = sourceStream.map((k, v) -> KeyValue.pair(k.toUpperCase(Locale.getDefault()), v)); + + mappedStream.filter((k, v) -> k.equals("B")).mapValues(v -> v.toUpperCase(Locale.getDefault())) + .process(() -> new SimpleProcessor(processorValueCollector)); + + final KStream<String, Long> countStream = mappedStream.groupByKey().count(Materialized.with(Serdes.String(), Serdes.Long())).toStream(); + + countStream.to(COUNT_TOPIC, Produced.with(Serdes.String(), Serdes.Long())); + + mappedStream.groupByKey().aggregate(initializer, + aggregator, + Materialized.with(Serdes.String(), Serdes.Integer())) + .toStream().to(AGGREGATION_TOPIC, Produced.with(Serdes.String(), Serdes.Integer())); + + // adding operators for case where the repartition node is further downstream + mappedStream.filter((k, v) -> true).peek((k, v) -> System.out.println(k + ":" + v)).groupByKey() + .reduce(reducer, Materialized.with(Serdes.String(), Serdes.String())) + .toStream().to(REDUCE_TOPIC, Produced.with(Serdes.String(), Serdes.String())); + + mappedStream.filter((k, v) -> k.equals("A")) + .join(countStream, (v1, v2) -> v1 + ":" + v2.toString(), + JoinWindows.of(5000), + Joined.with(Serdes.String(), Serdes.String(), Serdes.Long())) + .to(JOINED_TOPIC); + + streamsConfiguration.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, optimizationConfig); + + final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class); + + IntegrationTestUtils.produceKeyValuesSynchronously(INPUT_TOPIC, getKeyValues(), producerConfig, mockTime); + + final Properties consumerConfig1 = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, 
LongDeserializer.class); + final Properties consumerConfig2 = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, IntegerDeserializer.class); + final Properties consumerConfig3 = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class); + + final Topology topology = builder.build(streamsConfiguration); + final String topologyString = topology.describe().toString(); + + if (optimizationConfig.equals(StreamsConfig.OPTIMIZE)) { + assertEquals(EXPECTED_OPTIMIZED_TOPOLOGY, topologyString); + } else { + assertEquals(EXPECTED_UNOPTIMIZED_TOPOLOGY, topologyString); + } + + + /* + confirming number of expected repartition topics here + */ + assertEquals(expectedNumberRepartitionTopics, getCountOfRepartitionTopicsFound(topologyString)); + + final KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration); + streams.start(); + + final List<KeyValue<String, Long>> expectedCountKeyValues = Arrays.asList(KeyValue.pair("A", 3L), KeyValue.pair("B", 3L), KeyValue.pair("C", 3L)); + final List<KeyValue<String, Long>> receivedCountKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig1, COUNT_TOPIC, expectedCountKeyValues.size()); + + final List<KeyValue<String, Integer>> expectedAggKeyValues = Arrays.asList(KeyValue.pair("A", 9), KeyValue.pair("B", 9), KeyValue.pair("C", 9)); + final List<KeyValue<String, Integer>> receivedAggKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig2, AGGREGATION_TOPIC, expectedAggKeyValues.size()); + + final List<KeyValue<String, String>> expectedReduceKeyValues = Arrays.asList(KeyValue.pair("A", "foo:bar:baz"), KeyValue.pair("B", "foo:bar:baz"), KeyValue.pair("C", "foo:bar:baz")); + final List<KeyValue<String, Integer>> receivedReduceKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig3, REDUCE_TOPIC, expectedAggKeyValues.size()); + + final List<KeyValue<String, String>> expectedJoinKeyValues = Arrays.asList(KeyValue.pair("A", "foo:3"), KeyValue.pair("A", "bar:3"), KeyValue.pair("A", "baz:3")); + final List<KeyValue<String, Integer>> receivedJoinKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig3, JOINED_TOPIC, expectedJoinKeyValues.size()); + + + final List<String> expectedCollectedProcessorValues = Arrays.asList("FOO", "BAR", "BAZ"); + + assertThat(receivedCountKeyValues, equalTo(expectedCountKeyValues)); + assertThat(receivedAggKeyValues, equalTo(expectedAggKeyValues)); + assertThat(receivedReduceKeyValues, equalTo(expectedReduceKeyValues)); + assertThat(receivedJoinKeyValues, equalTo(expectedJoinKeyValues)); + + assertThat(3, equalTo(processorValueCollector.size())); + assertThat(processorValueCollector, equalTo(expectedCollectedProcessorValues)); + + streams.close(5, TimeUnit.SECONDS); + } + + + private int getCountOfRepartitionTopicsFound(final String topologyString) { + final Matcher matcher = repartitionTopicPattern.matcher(topologyString); + final List<String> repartitionTopicsFound = new ArrayList<>(); + while (matcher.find()) { + repartitionTopicsFound.add(matcher.group()); + } + return repartitionTopicsFound.size(); + } + + + private List<KeyValue<String, String>> getKeyValues() { + final List<KeyValue<String, String>> keyValueList = new ArrayList<>(); + final String[] keys = new String[]{"a", "b", "c"}; + final String[] values = new String[]{"foo", "bar", "baz"}; + for (final String key : keys) { + for (final String value : values) { + 
keyValueList.add(KeyValue.pair(key, value)); + } + } + return keyValueList; + } + + + private static class SimpleProcessor extends AbstractProcessor<String, String> { + + final List<String> valueList; + + SimpleProcessor(final List<String> valueList) { + this.valueList = valueList; + } + + @Override + public void process(final String key, final String value) { + valueList.add(value); + } + } + + + private static final String EXPECTED_OPTIMIZED_TOPOLOGY = "Topologies:\n" + + " Sub-topology: 0\n" + + " Source: KSTREAM-SOURCE-0000000000 (topics: [input])\n" + + " --> KSTREAM-MAP-0000000001\n" + + " Processor: KSTREAM-MAP-0000000001 (stores: [])\n" + + " --> KSTREAM-FILTER-0000000002, KSTREAM-FILTER-0000000040\n" + + " <-- KSTREAM-SOURCE-0000000000\n" + + " Processor: KSTREAM-FILTER-0000000002 (stores: [])\n" + + " --> KSTREAM-MAPVALUES-0000000003\n" + + " <-- KSTREAM-MAP-0000000001\n" + + " Processor: KSTREAM-FILTER-0000000040 (stores: [])\n" + + " --> KSTREAM-SINK-0000000039\n" + + " <-- KSTREAM-MAP-0000000001\n" + + " Processor: KSTREAM-MAPVALUES-0000000003 (stores: [])\n" + + " --> KSTREAM-PROCESSOR-0000000004\n" + + " <-- KSTREAM-FILTER-0000000002\n" + + " Processor: KSTREAM-PROCESSOR-0000000004 (stores: [])\n" + + " --> none\n" + + " <-- KSTREAM-MAPVALUES-0000000003\n" + + " Sink: KSTREAM-SINK-0000000039 (topic: KSTREAM-MAP-0000000001-optimized-repartition)\n" + + " <-- KSTREAM-FILTER-0000000040\n" + + "\n" + + " Sub-topology: 1\n" + + " Source: KSTREAM-SOURCE-0000000041 (topics: [KSTREAM-MAP-0000000001-optimized-repartition])\n" + + " --> KSTREAM-FILTER-0000000020, KSTREAM-AGGREGATE-0000000007, KSTREAM-AGGREGATE-0000000014, KSTREAM-FILTER-0000000029\n" + + " Processor: KSTREAM-AGGREGATE-0000000007 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000006])\n" + + " --> KTABLE-TOSTREAM-0000000011\n" + + " <-- KSTREAM-SOURCE-0000000041\n" + + " Processor: KTABLE-TOSTREAM-0000000011 (stores: [])\n" + + " --> KSTREAM-SINK-0000000012, KSTREAM-WINDOWED-0000000034\n" + + " <-- KSTREAM-AGGREGATE-0000000007\n" + + " Processor: KSTREAM-FILTER-0000000020 (stores: [])\n" + + " --> KSTREAM-PEEK-0000000021\n" + + " <-- KSTREAM-SOURCE-0000000041\n" + + " Processor: KSTREAM-FILTER-0000000029 (stores: [])\n" + + " --> KSTREAM-WINDOWED-0000000033\n" + + " <-- KSTREAM-SOURCE-0000000041\n" + + " Processor: KSTREAM-PEEK-0000000021 (stores: [])\n" + + " --> KSTREAM-REDUCE-0000000023\n" + + " <-- KSTREAM-FILTER-0000000020\n" + + " Processor: KSTREAM-WINDOWED-0000000033 (stores: [KSTREAM-JOINTHIS-0000000035-store])\n" + + " --> KSTREAM-JOINTHIS-0000000035\n" + + " <-- KSTREAM-FILTER-0000000029\n" + + " Processor: KSTREAM-WINDOWED-0000000034 (stores: [KSTREAM-JOINOTHER-0000000036-store])\n" + + " --> KSTREAM-JOINOTHER-0000000036\n" + + " <-- KTABLE-TOSTREAM-0000000011\n" + + " Processor: KSTREAM-AGGREGATE-0000000014 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000013])\n" + + " --> KTABLE-TOSTREAM-0000000018\n" + + " <-- KSTREAM-SOURCE-0000000041\n" + + " Processor: KSTREAM-JOINOTHER-0000000036 (stores: [KSTREAM-JOINTHIS-0000000035-store])\n" + + " --> KSTREAM-MERGE-0000000037\n" + + " <-- KSTREAM-WINDOWED-0000000034\n" + + " Processor: KSTREAM-JOINTHIS-0000000035 (stores: [KSTREAM-JOINOTHER-0000000036-store])\n" + + " --> KSTREAM-MERGE-0000000037\n" + + " <-- KSTREAM-WINDOWED-0000000033\n" + + " Processor: KSTREAM-REDUCE-0000000023 (stores: [KSTREAM-REDUCE-STATE-STORE-0000000022])\n" + + " --> KTABLE-TOSTREAM-0000000027\n" + + " <-- KSTREAM-PEEK-0000000021\n" + + " Processor: KSTREAM-MERGE-0000000037 (stores: [])\n" + 
+ " --> KSTREAM-SINK-0000000038\n" + + " <-- KSTREAM-JOINTHIS-0000000035, KSTREAM-JOINOTHER-0000000036\n" + + " Processor: KTABLE-TOSTREAM-0000000018 (stores: [])\n" + + " --> KSTREAM-SINK-0000000019\n" + + " <-- KSTREAM-AGGREGATE-0000000014\n" + + " Processor: KTABLE-TOSTREAM-0000000027 (stores: [])\n" + + " --> KSTREAM-SINK-0000000028\n" + + " <-- KSTREAM-REDUCE-0000000023\n" + + " Sink: KSTREAM-SINK-0000000012 (topic: outputTopic_0)\n" + + " <-- KTABLE-TOSTREAM-0000000011\n" + + " Sink: KSTREAM-SINK-0000000019 (topic: outputTopic_1)\n" + + " <-- KTABLE-TOSTREAM-0000000018\n" + + " Sink: KSTREAM-SINK-0000000028 (topic: outputTopic_2)\n" + + " <-- KTABLE-TOSTREAM-0000000027\n" + + " Sink: KSTREAM-SINK-0000000038 (topic: joinedOutputTopic)\n" + + " <-- KSTREAM-MERGE-0000000037\n\n"; + + + private static final String EXPECTED_UNOPTIMIZED_TOPOLOGY = "Topologies:\n" + + " Sub-topology: 0\n" + + " Source: KSTREAM-SOURCE-0000000000 (topics: [input])\n" + + " --> KSTREAM-MAP-0000000001\n" + + " Processor: KSTREAM-MAP-0000000001 (stores: [])\n" + + " --> KSTREAM-FILTER-0000000020, KSTREAM-FILTER-0000000002, KSTREAM-FILTER-0000000009, KSTREAM-FILTER-0000000016, KSTREAM-FILTER-0000000029\n" + + " <-- KSTREAM-SOURCE-0000000000\n" + + " Processor: KSTREAM-FILTER-0000000020 (stores: [])\n" + + " --> KSTREAM-PEEK-0000000021\n" + + " <-- KSTREAM-MAP-0000000001\n" + + " Processor: KSTREAM-FILTER-0000000002 (stores: [])\n" + + " --> KSTREAM-MAPVALUES-0000000003\n" + + " <-- KSTREAM-MAP-0000000001\n" + + " Processor: KSTREAM-FILTER-0000000029 (stores: [])\n" + + " --> KSTREAM-FILTER-0000000031\n" + + " <-- KSTREAM-MAP-0000000001\n" + + " Processor: KSTREAM-PEEK-0000000021 (stores: [])\n" + + " --> KSTREAM-FILTER-0000000025\n" + + " <-- KSTREAM-FILTER-0000000020\n" + + " Processor: KSTREAM-FILTER-0000000009 (stores: [])\n" + + " --> KSTREAM-SINK-0000000008\n" + + " <-- KSTREAM-MAP-0000000001\n" + + " Processor: KSTREAM-FILTER-0000000016 (stores: [])\n" + + " --> KSTREAM-SINK-0000000015\n" + + " <-- KSTREAM-MAP-0000000001\n" + + " Processor: KSTREAM-FILTER-0000000025 (stores: [])\n" + + " --> KSTREAM-SINK-0000000024\n" + + " <-- KSTREAM-PEEK-0000000021\n" + + " Processor: KSTREAM-FILTER-0000000031 (stores: [])\n" + + " --> KSTREAM-SINK-0000000030\n" + + " <-- KSTREAM-FILTER-0000000029\n" + + " Processor: KSTREAM-MAPVALUES-0000000003 (stores: [])\n" + + " --> KSTREAM-PROCESSOR-0000000004\n" + + " <-- KSTREAM-FILTER-0000000002\n" + + " Processor: KSTREAM-PROCESSOR-0000000004 (stores: [])\n" + + " --> none\n" + + " <-- KSTREAM-MAPVALUES-0000000003\n" + + " Sink: KSTREAM-SINK-0000000008 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000006-repartition)\n" + + " <-- KSTREAM-FILTER-0000000009\n" + + " Sink: KSTREAM-SINK-0000000015 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000013-repartition)\n" + + " <-- KSTREAM-FILTER-0000000016\n" + + " Sink: KSTREAM-SINK-0000000024 (topic: KSTREAM-REDUCE-STATE-STORE-0000000022-repartition)\n" + + " <-- KSTREAM-FILTER-0000000025\n" + + " Sink: KSTREAM-SINK-0000000030 (topic: KSTREAM-FILTER-0000000029-repartition)\n" + + " <-- KSTREAM-FILTER-0000000031\n" + + "\n" + + " Sub-topology: 1\n" + + " Source: KSTREAM-SOURCE-0000000010 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000006-repartition])\n" + + " --> KSTREAM-AGGREGATE-0000000007\n" + + " Processor: KSTREAM-AGGREGATE-0000000007 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000006])\n" + + " --> KTABLE-TOSTREAM-0000000011\n" + + " <-- KSTREAM-SOURCE-0000000010\n" + + " Processor: KTABLE-TOSTREAM-0000000011 (stores: [])\n" + + " --> 
KSTREAM-SINK-0000000012, KSTREAM-WINDOWED-0000000034\n" + + " <-- KSTREAM-AGGREGATE-0000000007\n" + + " Source: KSTREAM-SOURCE-0000000032 (topics: [KSTREAM-FILTER-0000000029-repartition])\n" + + " --> KSTREAM-WINDOWED-0000000033\n" + + " Processor: KSTREAM-WINDOWED-0000000033 (stores: [KSTREAM-JOINTHIS-0000000035-store])\n" + + " --> KSTREAM-JOINTHIS-0000000035\n" + + " <-- KSTREAM-SOURCE-0000000032\n" + + " Processor: KSTREAM-WINDOWED-0000000034 (stores: [KSTREAM-JOINOTHER-0000000036-store])\n" + + " --> KSTREAM-JOINOTHER-0000000036\n" + + " <-- KTABLE-TOSTREAM-0000000011\n" + + " Processor: KSTREAM-JOINOTHER-0000000036 (stores: [KSTREAM-JOINTHIS-0000000035-store])\n" + + " --> KSTREAM-MERGE-0000000037\n" + + " <-- KSTREAM-WINDOWED-0000000034\n" + + " Processor: KSTREAM-JOINTHIS-0000000035 (stores: [KSTREAM-JOINOTHER-0000000036-store])\n" + + " --> KSTREAM-MERGE-0000000037\n" + + " <-- KSTREAM-WINDOWED-0000000033\n" + + " Processor: KSTREAM-MERGE-0000000037 (stores: [])\n" + + " --> KSTREAM-SINK-0000000038\n" + + " <-- KSTREAM-JOINTHIS-0000000035, KSTREAM-JOINOTHER-0000000036\n" + + " Sink: KSTREAM-SINK-0000000012 (topic: outputTopic_0)\n" + + " <-- KTABLE-TOSTREAM-0000000011\n" + + " Sink: KSTREAM-SINK-0000000038 (topic: joinedOutputTopic)\n" + + " <-- KSTREAM-MERGE-0000000037\n" + + "\n" + + " Sub-topology: 2\n" + + " Source: KSTREAM-SOURCE-0000000017 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000013-repartition])\n" + + " --> KSTREAM-AGGREGATE-0000000014\n" + + " Processor: KSTREAM-AGGREGATE-0000000014 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000013])\n" + + " --> KTABLE-TOSTREAM-0000000018\n" + + " <-- KSTREAM-SOURCE-0000000017\n" + + " Processor: KTABLE-TOSTREAM-0000000018 (stores: [])\n" + + " --> KSTREAM-SINK-0000000019\n" + + " <-- KSTREAM-AGGREGATE-0000000014\n" + + " Sink: KSTREAM-SINK-0000000019 (topic: outputTopic_1)\n" + + " <-- KTABLE-TOSTREAM-0000000018\n" + + "\n" + + " Sub-topology: 3\n" + + " Source: KSTREAM-SOURCE-0000000026 (topics: [KSTREAM-REDUCE-STATE-STORE-0000000022-repartition])\n" + + " --> KSTREAM-REDUCE-0000000023\n" + + " Processor: KSTREAM-REDUCE-0000000023 (stores: [KSTREAM-REDUCE-STATE-STORE-0000000022])\n" + + " --> KTABLE-TOSTREAM-0000000027\n" + + " <-- KSTREAM-SOURCE-0000000026\n" + + " Processor: KTABLE-TOSTREAM-0000000027 (stores: [])\n" + + " --> KSTREAM-SINK-0000000028\n" + + " <-- KSTREAM-REDUCE-0000000023\n" + + " Sink: KSTREAM-SINK-0000000028 (topic: outputTopic_2)\n" + + " <-- KTABLE-TOSTREAM-0000000027\n\n"; + +} \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionWithMergeOptimizingIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionWithMergeOptimizingIntegrationTest.java new file mode 100644 index 00000000000..200062e0fb9 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionWithMergeOptimizingIntegrationTest.java @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.integration; + + +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.StreamsTestUtils; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import kafka.utils.MockTime; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +@Category({IntegrationTest.class}) +public class RepartitionWithMergeOptimizingIntegrationTest { + + private static final int NUM_BROKERS = 1; + private static final String INPUT_A_TOPIC = "inputA"; + private static final String INPUT_B_TOPIC = "inputB"; + private static final String COUNT_TOPIC = "outputTopic_0"; + private static final String COUNT_STRING_TOPIC = "outputTopic_1"; + + + private static final int ONE_REPARTITION_TOPIC = 1; + private static final int TWO_REPARTITION_TOPICS = 2; + + private final Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition"); + + private Properties streamsConfiguration; + + + @ClassRule + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); + private final MockTime mockTime = CLUSTER.time; + + @Before + public void setUp() throws Exception { + final Properties props = new Properties(); + props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 1024 * 10); + props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000); + props.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); + + streamsConfiguration = StreamsTestUtils.getStreamsConfig( + "maybe-optimized-with-merge-test-app", + CLUSTER.bootstrapServers(), + Serdes.String().getClass().getName(), + Serdes.String().getClass().getName(), + props); + + CLUSTER.createTopics(COUNT_TOPIC, + COUNT_STRING_TOPIC, + INPUT_A_TOPIC, + INPUT_B_TOPIC); + + IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); + } + + @After + public void tearDown() throws Exception { + CLUSTER.deleteAllTopicsAndWait(30_000L); + } + + @Test + public void 
shouldSendCorrectRecords_OPTIMIZED() throws Exception { + runIntegrationTest(StreamsConfig.OPTIMIZE, + ONE_REPARTITION_TOPIC); + } + + @Test + public void shouldSendCorrectResults_NO_OPTIMIZATION() throws Exception { + runIntegrationTest(StreamsConfig.NO_OPTIMIZATION, + TWO_REPARTITION_TOPICS); + } + + + private void runIntegrationTest(final String optimizationConfig, + final int expectedNumberRepartitionTopics) throws Exception { + + + final StreamsBuilder builder = new StreamsBuilder(); + + final KStream<String, String> sourceAStream = builder.stream(INPUT_A_TOPIC, Consumed.with(Serdes.String(), Serdes.String())); + + final KStream<String, String> sourceBStream = builder.stream(INPUT_B_TOPIC, Consumed.with(Serdes.String(), Serdes.String())); + + final KStream<String, String> mappedAStream = sourceAStream.map((k, v) -> KeyValue.pair(v.split(":")[0], v)); + final KStream<String, String> mappedBStream = sourceBStream.map((k, v) -> KeyValue.pair(v.split(":")[0], v)); + + final KStream<String, String> mergedStream = mappedAStream.merge(mappedBStream); + + mergedStream.groupByKey().count().toStream().to(COUNT_TOPIC, Produced.with(Serdes.String(), Serdes.Long())); + mergedStream.groupByKey().count().toStream().mapValues(v -> v.toString()).to(COUNT_STRING_TOPIC, Produced.with(Serdes.String(), Serdes.String())); + + streamsConfiguration.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, optimizationConfig); + + final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class); + + IntegrationTestUtils.produceKeyValuesSynchronously(INPUT_A_TOPIC, getKeyValues(), producerConfig, mockTime); + IntegrationTestUtils.produceKeyValuesSynchronously(INPUT_B_TOPIC, getKeyValues(), producerConfig, mockTime); + + final Properties consumerConfig1 = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, LongDeserializer.class); + final Properties consumerConfig2 = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class); + + final Topology topology = builder.build(streamsConfiguration); + final String topologyString = topology.describe().toString(); + System.out.println(topologyString); + + if (optimizationConfig.equals(StreamsConfig.OPTIMIZE)) { + assertEquals(EXPECTED_OPTIMIZED_TOPOLOGY, topologyString); + } else { + assertEquals(EXPECTED_UNOPTIMIZED_TOPOLOGY, topologyString); + } + + + /* + confirming number of expected repartition topics here + */ + assertEquals(expectedNumberRepartitionTopics, getCountOfRepartitionTopicsFound(topologyString)); + + final KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration); + streams.start(); + + final List<KeyValue<String, Long>> expectedCountKeyValues = Arrays.asList(KeyValue.pair("A", 6L), KeyValue.pair("B", 6L), KeyValue.pair("C", 6L)); + final List<KeyValue<String, Long>> receivedCountKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig1, COUNT_TOPIC, expectedCountKeyValues.size()); + + final List<KeyValue<String, String>> expectedStringCountKeyValues = Arrays.asList(KeyValue.pair("A", "6"), KeyValue.pair("B", "6"), KeyValue.pair("C", "6")); + final List<KeyValue<String, String>> receivedCountStringKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig2, COUNT_STRING_TOPIC, expectedStringCountKeyValues.size()); + + assertThat(receivedCountKeyValues, equalTo(expectedCountKeyValues)); + assertThat(receivedCountStringKeyValues, 
equalTo(expectedStringCountKeyValues)); + + streams.close(5, TimeUnit.SECONDS); + } + + + private int getCountOfRepartitionTopicsFound(final String topologyString) { + final Matcher matcher = repartitionTopicPattern.matcher(topologyString); + final List<String> repartitionTopicsFound = new ArrayList<>(); + while (matcher.find()) { + repartitionTopicsFound.add(matcher.group()); + } + return repartitionTopicsFound.size(); + } + + + private List<KeyValue<String, String>> getKeyValues() { + final List<KeyValue<String, String>> keyValueList = new ArrayList<>(); + final String[] keys = new String[]{"X", "Y", "Z"}; + final String[] values = new String[]{"A:foo", "B:foo", "C:foo"}; + for (final String key : keys) { + for (final String value : values) { + keyValueList.add(KeyValue.pair(key, value)); + } + } + return keyValueList; + } + + + + private static final String EXPECTED_OPTIMIZED_TOPOLOGY = "Topologies:\n" + + " Sub-topology: 0\n" + + " Source: KSTREAM-SOURCE-0000000000 (topics: [inputA])\n" + + " --> KSTREAM-MAP-0000000002\n" + + " Source: KSTREAM-SOURCE-0000000001 (topics: [inputB])\n" + + " --> KSTREAM-MAP-0000000003\n" + + " Processor: KSTREAM-MAP-0000000002 (stores: [])\n" + + " --> KSTREAM-MERGE-0000000004\n" + + " <-- KSTREAM-SOURCE-0000000000\n" + + " Processor: KSTREAM-MAP-0000000003 (stores: [])\n" + + " --> KSTREAM-MERGE-0000000004\n" + + " <-- KSTREAM-SOURCE-0000000001\n" + + " Processor: KSTREAM-MERGE-0000000004 (stores: [])\n" + + " --> KSTREAM-FILTER-0000000021\n" + + " <-- KSTREAM-MAP-0000000002, KSTREAM-MAP-0000000003\n" + + " Processor: KSTREAM-FILTER-0000000021 (stores: [])\n" + + " --> KSTREAM-SINK-0000000020\n" + + " <-- KSTREAM-MERGE-0000000004\n" + + " Sink: KSTREAM-SINK-0000000020 (topic: KSTREAM-MERGE-0000000004-optimized-repartition)\n" + + " <-- KSTREAM-FILTER-0000000021\n" + + "\n" + + " Sub-topology: 1\n" + + " Source: KSTREAM-SOURCE-0000000022 (topics: [KSTREAM-MERGE-0000000004-optimized-repartition])\n" + + " --> KSTREAM-AGGREGATE-0000000006, KSTREAM-AGGREGATE-0000000013\n" + + " Processor: KSTREAM-AGGREGATE-0000000013 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000012])\n" + + " --> KTABLE-TOSTREAM-0000000017\n" + + " <-- KSTREAM-SOURCE-0000000022\n" + + " Processor: KSTREAM-AGGREGATE-0000000006 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000005])\n" + + " --> KTABLE-TOSTREAM-0000000010\n" + + " <-- KSTREAM-SOURCE-0000000022\n" + + " Processor: KTABLE-TOSTREAM-0000000017 (stores: [])\n" + + " --> KSTREAM-MAPVALUES-0000000018\n" + + " <-- KSTREAM-AGGREGATE-0000000013\n" + + " Processor: KSTREAM-MAPVALUES-0000000018 (stores: [])\n" + + " --> KSTREAM-SINK-0000000019\n" + + " <-- KTABLE-TOSTREAM-0000000017\n" + + " Processor: KTABLE-TOSTREAM-0000000010 (stores: [])\n" + + " --> KSTREAM-SINK-0000000011\n" + + " <-- KSTREAM-AGGREGATE-0000000006\n" + + " Sink: KSTREAM-SINK-0000000011 (topic: outputTopic_0)\n" + + " <-- KTABLE-TOSTREAM-0000000010\n" + + " Sink: KSTREAM-SINK-0000000019 (topic: outputTopic_1)\n" + + " <-- KSTREAM-MAPVALUES-0000000018\n\n"; + + + private static final String EXPECTED_UNOPTIMIZED_TOPOLOGY = "Topologies:\n" + + " Sub-topology: 0\n" + + " Source: KSTREAM-SOURCE-0000000000 (topics: [inputA])\n" + + " --> KSTREAM-MAP-0000000002\n" + + " Source: KSTREAM-SOURCE-0000000001 (topics: [inputB])\n" + + " --> KSTREAM-MAP-0000000003\n" + + " Processor: KSTREAM-MAP-0000000002 (stores: [])\n" + + " --> KSTREAM-MERGE-0000000004\n" + + " <-- KSTREAM-SOURCE-0000000000\n" + + " Processor: KSTREAM-MAP-0000000003 (stores: [])\n" + + " --> 
KSTREAM-MERGE-0000000004\n" + + " <-- KSTREAM-SOURCE-0000000001\n" + + " Processor: KSTREAM-MERGE-0000000004 (stores: [])\n" + + " --> KSTREAM-FILTER-0000000008, KSTREAM-FILTER-0000000015\n" + + " <-- KSTREAM-MAP-0000000002, KSTREAM-MAP-0000000003\n" + + " Processor: KSTREAM-FILTER-0000000008 (stores: [])\n" + + " --> KSTREAM-SINK-0000000007\n" + + " <-- KSTREAM-MERGE-0000000004\n" + + " Processor: KSTREAM-FILTER-0000000015 (stores: [])\n" + + " --> KSTREAM-SINK-0000000014\n" + + " <-- KSTREAM-MERGE-0000000004\n" + + " Sink: KSTREAM-SINK-0000000007 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000005-repartition)\n" + + " <-- KSTREAM-FILTER-0000000008\n" + + " Sink: KSTREAM-SINK-0000000014 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000012-repartition)\n" + + " <-- KSTREAM-FILTER-0000000015\n" + + "\n" + + " Sub-topology: 1\n" + + " Source: KSTREAM-SOURCE-0000000009 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000005-repartition])\n" + + " --> KSTREAM-AGGREGATE-0000000006\n" + + " Processor: KSTREAM-AGGREGATE-0000000006 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000005])\n" + + " --> KTABLE-TOSTREAM-0000000010\n" + + " <-- KSTREAM-SOURCE-0000000009\n" + + " Processor: KTABLE-TOSTREAM-0000000010 (stores: [])\n" + + " --> KSTREAM-SINK-0000000011\n" + + " <-- KSTREAM-AGGREGATE-0000000006\n" + + " Sink: KSTREAM-SINK-0000000011 (topic: outputTopic_0)\n" + + " <-- KTABLE-TOSTREAM-0000000010\n" + + "\n" + + " Sub-topology: 2\n" + + " Source: KSTREAM-SOURCE-0000000016 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000012-repartition])\n" + + " --> KSTREAM-AGGREGATE-0000000013\n" + + " Processor: KSTREAM-AGGREGATE-0000000013 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000012])\n" + + " --> KTABLE-TOSTREAM-0000000017\n" + + " <-- KSTREAM-SOURCE-0000000016\n" + + " Processor: KTABLE-TOSTREAM-0000000017 (stores: [])\n" + + " --> KSTREAM-MAPVALUES-0000000018\n" + + " <-- KSTREAM-AGGREGATE-0000000013\n" + + " Processor: KSTREAM-MAPVALUES-0000000018 (stores: [])\n" + + " --> KSTREAM-SINK-0000000019\n" + + " <-- KTABLE-TOSTREAM-0000000017\n" + + " Sink: KSTREAM-SINK-0000000019 (topic: outputTopic_1)\n" + + " <-- KSTREAM-MAPVALUES-0000000018\n\n"; + +} \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java index 2bddf4110e0..d65f27e2b19 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java @@ -18,15 +18,121 @@ package org.apache.kafka.streams.kstream.internals.graph; import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.JoinWindows; import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.ValueJoiner; import org.junit.Test; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Locale; +import java.util.Properties; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + import static org.junit.Assert.assertEquals; public class StreamsGraphTest { + + final Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition"); + + + // Test builds the topology in successive steps; only graph nodes not yet processed are written to the topology
+ @Test + public void shouldBeAbleToBuildTopologyIncrementally() { + final StreamsBuilder builder = new StreamsBuilder(); + + final KStream<String, String> stream = builder.stream("topic"); + final KStream<String, String> streamII = builder.stream("other-topic"); + final ValueJoiner<String, String, String> valueJoiner = (v, v2) -> v + v2; + + + final KStream<String, String> joinedStream = stream.join(streamII, valueJoiner, JoinWindows.of(5000)); + + // build step one + assertEquals(expectedJoinedTopology, builder.build().describe().toString()); + + final KStream<String, String> filteredJoinStream = joinedStream.filter((k, v) -> v.equals("foo")); + // build step two + assertEquals(expectedJoinedFilteredTopology, builder.build().describe().toString()); + + filteredJoinStream.mapValues(v -> v + "some value").to("output-topic"); + // build step three + assertEquals(expectedFullTopology, builder.build().describe().toString()); + + } + + @Test + public void shouldNotOptimizeWithValueOrKeyChangingOperatorsAfterInitialKeyChange() { + + final Topology attemptedOptimize = getTopologyWithChangingValuesAfterChangingKey(StreamsConfig.OPTIMIZE); + final Topology noOptimization = getTopologyWithChangingValuesAfterChangingKey(StreamsConfig.NO_OPTIMIZATION); + + assertEquals(attemptedOptimize.describe().toString(), noOptimization.describe().toString()); + assertEquals(2, getCountOfRepartitionTopicsFound(attemptedOptimize.describe().toString())); + assertEquals(2, getCountOfRepartitionTopicsFound(noOptimization.describe().toString())); + } + + // no need to optimize, as the user has already performed the repartitioning manually + @Test + public void shouldNotOptimizeWhenAThroughOperationIsDone() { + + final Topology attemptedOptimize = getTopologyWithThroughOperation(StreamsConfig.OPTIMIZE); + final Topology noOptimization = getTopologyWithThroughOperation(StreamsConfig.NO_OPTIMIZATION); + + assertEquals(attemptedOptimize.describe().toString(), noOptimization.describe().toString()); + assertEquals(0, getCountOfRepartitionTopicsFound(attemptedOptimize.describe().toString())); + assertEquals(0, getCountOfRepartitionTopicsFound(noOptimization.describe().toString())); + + } + + private Topology getTopologyWithChangingValuesAfterChangingKey(final String optimizeConfig) { + + final StreamsBuilder builder = new StreamsBuilder(); + final Properties properties = new Properties(); + properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, optimizeConfig); + + final KStream<String, String> inputStream = builder.stream("input"); + final KStream<String, String> mappedKeyStream = inputStream.selectKey((k, v) -> k + v); + + mappedKeyStream.mapValues(v -> v.toUpperCase(Locale.getDefault())).groupByKey().count().toStream().to("output"); + mappedKeyStream.flatMapValues(v -> Arrays.asList(v.split("\\s"))).groupByKey().windowedBy(TimeWindows.of(5000)).count().toStream().to("windowed-output"); + + return builder.build(properties); + + } + + private Topology getTopologyWithThroughOperation(final String optimizeConfig) { + + final StreamsBuilder builder = new StreamsBuilder(); + final Properties properties = new Properties(); + properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, optimizeConfig); + + final KStream<String, String> inputStream = builder.stream("input"); + final KStream<String, String> mappedKeyStream = inputStream.selectKey((k, v) -> k + v).through("through-topic"); + + mappedKeyStream.groupByKey().count().toStream().to("output"); +
mappedKeyStream.groupByKey().windowedBy(TimeWindows.of(5000)).count().toStream().to("windowed-output"); + + return builder.build(properties); + + } + + private int getCountOfRepartitionTopicsFound(final String topologyString) { + final Matcher matcher = repartitionTopicPattern.matcher(topologyString); + final List<String> repartitionTopicsFound = new ArrayList<>(); + while (matcher.find()) { + repartitionTopicsFound.add(matcher.group()); + } + return repartitionTopicsFound.size(); + } + private String expectedJoinedTopology = "Topologies:\n" + " Sub-topology: 0\n" + " Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n" @@ -104,31 +210,4 @@ + " Sink: KSTREAM-SINK-0000000009 (topic: output-topic)\n" + " <-- KSTREAM-MAPVALUES-0000000008\n\n"; - // Test builds topology in succesive manner but only graph node not yet processed written to topology - - @Test - public void shouldBeAbleToBuildTopologyIncrementally() { - final StreamsBuilder builder = new StreamsBuilder(); - - final KStream<String, String> stream = builder.stream("topic"); - final KStream<String, String> streamII = builder.stream("other-topic"); - final ValueJoiner<String, String, String> valueJoiner = (v, v2) -> v + v2; - - - final KStream<String, String> joinedStream = stream.join(streamII, valueJoiner, JoinWindows.of(5000)); - - // build step one - assertEquals(expectedJoinedTopology, builder.build().describe().toString()); - - final KStream<String, String> filteredJoinStream = joinedStream.filter((k, v) -> v.equals("foo")); - // build step two - assertEquals(expectedJoinedFilteredTopology, builder.build().describe().toString()); - - filteredJoinStream.mapValues(v -> v + "some value").to("output-topic"); - // build step three - assertEquals(expectedFullTopology, builder.build().describe().toString()); - - - } - }

----------------------------------------------------------------

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Reduce Kafka Streams Footprint
> ------------------------------
>
> Key: KAFKA-6761
> URL: https://issues.apache.org/jira/browse/KAFKA-6761
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Bill Bejeck
> Assignee: Bill Bejeck
> Priority: Major
> Fix For: 2.1.0
>
>
> The persistent storage footprint of a Kafka Streams application consists of the following aspects:
> # The internal topics created on the Kafka cluster side.
> # The materialized state stores on the Kafka Streams application instance side.
> There have been some questions about reducing these footprints, especially since many of them are not necessary. For example, there are redundant internal topics, as well as unnecessary state stores that take up space and also hurt performance. When users push Streams to production with high traffic, these issues become more common and severe. Reducing the footprint of Streams has clear benefits: lower resource utilization of Kafka Streams applications and less pressure on brokers' capacity.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
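A few self-contained Java sketches follow for readers skimming this ticket. First, the user-facing switch: with this PR merged, graph optimization only runs when the configuration is handed to StreamsBuilder#build(Properties) and StreamsConfig.TOPOLOGY_OPTIMIZATION is set to StreamsConfig.OPTIMIZE; the default remains NO_OPTIMIZATION. A minimal sketch assuming the 2.1.0 API from the diff above, with placeholder topic names:

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Produced;

public class OptimizationOptIn {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "optimization-opt-in-app");
        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Opt in; without this line build(props) behaves like the no-arg build().
        props.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);

        final StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("input")
               .selectKey((k, v) -> v)   // key-changing operation marks downstream repartitions as optimizable
               .groupByKey()
               .count()
               .toStream()
               .to("output", Produced.with(Serdes.String(), Serdes.Long()));

        // Passing props here is what lets buildAndOptimizeTopology(props) see the flag.
        final Topology topology = builder.build(props);
        System.out.println(topology.describe());
    }
}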
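The heart of maybeOptimizeRepartitionOperations() is graph surgery: every optimizable repartition node reachable from one key-changing node is detached, and its children are re-attached to a single shared repartition node. A toy version of that splice, using a hypothetical Node class rather than the real StreamsGraphNode, and simplified in that the repartition nodes hang directly off the key changer (in the PR they may sit further downstream):

import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class RepartitionSplice {

    // Hypothetical stand-in for StreamsGraphNode: a name plus parent/child links.
    static final class Node {
        final String name;
        final Set<Node> parents = new LinkedHashSet<>();
        final Set<Node> children = new LinkedHashSet<>();

        Node(final String name) { this.name = name; }

        void addChild(final Node child) {
            children.add(child);
            child.parents.add(this);
        }

        void removeChild(final Node child) {
            children.remove(child);
            child.parents.remove(this);
        }
    }

    public static void main(final String[] args) {
        final Node keyChanger = new Node("KSTREAM-MAP");
        final Node repartitionA = new Node("repartition-A");  // feeds one aggregation
        final Node repartitionB = new Node("repartition-B");  // feeds another
        keyChanger.addChild(repartitionA);
        keyChanger.addChild(repartitionB);
        repartitionA.addChild(new Node("KSTREAM-AGGREGATE-A"));
        repartitionB.addChild(new Node("KSTREAM-AGGREGATE-B"));

        // Splice: move each old repartition node's children under one shared
        // repartition node, then detach the old nodes from the key changer.
        final Node optimized = new Node("repartition-optimized");
        for (final Node old : new Node[]{repartitionA, repartitionB}) {
            for (final Node child : new ArrayList<>(old.children)) {
                old.removeChild(child);
                optimized.addChild(child);
            }
            keyChanger.removeChild(old);
        }
        keyChanger.addChild(optimized);

        final List<String> names = new ArrayList<>();
        for (final Node child : keyChanger.children) {
            names.add(child.name);
        }
        System.out.println(names);  // prints [repartition-optimized]
    }
}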
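The rename of id to buildPriority matters because buildAndOptimizeTopology() drains nodes from a PriorityQueue ordered by that integer; the diff's comment notes that the shared repartition node re-uses the key changer's priority so it is written to the topology before everything downstream of it. A compressed sketch of that ordering, again with a hypothetical node type:

import java.util.Comparator;
import java.util.PriorityQueue;

public class BuildPriorityOrder {

    // Hypothetical stand-in: just a name and the priority assigned at creation.
    static final class Node {
        final String name;
        final int buildPriority;

        Node(final String name, final int buildPriority) {
            this.name = name;
            this.buildPriority = buildPriority;
        }
    }

    public static void main(final String[] args) {
        final PriorityQueue<Node> queue =
            new PriorityQueue<>(Comparator.comparingInt((Node n) -> n.buildPriority));

        queue.offer(new Node("downstream-aggregate", 7));
        // Created during optimization, but it borrows buildPriority 2 from its
        // key-changing parent, so it is polled before its downstream nodes.
        queue.offer(new Node("optimized-repartition", 2));
        queue.offer(new Node("source", 0));

        while (!queue.isEmpty()) {
            // prints: source, optimized-repartition, downstream-aggregate
            System.out.println(queue.poll().name);
        }
    }
}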
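The helper findParentNodeMatching() in the diff walks up the parent links until a predicate matches; getKeyChangingParentNode() uses it to check that the nearest key- or value-changing ancestor of a repartition node really is a key changer. A standalone version of the same depth-first walk, reusing the hypothetical Node shape from the splice sketch:

import java.util.LinkedHashSet;
import java.util.Set;
import java.util.function.Predicate;

public class ParentWalk {

    static final class Node {
        final String name;
        final Set<Node> parents = new LinkedHashSet<>();

        Node(final String name, final Node... parentNodes) {
            this.name = name;
            for (final Node parent : parentNodes) {
                parents.add(parent);
            }
        }
    }

    // Walk up the parent links; return the first matching node, or null when
    // no ancestor matches (mirrors the method added in the diff).
    static Node findParentNodeMatching(final Node start, final Predicate<Node> predicate) {
        if (predicate.test(start)) {
            return start;
        }
        Node found = null;
        for (final Node parent : start.parents) {
            if (predicate.test(parent)) {
                return parent;
            }
            found = findParentNodeMatching(parent, predicate);
        }
        return found;
    }

    public static void main(final String[] args) {
        final Node source = new Node("source");
        final Node selectKey = new Node("selectKey", source);
        final Node filter = new Node("filter", selectKey);
        final Node repartition = new Node("repartition", filter);

        final Node match = findParentNodeMatching(repartition, n -> n.name.equals("selectKey"));
        System.out.println(match == null ? "none" : match.name);  // prints selectKey
    }
}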
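Finally, both new integration tests and StreamsGraphTest assert the optimization the same way: by counting "Sink: ...-repartition" lines in the Topology#describe() output. That check is easy to reuse in any test; a sketch using only java.util.regex, with the sample input lifted from the expected topology strings above:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RepartitionSinkCounter {

    private static final Pattern REPARTITION_SINK = Pattern.compile("Sink: .*-repartition");

    static int countRepartitionSinks(final String topologyDescription) {
        final Matcher matcher = REPARTITION_SINK.matcher(topologyDescription);
        int count = 0;
        while (matcher.find()) {
            count++;
        }
        return count;
    }

    public static void main(final String[] args) {
        // One repartition sink and one regular output sink.
        final String described =
            "    Sink: KSTREAM-SINK-0000000020 (topic: KSTREAM-MERGE-0000000004-optimized-repartition)\n"
                + "    Sink: KSTREAM-SINK-0000000011 (topic: outputTopic_0)\n";
        System.out.println(countRepartitionSinks(described));  // prints 1
    }
}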