[jira] [Commented] (KAFKA-6761) Reduce Kafka Streams Footprint

2019-04-23 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16824256#comment-16824256
 ] 

Matthias J. Sax commented on KAFKA-6761:


I am just wondering why this ticket was closed. There are two sub-tasks that 
are not resolved yet.

> Reduce Kafka Streams Footprint
> --
>
> Key: KAFKA-6761
> URL: https://issues.apache.org/jira/browse/KAFKA-6761
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
>Priority: Major
> Fix For: 2.1.0
>
>
> The persistent storage footprint of a Kafka Streams application consists of 
> the following aspects:
>  # The internal topics created on the Kafka cluster side.
>  # The materialized state stores on the Kafka Streams application instance 
> side.
> There have been some questions about reducing these footprints, especially 
> since many of them are not necessary. For example, there are redundant 
> internal topics, as well as unnecessary state stores that take up space and 
> also affect performance. When people push Streams to production with high 
> traffic, this issue becomes more common and severe. Reducing the footprint of 
> Streams has clear benefits: it lowers the resource utilization of Kafka 
> Streams applications and avoids putting pressure on broker capacity.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6761) Reduce Kafka Streams Footprint

2018-10-02 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16635578#comment-16635578
 ] 

Dong Lin commented on KAFKA-6761:
-

Closing this Jira since all 6 PRs have been merged/closed



[jira] [Commented] (KAFKA-6761) Reduce Kafka Streams Footprint

2018-08-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580587#comment-16580587
 ] 

ASF GitHub Bot commented on KAFKA-6761:
---

guozhangwang closed pull request #5451: KAFKA-6761: Reduce streams footprint 
part IV add optimization
URL: https://github.com/apache/kafka/pull/5451
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index 0442e2bbef2..4c9ee932a1d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -530,8 +530,7 @@ public synchronized Topology build() {
  * @return the {@link Topology} that represents the specified processing logic
  */
 public synchronized Topology build(final Properties props) {
-// the props instance will be used once optimization framework merged
-internalStreamsBuilder.buildAndOptimizeTopology();
+internalStreamsBuilder.buildAndOptimizeTopology(props);
 return topology;
 }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 889559144d8..e5cd0663472 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -16,12 +16,16 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.internals.graph.GlobalStoreNode;
+import org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode;
 import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
 import org.apache.kafka.streams.kstream.internals.graph.StateStoreNode;
 import org.apache.kafka.streams.kstream.internals.graph.StreamSourceNode;
@@ -35,13 +39,18 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Serializable;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
 import java.util.Objects;
 import java.util.PriorityQueue;
+import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
 import java.util.regex.Pattern;
 
 public class InternalStreamsBuilder implements InternalNameProvider {
@@ -49,8 +58,9 @@
 final InternalTopologyBuilder internalTopologyBuilder;
 private final AtomicInteger index = new AtomicInteger(0);
 
-private final AtomicInteger nodeIdCounter = new AtomicInteger(0);
-private final NodeIdComparator nodeIdComparator = new NodeIdComparator();
+private final AtomicInteger buildPriorityIndex = new AtomicInteger(0);
+private final Map> 
keyChangingOperationsToOptimizableRepartitionNodes = new HashMap<>();
+private final Set mergeNodes = new HashSet<>();
 
 private static final String TOPOLOGY_ROOT = "root";
 private static final Logger LOG = LoggerFactory.getLogger(InternalStreamsBuilder.class);
@@ -205,14 +215,17 @@ public synchronized void addGlobalStore(final StoreBuilder storeB
stateUpdateSupplier);
 }
 
-void addGraphNode(final StreamsGraphNode parent, final StreamsGraphNode child) {
+void addGraphNode(final StreamsGraphNode parent,
+  final StreamsGraphNode child) {
 Objects.requireNonNull(parent, "parent node can't be null");
 Objects.requireNonNull(child, "child node can't be null");
-parent.addChildNode(child);
+parent.addChild(child);
 maybeAddNodeForOptimizationMetadata(child);
 }
 
-void addGraphNode(final Collection parents, final StreamsGraphNode child) {
+
+void addGraphNode(final Collection parents,
+  final StreamsGraphNode child) {
 Objects.requireNonNull(parents, "parent node can't be null");
 Objects.requireNonNull(child, "child node can't be null");
 
@@ -225,13 +238,37 @@ 
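The diff above swaps the `nodeIdCounter`/`NodeIdComparator` pair for a `buildPriorityIndex` counter while keeping the `PriorityQueue` import. A minimal sketch of what such a counter can be used for, assuming each node is stamped with the next priority when it is added and the graph is written out by polling a priority queue, so that a parent is always emitted before its children. `BuildPrioritySketch`, `GraphNode`, and `writeOrder` are hypothetical names, the sketch only handles single-parent nodes, and it is not the code from this PR.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.atomic.AtomicInteger;

class BuildPrioritySketch {

    // Hypothetical stand-in for a DSL graph node; not the real StreamsGraphNode.
    static final class GraphNode {
        final String name;
        final int buildPriority;
        final List<GraphNode> children = new ArrayList<>();

        GraphNode(final String name, final int buildPriority) {
            this.name = name;
            this.buildPriority = buildPriority;
        }
    }

    // Declared before 'root' so it is initialized first.
    private final AtomicInteger buildPriorityIndex = new AtomicInteger(0);
    final GraphNode root = new GraphNode("root", buildPriorityIndex.getAndIncrement());

    // Each node receives the next priority at the moment it is added, so a child
    // always has a higher priority than its (single) parent.
    GraphNode addNode(final GraphNode parent, final String name) {
        final GraphNode child = new GraphNode(name, buildPriorityIndex.getAndIncrement());
        parent.children.add(child);
        return child;
    }

    // Children are queued only once their parent has been written, and the queue
    // always yields the lowest pending priority, so nodes come out in the order
    // they were added to the graph.
    List<String> writeOrder() {
        final PriorityQueue<GraphNode> queue =
            new PriorityQueue<>(Comparator.comparingInt((GraphNode n) -> n.buildPriority));
        final List<String> order = new ArrayList<>();
        queue.add(root);
        while (!queue.isEmpty()) {
            final GraphNode node = queue.poll();
            order.add(node.name); // a real builder would write the node to the topology here
            queue.addAll(node.children);
        }
        return order;
    }
}
```

For a graph where `source` and then `other-source` hang off the root, and `map` and `sink` are chained under `source`, `writeOrder()` returns `[root, source, other-source, map, sink]`.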

[jira] [Commented] (KAFKA-6761) Reduce Kafka Streams Footprint

2018-08-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16573992#comment-16573992
 ] 

ASF GitHub Bot commented on KAFKA-6761:
---

guozhangwang closed pull request #5453: MINOR: Follow up for KAFKA-6761 graph 
should add stores for consistency
URL: https://github.com/apache/kafka/pull/5453
 
 
   


diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index b502153bb52..788e0cb32a2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -21,7 +21,9 @@
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.internals.graph.GlobalStoreNode;
 import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
+import org.apache.kafka.streams.kstream.internals.graph.StateStoreNode;
 import org.apache.kafka.streams.kstream.internals.graph.StreamSourceNode;
 import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
 import org.apache.kafka.streams.kstream.internals.graph.TableSourceNode;
@@ -167,7 +169,7 @@ public String newStoreName(final String prefix) {
 }
 
 public synchronized void addStateStore(final StoreBuilder builder) {
-internalTopologyBuilder.addStateStore(builder);
+addGraphNode(root, new StateStoreNode(builder));
 }
 
 public synchronized void addGlobalStore(final StoreBuilder storeBuilder,
@@ -176,16 +178,15 @@ public synchronized void addGlobalStore(final StoreBuilder storeB
 final ConsumedInternal consumed,
 final String processorName,
 final ProcessorSupplier stateUpdateSupplier) {
-// explicitly disable logging for global stores
-storeBuilder.withLoggingDisabled();
-internalTopologyBuilder.addGlobalStore(storeBuilder,
-   sourceName,
-   consumed.timestampExtractor(),
-   consumed.keyDeserializer(),
-   consumed.valueDeserializer(),
-   topic,
-   processorName,
-   stateUpdateSupplier);
+
+final StreamsGraphNode globalStoreNode = new GlobalStoreNode(storeBuilder,
+ sourceName,
+ topic,
+ consumed,
+ processorName,
+ stateUpdateSupplier);
+
+addGraphNode(root, globalStoreNode);
 }
 
 public synchronized void addGlobalStore(final StoreBuilder storeBuilder,
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GlobalStoreNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GlobalStoreNode.java
new file mode 100644
index 000..a844de6f839
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GlobalStoreNode.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals.graph;
+
+import 

[jira] [Commented] (KAFKA-6761) Reduce Kafka Streams Footprint

2018-08-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567498#comment-16567498
 ] 

ASF GitHub Bot commented on KAFKA-6761:
---

bbejeck opened a new pull request #5453: MINOR: Follow up for KAFKA-6761 graph 
should add stores for consistency
URL: https://github.com/apache/kafka/pull/5453
 
 
   While working on the 4th PR, I noticed that I had missed adding stores via the graph and was instead adding them directly via the `InternalStreamsBuilder`. That is probably OK, but we should be consistent.
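The diff for this follow-up changes `addStateStore` to call `addGraphNode(root, new StateStoreNode(builder))` instead of `internalTopologyBuilder.addStateStore(builder)`. A minimal sketch of why that is more consistent, assuming a simplified builder: once stores are recorded as graph nodes like everything else, nothing reaches the topology builder until the graph is written out at build time. `DeferredStoreSketch`, `RecordingTopologyBuilder`, and `StoreNode` are hypothetical simplifications, not the Kafka classes.

```java
import java.util.ArrayList;
import java.util.List;

class DeferredStoreSketch {

    // Hypothetical stand-in for InternalTopologyBuilder: records the stores it receives.
    static final class RecordingTopologyBuilder {
        final List<String> stores = new ArrayList<>();

        void addStateStore(final String storeName) {
            stores.add(storeName);
        }
    }

    // Hypothetical graph node that knows how to write itself to the builder.
    interface Node {
        void writeToTopology(RecordingTopologyBuilder builder);
    }

    static final class StoreNode implements Node {
        private final String storeName;

        StoreNode(final String storeName) {
            this.storeName = storeName;
        }

        @Override
        public void writeToTopology(final RecordingTopologyBuilder builder) {
            builder.addStateStore(storeName);
        }
    }

    final RecordingTopologyBuilder topologyBuilder = new RecordingTopologyBuilder();
    private final List<Node> rootChildren = new ArrayList<>();

    // The store becomes a child of the root node instead of being
    // registered on the topology builder immediately.
    void addStateStore(final String storeName) {
        rootChildren.add(new StoreNode(storeName));
    }

    // Only at build time are the captured nodes written to the builder.
    void build() {
        for (final Node node : rootChildren) {
            node.writeToTopology(topologyBuilder);
        }
    }
}
```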
   
   For testing, I ran the existing tests in streams.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-6761) Reduce Kafka Streams Footprint

2018-08-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567441#comment-16567441
 ] 

ASF GitHub Bot commented on KAFKA-6761:
---

bbejeck opened a new pull request #5451: KAFKA-6761: Reduce streams footprint 
part IV add optimization
URL: https://github.com/apache/kafka/pull/5451
 
 
   This PR adds an optimization that eliminates redundant repartition topics: when the `KStream` resulting from a key-changing operation is used by several other operations that rely on the new key, the repartition topics are reduced to one.
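The effect of this optimization can be illustrated with a simplified model. A minimal sketch, assuming each downstream operation of a key-changing node that needs data partitioned by the new key would otherwise create its own repartition topic, while the optimized plan places a single repartition topic right after the key-changing node and lets all of those operations read from it. `RepartitionMergeSketch`, `DownstreamOp`, and the `-repartition` topic names are hypothetical; this is not the `OptimizableRepartitionNode` logic from the PR.

```java
import java.util.ArrayList;
import java.util.List;

class RepartitionMergeSketch {

    // Hypothetical downstream operation of a key-changing node (map, selectKey, ...).
    static final class DownstreamOp {
        final String name;
        final boolean needsRepartition; // e.g. groupByKey().count() on the new key

        DownstreamOp(final String name, final boolean needsRepartition) {
            this.name = name;
            this.needsRepartition = needsRepartition;
        }
    }

    // Un-optimized: every operation that needs data partitioned by the new key
    // creates its own repartition topic.
    static List<String> unoptimized(final List<DownstreamOp> ops) {
        final List<String> topics = new ArrayList<>();
        for (final DownstreamOp op : ops) {
            if (op.needsRepartition) {
                topics.add(op.name + "-repartition");
            }
        }
        return topics;
    }

    // Optimized: one repartition topic directly after the key-changing node,
    // shared by every operation that needs it.
    static List<String> optimized(final String keyChangingNode, final List<DownstreamOp> ops) {
        final List<String> topics = new ArrayList<>();
        for (final DownstreamOp op : ops) {
            if (op.needsRepartition) {
                topics.add(keyChangingNode + "-repartition");
                break;
            }
        }
        return topics;
    }
}
```

Because every downstream operation reads from the same topic, which is already partitioned by the new key, each one still sees correctly co-partitioned data; that equivalence is what an integration test comparing optimized and un-optimized results checks.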
   
   Note that this PR leaves in place the optimization for re-using a source 
topic as a changelog topic for source `KTable` instances.  I'll have another 
follow-up PR to move the source topic optimization to a method within 
`InternalStreamsBuilder` so it can be performed in the same area of the code.
   
   Additionally, the current value of `StreamsConfig.OPTIMIZE` is `all` and 
we'll need to have another KIP to change the value to `2.1`.   
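A minimal sketch of gating the optimizer on the `Properties` handed to `build(props)`. The config names below are, to my knowledge, the ones that shipped in Kafka 2.1.0: `StreamsConfig.TOPOLOGY_OPTIMIZATION` (`"topology.optimization"`), `StreamsConfig.OPTIMIZE` (`"all"`), and `StreamsConfig.NO_OPTIMIZATION` (`"none"`, the default). They are redefined locally so the sketch has no Kafka dependency, and `shouldOptimize` is a hypothetical helper, not the `InternalStreamsBuilder` code.

```java
import java.util.Properties;

class OptimizationConfigSketch {

    // Local copies of the string values used by StreamsConfig in Kafka 2.1.
    static final String TOPOLOGY_OPTIMIZATION = "topology.optimization";
    static final String OPTIMIZE = "all";
    static final String NO_OPTIMIZATION = "none";

    // Hypothetical helper: decides whether the graph optimizer should run.
    // Missing properties (or a missing key) fall back to no optimization.
    static boolean shouldOptimize(final Properties props) {
        if (props == null) {
            return false;
        }
        final String value = props.getProperty(TOPOLOGY_OPTIMIZATION, NO_OPTIMIZATION);
        return OPTIMIZE.equals(value);
    }
}
```

In an application, the equivalent is `props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE)` followed by `builder.build(props)`; calling `build()` without properties is expected to leave the topology unoptimized.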
   
   The integration test `RepartitionOptimizingIntegrationTest` asserts that an optimized topology with one repartition topic produces the same results as the un-optimized version with four repartition topics.
   More tests will be added, but I wanted to get reviews on the approach now.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   




[jira] [Commented] (KAFKA-6761) Reduce Kafka Streams Footprint

2018-06-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16509742#comment-16509742
 ] 

ASF GitHub Bot commented on KAFKA-6761:
---

bbejeck opened a new pull request #5201: KAFKA-6761: Construct Physical Plan 
using Graph, Reduce streams footprint part III
URL: https://github.com/apache/kafka/pull/5201
 
 
   Sorry for the massive PR, but at this point it's very difficult to break it up into smaller parts now that we are building both the logical and the physical plan.
   It's worth noting that at the moment this PR does not include optimizations, for two reasons:
     1. We need a solution for providing properties to the topology build process.
     2. Some of the repartition optimizations need more work.
   
   The specific changes in this PR relative to the second PR include:
   
   1. Moved all graph objects into a separate package. This requires the graph objects to have public access, but after looking at the code for some time, it's worth the trade-off for a cleaner code base.
   2. Renamed the graph node types to names that convey more context.
   3. Built the entire physical plan from the graph after `StreamsBuilder.build()` is called.
   4. I'm currently working on applying optimizations, and they will be applied soon. Those optimizations will include:
       1. Re-using source topics as changelogs for KTables.
       2. Re-using sink topics as changelogs for state stores that output directly to sinks.
       3. Automatically repartitioning for key-changing operations followed by other DSL operations that would repartition on their own, reducing repartitions from N to 1.
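The first of these optimizations comes down to a small decision per materialized `KTable`. A minimal sketch, assuming the usual Kafka Streams changelog naming convention of `<application.id>-<storeName>-changelog`: when a table is read straight from a source topic and optimization is enabled, the source topic already holds every update, so it can double as the changelog and no extra internal topic is created. `ChangelogReuseSketch` and `changelogTopicFor` are hypothetical names.

```java
class ChangelogReuseSketch {

    // Hypothetical helper: picks the topic used to restore a KTable's state store.
    // sourceTopic is null when the table is not read directly from a source topic.
    static String changelogTopicFor(final String applicationId,
                                    final String storeName,
                                    final String sourceTopic,
                                    final boolean optimize) {
        if (optimize && sourceTopic != null) {
            // Reuse the input topic: it already contains every update to the table,
            // so no additional changelog topic is created on the brokers.
            return sourceTopic;
        }
        // Otherwise fall back to a dedicated internal changelog topic.
        return applicationId + "-" + storeName + "-changelog";
    }
}
```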
   
   Other changes are addressed directly as review comments on the PR.
   
   Testing consists of using all existing streams tests to validate building the physical plan with the graph.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   




[jira] [Commented] (KAFKA-6761) Reduce Kafka Streams Footprint

2018-05-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16468244#comment-16468244
 ] 

ASF GitHub Bot commented on KAFKA-6761:
---

bbejeck opened a new pull request #4983: KAFKA-6761 [WIP]- reduce streams 
footprint part II
URL: https://github.com/apache/kafka/pull/4983
 
 
   This version is a WIP and intentionally leaves out some additional required changes to keep the review effort manageable. This version of the process includes:
   
   1. Cleaning up the graph objects to reduce the number of parameters and make the naming conventions clearer.
   2. Intercepting all calls to the `InternalTopologyBuilder` and capturing all details required for possible optimizations and for building the final topology.
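Intercepting the builder calls amounts to recording each DSL operation with the parameters of its eventual `InternalTopologyBuilder` call, then replaying those calls once the whole graph is known. A minimal sketch of that record-then-replay pattern, assuming processors are identified only by name and parent names; `RecordThenReplaySketch` and `ProcessorCall` are hypothetical and much simpler than the real graph nodes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class RecordThenReplaySketch {

    // Hypothetical captured call: a processor name plus the names of its parents.
    static final class ProcessorCall {
        final String processorName;
        final List<String> parentNames;

        ProcessorCall(final String processorName, final List<String> parentNames) {
            this.processorName = processorName;
            this.parentNames = parentNames;
        }
    }

    private final List<ProcessorCall> captured = new ArrayList<>();

    // Invoked from DSL methods: the call is recorded, nothing is written yet.
    void addProcessor(final String name, final String... parents) {
        captured.add(new ProcessorCall(name, Arrays.asList(parents)));
    }

    int capturedCount() {
        return captured.size();
    }

    // Invoked once at build time. An optimizer could inspect or rewrite
    // 'captured' before any call reaches the real topology builder; here the
    // replayed calls are simply rendered as strings.
    List<String> replay() {
        final List<String> builderCalls = new ArrayList<>();
        for (final ProcessorCall call : captured) {
            builderCalls.add("addProcessor(" + call.processorName + " <- "
                + String.join(",", call.parentNames) + ")");
        }
        return Collections.unmodifiableList(builderCalls);
    }
}
```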
   
   
   This PR does not include writing out the current physical plan, so no tests are included. The next PR will include additional changes to building the graph and to writing the topology out without optimizations, using the current streams tests.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   




> Reduce Kafka Streams Footprint
> --
>
> Key: KAFKA-6761
> URL: https://issues.apache.org/jira/browse/KAFKA-6761
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
>Priority: Major
> Fix For: 2.0.0
>
>


[jira] [Commented] (KAFKA-6761) Reduce Kafka Streams Footprint

2018-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16464036#comment-16464036
 ] 

ASF GitHub Bot commented on KAFKA-6761:
---

guozhangwang closed pull request #4923: KAFKA-6761: Part 1 of 3; Graph nodes
URL: https://github.com/apache/kafka/pull/4923
 
 
   


diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/BaseJoinProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/BaseJoinProcessorNode.java
new file mode 100644
index 000..899ee718e28
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/BaseJoinProcessorNode.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+/**
+ * Utility base class containing the common fields between
+ * a Stream-Stream join and a Table-Table join
+ */
+abstract class BaseJoinProcessorNode extends StreamsGraphNode {
+
+private final ProcessorSupplier joinThisProcessSupplier;
+private final ProcessorSupplier joinOtherProcessSupplier;
+private final ProcessorSupplier joinMergeProcessor;
+private final ValueJoiner valueJoiner;
+private final String joinThisProcessorName;
+private final String joinOtherProcessorName;
+private final String joinMergeProcessorName;
+private final String thisJoinSide;
+private final String otherJoinSide;
+
+
+BaseJoinProcessorNode(final String parentProcessorNodeName,
+  final String processorNodeName,
+  final ValueJoiner valueJoiner,
+  final ProcessorParameters joinThisProcessorDetails,
+  final ProcessorParameters joinOtherProcessDetails,
+  final ProcessorParameters joinMergeProcessorDetails,
+  final String thisJoinSide,
+  final String otherJoinSide) {
+
+super(parentProcessorNodeName,
+  processorNodeName,
+  false);
+
+this.valueJoiner = valueJoiner;
+this.joinThisProcessSupplier = joinThisProcessorDetails.processorSupplier();
+this.joinOtherProcessSupplier = joinOtherProcessDetails.processorSupplier();
+this.joinMergeProcessor = joinMergeProcessorDetails.processorSupplier();
+this.joinThisProcessorName = joinThisProcessorDetails.processorName();
+this.joinOtherProcessorName = joinOtherProcessDetails.processorName();
+this.joinMergeProcessorName = joinMergeProcessorDetails.processorName();
+this.thisJoinSide = thisJoinSide;
+this.otherJoinSide = otherJoinSide;
+}
+
+ProcessorSupplier joinThisProcessorSupplier() {
+return joinThisProcessSupplier;
+}
+
+ProcessorSupplier joinOtherProcessorSupplier() {
+return joinOtherProcessSupplier;
+}
+
+ProcessorSupplier joinMergeProcessor() {
+return joinMergeProcessor;
+}
+
+ValueJoiner valueJoiner() {
+return valueJoiner;
+}
+
+String joinThisProcessorName() {
+return joinThisProcessorName;
+}
+
+String joinOtherProcessorName() {
+return joinOtherProcessorName;
+}
+
+String joinMergeProcessorName() {
+return joinMergeProcessorName;
+}
+
+String thisJoinSide() {
+return thisJoinSide;
+}
+
+String otherJoinSide() {
+return otherJoinSide;
+}
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinNode.java
new file mode 

[jira] [Commented] (KAFKA-6761) Reduce Kafka Streams Footprint

2018-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16451258#comment-16451258
 ] 

ASF GitHub Bot commented on KAFKA-6761:
---

bbejeck opened a new pull request #4923: KAFKA-6761: Part 1 of 3; Graph nodes
URL: https://github.com/apache/kafka/pull/4923
 
 
   This PR supersedes PR #4654, as that one was growing too large. All comments in that PR should be addressed here.
   I will attempt to break the PRs for the topology optimization effort into 3 PRs total and will follow this general plan:
   
   1. This PR only adds the graph nodes and the graph. The graph nodes will hold the information used to make calls to the `InternalTopologyBuilder` when using the DSL. Graph nodes are stored in the `StreamsTopologyGraph` until the final topology needs to be built; then the graph is traversed and optimizations are made at that point. There are no tests in this PR; it relies on the follow-up PR, which uses all current streams tests and should suffice.
   2. PR 2 will intercept all DSL calls and build the graph. The `InternalStreamsBuilder` uses the graph to provide the required info to the `InternalTopologyBuilder` and build a topology. The condition of satisfaction for this PR is that all current unit, integration, and system tests pass using the graph.
   3. PR 3 adds some optimizations, mainly automatically repartitioning for operations that may modify a key and have child operations that would normally create a separate repartition topic, saving possibly unnecessary repartition topics. For example, the following topology:
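   ```
   // inputStream is a KStream<String, String>
   KStream<String, String> mappedStreamOther = inputStream.map(
       new KeyValueMapper<String, String, KeyValue<String, String>>() {
           @Override
           public KeyValue<String, String> apply(final String key, final String value) {
               // key-changing operation: the new key is the first three characters of the old key
               return KeyValue.pair(key.substring(0, 3), value);
           }
       });

   // each groupByKey() on the re-keyed stream would otherwise get its own repartition topic
   mappedStreamOther.groupByKey().windowedBy(TimeWindows.of(5000)).count().toStream().to("count-one-out");
   mappedStreamOther.groupByKey().windowedBy(TimeWindows.of(1)).count().toStream().to("count-two-out");
   mappedStreamOther.groupByKey().windowedBy(TimeWindows.of(15000)).count().toStream().to("count-three-out");
   ```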
   would create 3 repartition topics, but after applying an optimization strategy, only one is created.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   




> Reduce Kafka Streams Footprint
> --
>
> Key: KAFKA-6761
> URL: https://issues.apache.org/jira/browse/KAFKA-6761
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
>Priority: Major
> Fix For: 1.2.0
>
>