[jira] [Commented] (KAFKA-6761) Reduce Kafka Streams Footprint
[ https://issues.apache.org/jira/browse/KAFKA-6761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16824256#comment-16824256 ]

Matthias J. Sax commented on KAFKA-6761:

I am just wondering why this ticket was closed. There are two sub-tasks that are not resolved yet.

> Reduce Kafka Streams Footprint
> ------------------------------
>
> Key: KAFKA-6761
> URL: https://issues.apache.org/jira/browse/KAFKA-6761
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Bill Bejeck
> Assignee: Bill Bejeck
> Priority: Major
> Fix For: 2.1.0
>
>
> The persistent storage footprint of a Kafka Streams application contains the
> following aspects:
> # The internal topics created on the Kafka cluster side.
> # The materialized state stores on the Kafka Streams application instances
> side.
> There have been some questions about reducing these footprints, especially
> since many of them are not necessary. For example, there are redundant
> internal topics, as well as unnecessary state stores that take up space and
> also affect performance. When people are pushing Streams to production with
> high traffic, this issue would be more common and severe. Reducing the
> footprint of Streams has clear benefits: it reduces the resource utilization of
> Kafka Streams applications and avoids creating pressure on the brokers'
> capacity.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6761) Reduce Kafka Streams Footprint
[ https://issues.apache.org/jira/browse/KAFKA-6761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16635578#comment-16635578 ]

Dong Lin commented on KAFKA-6761:
---------------------------------

Closing this Jira since all 6 PRs have been merged/closed
[jira] [Commented] (KAFKA-6761) Reduce Kafka Streams Footprint
[ https://issues.apache.org/jira/browse/KAFKA-6761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580587#comment-16580587 ]

ASF GitHub Bot commented on KAFKA-6761:
---------------------------------------

guozhangwang closed pull request #5451: KAFKA-6761: Reduce streams footprint part IV add optimization
URL: https://github.com/apache/kafka/pull/5451

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index 0442e2bbef2..4c9ee932a1d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -530,8 +530,7 @@ public synchronized Topology build() {
      * @return the {@link Topology} that represents the specified processing logic
      */
     public synchronized Topology build(final Properties props) {
-        // the props instance will be used once optimization framework merged
-        internalStreamsBuilder.buildAndOptimizeTopology();
+        internalStreamsBuilder.buildAndOptimizeTopology(props);
         return topology;
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 889559144d8..e5cd0663472 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -16,12 +16,16 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.internals.graph.GlobalStoreNode;
+import org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode;
 import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
 import org.apache.kafka.streams.kstream.internals.graph.StateStoreNode;
 import org.apache.kafka.streams.kstream.internals.graph.StreamSourceNode;
@@ -35,13 +39,18 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Serializable;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
 import java.util.Objects;
 import java.util.PriorityQueue;
+import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
 import java.util.regex.Pattern;
 
 public class InternalStreamsBuilder implements InternalNameProvider {
@@ -49,8 +58,9 @@
     final InternalTopologyBuilder internalTopologyBuilder;
     private final AtomicInteger index = new AtomicInteger(0);
 
-    private final AtomicInteger nodeIdCounter = new AtomicInteger(0);
-    private final NodeIdComparator nodeIdComparator = new NodeIdComparator();
+    private final AtomicInteger buildPriorityIndex = new AtomicInteger(0);
+    private final Map> keyChangingOperationsToOptimizableRepartitionNodes = new HashMap<>();
+    private final Set mergeNodes = new HashSet<>();
 
     private static final String TOPOLOGY_ROOT = "root";
     private static final Logger LOG = LoggerFactory.getLogger(InternalStreamsBuilder.class);
@@ -205,14 +215,17 @@ public synchronized void addGlobalStore(final StoreBuilder storeB
                        stateUpdateSupplier);
     }
 
-    void addGraphNode(final StreamsGraphNode parent, final StreamsGraphNode child) {
+    void addGraphNode(final StreamsGraphNode parent,
+                      final StreamsGraphNode child) {
         Objects.requireNonNull(parent, "parent node can't be null");
         Objects.requireNonNull(child, "child node can't be null");
-        parent.addChildNode(child);
+        parent.addChild(child);
         maybeAddNodeForOptimizationMetadata(child);
     }
 
-    void addGraphNode(final Collection parents, final StreamsGraphNode child) {
+
+    void addGraphNode(final Collection parents,
+                      final StreamsGraphNode child) {
         Objects.requireNonNull(parents, "parent node can't be null");
         Objects.requireNonNull(child, "child node can't be null");
 
@@ -225,13 +238,37 @@
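
The change to `StreamsBuilder.build(final Properties props)` in the diff above hands the properties to the internal builder, presumably so that it can decide whether to optimize. A minimal sketch of such a gate follows. It assumes the config key `topology.optimization` and the opt-in value `all` (the description of PR #5451 gives `all` as the current value of `StreamsConfig.OPTIMIZE`). The class `OptimizationGate` and the method `shouldOptimize` are hypothetical names used for illustration only; this is not the code from the PR.

```java
import java.util.Properties;

public class OptimizationGate {
    // Assumed to mirror StreamsConfig.TOPOLOGY_OPTIMIZATION and StreamsConfig.OPTIMIZE.
    static final String TOPOLOGY_OPTIMIZATION = "topology.optimization";
    static final String OPTIMIZE = "all";

    // Hypothetical helper: optimize only when the caller opted in explicitly.
    static boolean shouldOptimize(final Properties props) {
        return props != null && OPTIMIZE.equals(props.getProperty(TOPOLOGY_OPTIMIZATION));
    }

    public static void main(final String[] args) {
        final Properties props = new Properties();
        System.out.println(shouldOptimize(props)); // no opt-in yet

        props.setProperty(TOPOLOGY_OPTIMIZATION, OPTIMIZE);
        System.out.println(shouldOptimize(props)); // opted in
    }
}
```

Treating a missing or different value as "do not optimize" keeps existing applications on their current topology unless they opt in.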
[jira] [Commented] (KAFKA-6761) Reduce Kafka Streams Footprint
[ https://issues.apache.org/jira/browse/KAFKA-6761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16573992#comment-16573992 ]

ASF GitHub Bot commented on KAFKA-6761:
---------------------------------------

guozhangwang closed pull request #5453: MINOR: Follow up for KAFKA-6761 graph should add stores for consistency
URL: https://github.com/apache/kafka/pull/5453

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index b502153bb52..788e0cb32a2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -21,7 +21,9 @@
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.internals.graph.GlobalStoreNode;
 import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
+import org.apache.kafka.streams.kstream.internals.graph.StateStoreNode;
 import org.apache.kafka.streams.kstream.internals.graph.StreamSourceNode;
 import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
 import org.apache.kafka.streams.kstream.internals.graph.TableSourceNode;
@@ -167,7 +169,7 @@ public String newStoreName(final String prefix) {
     }
 
     public synchronized void addStateStore(final StoreBuilder builder) {
-        internalTopologyBuilder.addStateStore(builder);
+        addGraphNode(root, new StateStoreNode(builder));
     }
 
     public synchronized void addGlobalStore(final StoreBuilder storeBuilder,
@@ -176,16 +178,15 @@ public synchronized void addGlobalStore(final StoreBuilder storeB
                                             final ConsumedInternal consumed,
                                             final String processorName,
                                             final ProcessorSupplier stateUpdateSupplier) {
-        // explicitly disable logging for global stores
-        storeBuilder.withLoggingDisabled();
-        internalTopologyBuilder.addGlobalStore(storeBuilder,
-                                               sourceName,
-                                               consumed.timestampExtractor(),
-                                               consumed.keyDeserializer(),
-                                               consumed.valueDeserializer(),
-                                               topic,
-                                               processorName,
-                                               stateUpdateSupplier);
+
+        final StreamsGraphNode globalStoreNode = new GlobalStoreNode(storeBuilder,
+                                                                     sourceName,
+                                                                     topic,
+                                                                     consumed,
+                                                                     processorName,
+                                                                     stateUpdateSupplier);
+
+        addGraphNode(root, globalStoreNode);
     }
 
     public synchronized void addGlobalStore(final StoreBuilder storeBuilder,
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GlobalStoreNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GlobalStoreNode.java
new file mode 100644
index 000..a844de6f839
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GlobalStoreNode.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals.graph;
+
+import
[jira] [Commented] (KAFKA-6761) Reduce Kafka Streams Footprint
[ https://issues.apache.org/jira/browse/KAFKA-6761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567498#comment-16567498 ]

ASF GitHub Bot commented on KAFKA-6761:
---------------------------------------

bbejeck opened a new pull request #5453: MINOR: Follow up for KAFKA-6761 graph should add stores for consistency
URL: https://github.com/apache/kafka/pull/5453

While working on the 4th PR, I noticed that I had missed adding stores via the graph; the `InternalStreamsBuilder` was still adding them directly. That is probably OK, but we should be consistent.

For testing, I ran the existing tests in streams.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org
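
The consistency point in PR #5453 is that stores, like every other DSL element, should be recorded as graph nodes under the root and only written to the topology when the graph is traversed. The sketch below illustrates that deferral in plain Java. All names (`DeferredStoreGraph`, `FakeTopology`, `Node`, `StoreNode`) are hypothetical stand-ins rather than the Kafka classes, and the "topology" is just a list that records what was written.

```java
import java.util.ArrayList;
import java.util.List;

public class DeferredStoreGraph {
    // Stand-in for the real topology builder; it only records what was written to it.
    static final class FakeTopology {
        final List<String> stores = new ArrayList<>();
    }

    // Simplified graph node: a name, children, and a hook that writes the node to the topology.
    static class Node {
        final String name;
        final List<Node> children = new ArrayList<>();

        Node(final String name) {
            this.name = name;
        }

        void writeToTopology(final FakeTopology topology) {
            // The root and other plain nodes write nothing in this sketch.
        }
    }

    // A node that registers a store when the graph is finally written out.
    static final class StoreNode extends Node {
        StoreNode(final String storeName) {
            super(storeName);
        }

        @Override
        void writeToTopology(final FakeTopology topology) {
            topology.stores.add(name);
        }
    }

    final Node root = new Node("root");
    final FakeTopology topology = new FakeTopology();

    // Adding a store only touches the graph, never the topology.
    void addStateStore(final String storeName) {
        root.children.add(new StoreNode(storeName));
    }

    // Building walks the graph once and writes every node to the topology.
    void build() {
        write(root);
    }

    private void write(final Node node) {
        node.writeToTopology(topology);
        for (final Node child : node.children) {
            write(child);
        }
    }
}
```

Because nothing reaches the topology before `build()`, a later optimization pass would have the chance to inspect or rewrite the whole graph first.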
[jira] [Commented] (KAFKA-6761) Reduce Kafka Streams Footprint
[ https://issues.apache.org/jira/browse/KAFKA-6761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567441#comment-16567441 ]

ASF GitHub Bot commented on KAFKA-6761:
---------------------------------------

bbejeck opened a new pull request #5451: KAFKA-6761: Reduce streams footprint part IV add optimization
URL: https://github.com/apache/kafka/pull/5451

This PR adds an optimization that eliminates redundant repartition topics: when the `KStream` resulting from a key-changing operation is used by several downstream operations that each repartition on the new key, the repartition topics are reduced to one.

Note that this PR leaves in place the optimization for re-using a source topic as a changelog topic for source `KTable` instances. I'll have another follow-up PR to move the source topic optimization to a method within `InternalStreamsBuilder` so it can be performed in the same area of the code.

Additionally, the current value of `StreamsConfig.OPTIMIZE` is `all` and we'll need to have another KIP to change the value to `2.1`.

This PR includes an integration test, `RepartitionOptimizingIntegrationTest`, which asserts that an optimized topology with one repartition topic produces the same results as the un-optimized version with four repartition topics. More tests will be added, but I wanted to get reviews on the approach now.
[jira] [Commented] (KAFKA-6761) Reduce Kafka Streams Footprint
[ https://issues.apache.org/jira/browse/KAFKA-6761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16509742#comment-16509742 ]

ASF GitHub Bot commented on KAFKA-6761:
---------------------------------------

bbejeck opened a new pull request #5201: KAFKA-6761: Construct Physical Plan using Graph, Reduce streams footprint part III
URL: https://github.com/apache/kafka/pull/5201

Sorry for the massive PR, but at this point it's very difficult to break up into smaller parts now that we are building the logical and physical plan.

It's worth noting that at the moment this PR does not include optimizations, for two reasons:

1. We need a solution for providing properties to the topology build process.
2. Some of the repartition optimizations need more work.

The specific changes in this PR from the second PR include:

1. Moved all graph objects into a separate package. This requires the graph objects to have public access, but after looking at the code for some time it's worth the trade-off to have a cleaner code-base.
2. Changed the types of graph nodes to names conveying more context.
3. Build the entire physical plan from the graph, after `StreamsBuilder.build()` is called.
4. I'm currently working on applying optimizations and they will be applied soon. Those optimizations will include:
   1. Re-using source topics as changelogs for KTables.
   2. Re-using sink topics as changelogs for state stores that output directly to sinks.
   3. Automatically repartitioning for key-changing operations followed by other DSL operations that would repartition on their own, reducing repartitions from N to 1.

Other changes are addressed directly as review comments on the PR.

Testing consists of using all existing streams tests to validate building the physical plan with the graph.
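
PR #5201 builds the physical plan by traversing the graph after `StreamsBuilder.build()` is called, and the later diff for `InternalStreamsBuilder` shows a `buildPriorityIndex` counter together with a `PriorityQueue` import. One plausible reading is that each node receives a priority when it is added, and the build step visits nodes in that order. The sketch below illustrates that idea in plain Java; `PlanBuilder`, `Node`, and `addNode` are hypothetical names and the traversal is an assumption, not the code from the PR.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

public class PlanBuilder {
    // Simplified graph node carrying the order in which it was added.
    static final class Node {
        final String name;
        final List<Node> children = new ArrayList<>();
        int buildPriority;

        Node(final String name) {
            this.name = name;
        }
    }

    private final AtomicInteger buildPriorityIndex = new AtomicInteger(0);
    final Node root = new Node("root");

    // Each new node gets the next priority, so definition order is remembered.
    Node addNode(final Node parent, final String name) {
        final Node child = new Node(name);
        child.buildPriority = buildPriorityIndex.getAndIncrement();
        parent.children.add(child);
        return child;
    }

    // Visit nodes lowest priority first and return the order in which they would be written.
    List<String> build() {
        final List<String> written = new ArrayList<>();
        final Comparator<Node> byPriority = Comparator.comparingInt(n -> n.buildPriority);
        final PriorityQueue<Node> queue = new PriorityQueue<>(byPriority);
        final Set<Node> seen = new HashSet<>();

        for (final Node child : root.children) {
            if (seen.add(child)) {
                queue.add(child);
            }
        }
        while (!queue.isEmpty()) {
            final Node next = queue.poll();
            written.add(next.name);
            for (final Node child : next.children) {
                // A node with several parents (for example a merge) is written only once.
                if (seen.add(child)) {
                    queue.add(child);
                }
            }
        }
        return written;
    }
}
```

With a source that feeds a `map` and then a `filter`, and a sink added later under the `map`, this visits `source`, `map`, `filter`, `sink`, which is the order of definition rather than depth-first order.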
[jira] [Commented] (KAFKA-6761) Reduce Kafka Streams Footprint
[ https://issues.apache.org/jira/browse/KAFKA-6761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16468244#comment-16468244 ]

ASF GitHub Bot commented on KAFKA-6761:
---------------------------------------

bbejeck opened a new pull request #4983: KAFKA-6761 [WIP]- reduce streams footprint part II
URL: https://github.com/apache/kafka/pull/4983

This version is a WIP and intentionally leaves out some additional required changes to keep the reviewing effort more manageable.

This version of the process includes:

1. Cleaning up the graph objects to reduce the number of parameters and make the naming conventions clearer.
2. Intercepting all calls to the `InternalTopologyBuilder` and capturing all details required for possible optimizations and building the final topology.

This PR does not include writing out the current physical plan, so no tests are included.

The next PR will include additional changes to building the graph and writing the topology out without optimizations, using the current streams tests.

> Reduce Kafka Streams Footprint
> ------------------------------
>
> Key: KAFKA-6761
> URL: https://issues.apache.org/jira/browse/KAFKA-6761
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Bill Bejeck
> Assignee: Bill Bejeck
> Priority: Major
> Fix For: 2.0.0
[jira] [Commented] (KAFKA-6761) Reduce Kafka Streams Footprint
[ https://issues.apache.org/jira/browse/KAFKA-6761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16464036#comment-16464036 ]

ASF GitHub Bot commented on KAFKA-6761:
---------------------------------------

guozhangwang closed pull request #4923: KAFKA-6761: Part 1 of 3; Graph nodes
URL: https://github.com/apache/kafka/pull/4923

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/BaseJoinProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/BaseJoinProcessorNode.java
new file mode 100644
index 000..899ee718e28
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/BaseJoinProcessorNode.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+/**
+ * Utility base class containing the common fields between
+ * a Stream-Stream join and a Table-Table join
+ */
+abstract class BaseJoinProcessorNode extends StreamsGraphNode {
+
+    private final ProcessorSupplier joinThisProcessSupplier;
+    private final ProcessorSupplier joinOtherProcessSupplier;
+    private final ProcessorSupplier joinMergeProcessor;
+    private final ValueJoiner valueJoiner;
+    private final String joinThisProcessorName;
+    private final String joinOtherProcessorName;
+    private final String joinMergeProcessorName;
+    private final String thisJoinSide;
+    private final String otherJoinSide;
+
+
+    BaseJoinProcessorNode(final String parentProcessorNodeName,
+                          final String processorNodeName,
+                          final ValueJoiner valueJoiner,
+                          final ProcessorParameters joinThisProcessorDetails,
+                          final ProcessorParameters joinOtherProcessDetails,
+                          final ProcessorParameters joinMergeProcessorDetails,
+                          final String thisJoinSide,
+                          final String otherJoinSide) {
+
+        super(parentProcessorNodeName,
+              processorNodeName,
+              false);
+
+        this.valueJoiner = valueJoiner;
+        this.joinThisProcessSupplier = joinThisProcessorDetails.processorSupplier();
+        this.joinOtherProcessSupplier = joinOtherProcessDetails.processorSupplier();
+        this.joinMergeProcessor = joinMergeProcessorDetails.processorSupplier();
+        this.joinThisProcessorName = joinThisProcessorDetails.processorName();
+        this.joinOtherProcessorName = joinOtherProcessDetails.processorName();
+        this.joinMergeProcessorName = joinMergeProcessorDetails.processorName();
+        this.thisJoinSide = thisJoinSide;
+        this.otherJoinSide = otherJoinSide;
+    }
+
+    ProcessorSupplier joinThisProcessorSupplier() {
+        return joinThisProcessSupplier;
+    }
+
+    ProcessorSupplier joinOtherProcessorSupplier() {
+        return joinOtherProcessSupplier;
+    }
+
+    ProcessorSupplier joinMergeProcessor() {
+        return joinMergeProcessor;
+    }
+
+    ValueJoiner valueJoiner() {
+        return valueJoiner;
+    }
+
+    String joinThisProcessorName() {
+        return joinThisProcessorName;
+    }
+
+    String joinOtherProcessorName() {
+        return joinOtherProcessorName;
+    }
+
+    String joinMergeProcessorName() {
+        return joinMergeProcessorName;
+    }
+
+    String thisJoinSide() {
+        return thisJoinSide;
+    }
+
+    String otherJoinSide() {
+        return otherJoinSide;
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinNode.java
new file mode
[jira] [Commented] (KAFKA-6761) Reduce Kafka Streams Footprint
[ https://issues.apache.org/jira/browse/KAFKA-6761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16451258#comment-16451258 ]

ASF GitHub Bot commented on KAFKA-6761:
---------------------------------------

bbejeck opened a new pull request #4923: KAFKA-6761: Part 1 of 3; Graph nodes
URL: https://github.com/apache/kafka/pull/4923

This PR supersedes PR #4654 as it was growing too large. All comments in that PR should be addressed here.

I will attempt to break the PRs for the topology optimization effort into 3 PRs total and will follow this general plan:

1. This PR only adds the graph nodes and graph. The graph nodes will hold the information used to make calls to the `InternalTopologyBuilder` when using the DSL. Graph nodes are stored in the `StreamsTopologyGraph` until the final topology needs building; then the graph is traversed and optimizations are made at that point. There are no tests in this PR; the follow-up PR will rely on all current streams tests, which should suffice.
2. PR 2 will intercept all DSL calls and build the graph. The `InternalStreamsBuilder` uses the graph to provide the required info to the `InternalTopologyBuilder` and build a topology. The condition of satisfaction for this PR is that all current unit, integration and system tests pass using the graph.
3. PR 3 adds some optimizations, mainly automatically repartitioning for operations that may modify a key and have child operations that would normally each create a separate repartition topic, saving possibly unnecessary repartition topics. For example the following topology:

```
KStream<String, String> mappedStreamOther = inputStream.map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
    @Override
    public KeyValue<String, String> apply(String key, String value) {
        return KeyValue.pair(key.substring(0, 3), value);
    }
});

mappedStreamOther.groupByKey().windowedBy(TimeWindows.of(5000)).count().toStream().to("count-one-out");
mappedStreamOther.groupByKey().windowedBy(TimeWindows.of(1)).count().toStream().to("count-two-out");
mappedStreamOther.groupByKey().windowedBy(TimeWindows.of(15000)).count().toStream().to("count-three-out");
```

would create 3 repartition topics, but after applying an optimization strategy, only one is created.

> Reduce Kafka Streams Footprint
> ------------------------------
>
> Key: KAFKA-6761
> URL: https://issues.apache.org/jira/browse/KAFKA-6761
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Bill Bejeck
> Assignee: Bill Bejeck
> Priority: Major
> Fix For: 1.2.0
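
The arithmetic behind the example topology in the PR #4923 description can be sketched as a toy model: one key-changing `map` feeds three `groupByKey` aggregations, so without optimization each aggregation gets its own repartition topic, while the optimized plan needs one per key-changing operation. The class `RepartitionCount`, its methods, and the node labels are hypothetical and exist only for illustration; the sketch counts topics and does not touch the Kafka Streams API.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RepartitionCount {
    // Unoptimized: every downstream operation that needs repartitioning gets its own topic.
    static int unoptimized(final Map<String, List<String>> childrenByKeyChangingOp) {
        int topics = 0;
        for (final List<String> children : childrenByKeyChangingOp.values()) {
            topics += children.size();
        }
        return topics;
    }

    // Optimized: one shared repartition topic per key-changing operation that has such children.
    static int optimized(final Map<String, List<String>> childrenByKeyChangingOp) {
        int topics = 0;
        for (final List<String> children : childrenByKeyChangingOp.values()) {
            if (!children.isEmpty()) {
                topics++;
            }
        }
        return topics;
    }

    public static void main(final String[] args) {
        // Key-changing operation mapped to the aggregations that read its output.
        final Map<String, List<String>> graph = new LinkedHashMap<>();
        graph.put("map", Arrays.asList("count-one", "count-two", "count-three"));

        System.out.println(unoptimized(graph) + " -> " + optimized(graph)); // prints "3 -> 1"
    }
}
```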