This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 247ccd7  KAFKA-7758: Reuse KGroupedStream/KGroupedTable with named 
repartition topics (#6265)
247ccd7 is described below

commit 247ccd7ac97234d91ba8c6c58a54272e89cd81b4
Author: Bill Bejeck <bbej...@gmail.com>
AuthorDate: Sat Feb 16 21:43:48 2019 -0500

    KAFKA-7758: Reuse KGroupedStream/KGroupedTable with named repartition 
topics (#6265)
    
    Reviewers: John Roesler <j...@confluent.io>, Matthias J. Sax 
<matth...@confluent.io>
---
 .../internals/GroupedStreamAggregateBuilder.java   |  46 +++++---
 .../kstream/internals/KGroupedTableImpl.java       |  18 ++-
 .../kstream/RepartitionTopicNamingTest.java        | 123 +++++++++++++++++----
 3 files changed, 142 insertions(+), 45 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
index bb93a4d..46546f4 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
@@ -20,13 +20,16 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KTable;
-import 
org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode;
 import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
 import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
 import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 
+import static 
org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder;
+import static 
org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode.optimizableRepartitionNodeBuilder;
+
+
 import java.util.Collections;
 import java.util.Set;
 
@@ -36,10 +39,11 @@ class GroupedStreamAggregateBuilder<K, V> {
     private final Serde<K> keySerde;
     private final Serde<V> valueSerde;
     private final boolean repartitionRequired;
-    private final String userName;
+    private final String userProvidedRepartitionTopicName;
     private final Set<String> sourceNodes;
     private final String name;
     private final StreamsGraphNode streamsGraphNode;
+    private StreamsGraphNode repartitionNode;
 
     final Initializer<Long> countInitializer = () -> 0L;
 
@@ -61,7 +65,7 @@ class GroupedStreamAggregateBuilder<K, V> {
         this.sourceNodes = sourceNodes;
         this.name = name;
         this.streamsGraphNode = streamsGraphNode;
-        this.userName = groupedInternal.name();
+        this.userProvidedRepartitionTopicName = groupedInternal.name();
     }
 
     <KR, VR> KTable<KR, VR> build(final String functionName,
@@ -74,14 +78,22 @@ class GroupedStreamAggregateBuilder<K, V> {
 
         final String aggFunctionName = builder.newProcessorName(functionName);
 
-        final OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder<K, 
V> repartitionNodeBuilder = 
OptimizableRepartitionNode.optimizableRepartitionNodeBuilder();
+        String sourceName = this.name;
+        StreamsGraphNode parentNode = streamsGraphNode;
 
-        final String sourceName = repartitionIfRequired(userName != null ? 
userName : storeBuilder.name(), repartitionNodeBuilder);
+        if (repartitionRequired) {
+            final OptimizableRepartitionNodeBuilder<K, V> 
repartitionNodeBuilder = optimizableRepartitionNodeBuilder();
+            final String repartitionTopicPrefix = 
userProvidedRepartitionTopicName != null ? userProvidedRepartitionTopicName : 
storeBuilder.name();
+            sourceName = createRepartitionSource(repartitionTopicPrefix, 
repartitionNodeBuilder);
 
-        StreamsGraphNode parentNode = streamsGraphNode;
+            // First time through we need to create a repartition node.
+            // On any subsequent call to GroupedStreamAggregateBuilder#build we 
check if
+            // the user has provided a name for the repartition topic, if so 
we re-use
+            // the existing repartition node, otherwise we create a new one.
+            if (repartitionNode == null || userProvidedRepartitionTopicName == 
null) {
+                repartitionNode = repartitionNodeBuilder.build();
+            }
 
-        if (!sourceName.equals(this.name)) {
-            final StreamsGraphNode repartitionNode = 
repartitionNodeBuilder.build();
             builder.addGraphNode(parentNode, repartitionNode);
             parentNode = repartitionNode;
         }
@@ -108,16 +120,16 @@ class GroupedStreamAggregateBuilder<K, V> {
     }
 
     /**
-     * @return the new sourceName if repartitioned. Otherwise the name of this 
stream
+     * @return the new sourceName of the repartitioned source
      */
-    private String repartitionIfRequired(final String 
repartitionTopicNamePrefix,
-                                         final 
OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder<K, V> 
optimizableRepartitionNodeBuilder) {
-        if (!repartitionRequired) {
-            return this.name;
-        }
-        // if repartition required the operation
-        // captured needs to be set in the graph
-        return KStreamImpl.createRepartitionedSource(builder, keySerde, 
valueSerde, repartitionTopicNamePrefix, optimizableRepartitionNodeBuilder);
+    private String createRepartitionSource(final String 
repartitionTopicNamePrefix,
+                                           final 
OptimizableRepartitionNodeBuilder<K, V> optimizableRepartitionNodeBuilder) {
+
+        return KStreamImpl.createRepartitionedSource(builder,
+                                                     keySerde,
+                                                     valueSerde,
+                                                     
repartitionTopicNamePrefix,
+                                                     
optimizableRepartitionNodeBuilder);
 
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
index 2eca84e..4675f56 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -47,7 +47,7 @@ public class KGroupedTableImpl<K, V> extends 
AbstractStream<K, V> implements KGr
 
     private static final String REDUCE_NAME = "KTABLE-REDUCE-";
 
-    private final String userSpecifiedName;
+    private final String userProvidedRepartitionTopicName;
 
     private final Initializer<Long> countInitializer = () -> 0L;
 
@@ -55,6 +55,8 @@ public class KGroupedTableImpl<K, V> extends 
AbstractStream<K, V> implements KGr
 
     private final Aggregator<K, V, Long> countSubtractor = (aggKey, value, 
aggregate) -> aggregate - 1L;
 
+    private StreamsGraphNode repartitionGraphNode;
+
     KGroupedTableImpl(final InternalStreamsBuilder builder,
                       final String name,
                       final Set<String> sourceNodes,
@@ -62,22 +64,26 @@ public class KGroupedTableImpl<K, V> extends 
AbstractStream<K, V> implements KGr
                       final StreamsGraphNode streamsGraphNode) {
         super(name, groupedInternal.keySerde(), groupedInternal.valueSerde(), 
sourceNodes, streamsGraphNode, builder);
 
-        this.userSpecifiedName = groupedInternal.name();
+        this.userProvidedRepartitionTopicName = groupedInternal.name();
     }
 
     private <T> KTable<K, T> doAggregate(final ProcessorSupplier<K, Change<V>> 
aggregateSupplier,
                                          final String functionName,
                                          final MaterializedInternal<K, T, 
KeyValueStore<Bytes, byte[]>> materialized) {
+
         final String sinkName = 
builder.newProcessorName(KStreamImpl.SINK_NAME);
         final String sourceName = 
builder.newProcessorName(KStreamImpl.SOURCE_NAME);
         final String funcName = builder.newProcessorName(functionName);
-        final String repartitionTopic = (userSpecifiedName != null ? 
userSpecifiedName : materialized.storeName())
+        final String repartitionTopic = (userProvidedRepartitionTopicName != 
null ? userProvidedRepartitionTopicName : materialized.storeName())
             + KStreamImpl.REPARTITION_TOPIC_SUFFIX;
 
-        final StreamsGraphNode repartitionNode = 
createRepartitionNode(sinkName, sourceName, repartitionTopic);
+        if (repartitionGraphNode == null || userProvidedRepartitionTopicName 
== null) {
+            repartitionGraphNode = createRepartitionNode(sinkName, sourceName, 
repartitionTopic);
+        }
+
 
         // the passed in StreamsGraphNode must be the parent of the 
repartition node
-        builder.addGraphNode(this.streamsGraphNode, repartitionNode);
+        builder.addGraphNode(this.streamsGraphNode, repartitionGraphNode);
 
         final StatefulProcessorNode statefulProcessorNode = new 
StatefulProcessorNode<>(
             funcName,
@@ -87,7 +93,7 @@ public class KGroupedTableImpl<K, V> extends 
AbstractStream<K, V> implements KGr
         );
 
         // now the repartition node must be the parent of the 
StateProcessorNode
-        builder.addGraphNode(repartitionNode, statefulProcessorNode);
+        builder.addGraphNode(repartitionGraphNode, statefulProcessorNode);
 
         // return the KTable representation with the intermediate topic as the 
sources
         return new KTableImpl<>(funcName,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java
index 0b8627f..3c7e8c0 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java
@@ -85,8 +85,13 @@ public class RepartitionTopicNamingTest {
     public void shouldFailWithSameRepartitionTopicName() {
         try {
             final StreamsBuilder builder = new StreamsBuilder();
-            builder.<String, String>stream("topic").selectKey((k, v) -> 
k).groupByKey(Grouped.as("grouping")).count().toStream();
-            builder.<String, String>stream("topicII").selectKey((k, v) -> 
k).groupByKey(Grouped.as("grouping")).count().toStream();
+            builder.<String, String>stream("topic").selectKey((k, v) -> k)
+                                            .groupByKey(Grouped.as("grouping"))
+                                            .count().toStream();
+
+            builder.<String, String>stream("topicII").selectKey((k, v) -> k)
+                                              
.groupByKey(Grouped.as("grouping"))
+                                              .count().toStream();
             builder.build();
             fail("Should not build re-using repartition topic name");
         } catch (final TopologyException te) {
@@ -94,28 +99,97 @@ public class RepartitionTopicNamingTest {
         }
     }
 
-    // each KGroupedStream will result in repartition, can't reuse
-    // KGroupedStreams when specifying repartition topic names and 
Optimization is turned off
-    // need to have separate groupByKey calls when naming repartition topics
-    // see test shouldHandleUniqueGroupedInstances below for an example
     @Test
-    public void 
shouldFailWithSameRepartitionTopicNameUsingSameKGroupedStream() {
-        try {
-            final StreamsBuilder builder = new StreamsBuilder();
-            final KGroupedStream<String, String> kGroupedStream = 
builder.<String, String>stream("topic").selectKey((k, v) -> 
k).groupByKey(Grouped.as("grouping"));
-            
kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(10L))).count();
-            
kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(30L))).count();
-            builder.build();
-            fail("Should not build re-using repartition topic name");
-        } catch (final TopologyException te) {
-            // ok
-        }
+    public void 
shouldNotFailWithSameRepartitionTopicNameUsingSameKGroupedStream() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KGroupedStream<String, String> kGroupedStream = builder.<String, 
String>stream("topic")
+                                                                     
.selectKey((k, v) -> k)
+                                                                     
.groupByKey(Grouped.as("grouping"));
+
+        
kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(10L))).count().toStream().to("output-one");
+        
kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(30L))).count().toStream().to("output-two");
+
+        final String topologyString = builder.build().describe().toString();
+        assertThat(1, is(getCountOfRepartitionTopicsFound(topologyString, 
repartitionTopicPattern)));
+        assertTrue(topologyString.contains("grouping-repartition"));
+    }
+
+    @Test
+    public void 
shouldNotFailWithSameRepartitionTopicNameUsingSameTimeWindowStream() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KGroupedStream<String, String> kGroupedStream = builder.<String, 
String>stream("topic")
+                                                                     
.selectKey((k, v) -> k)
+                                                                     
.groupByKey(Grouped.as("grouping"));
+
+        final TimeWindowedKStream<String, String> timeWindowedKStream = 
kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(10L)));
+
+        timeWindowedKStream.count().toStream().to("output-one");
+        timeWindowedKStream.reduce((v, v2) -> v + 
v2).toStream().to("output-two");
+        
kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(30L))).count().toStream().to("output-two");
+
+        final String topologyString = builder.build().describe().toString();
+        assertThat(1, is(getCountOfRepartitionTopicsFound(topologyString, 
repartitionTopicPattern)));
+        assertTrue(topologyString.contains("grouping-repartition"));
+    }
+
+    @Test
+    public void 
shouldNotFailWithSameRepartitionTopicNameUsingSameSessionWindowStream() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KGroupedStream<String, String> kGroupedStream = builder.<String, 
String>stream("topic")
+                                                                     
.selectKey((k, v) -> k)
+                                                                     
.groupByKey(Grouped.as("grouping"));
+
+        final SessionWindowedKStream<String, String> sessionWindowedKStream = 
kGroupedStream.windowedBy(SessionWindows.with(Duration.ofMillis(10L)));
+
+        sessionWindowedKStream.count().toStream().to("output-one");
+        sessionWindowedKStream.reduce((v, v2) -> v + 
v2).toStream().to("output-two");
+        
kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(30L))).count().toStream().to("output-two");
+
+        final String topologyString = builder.build().describe().toString();
+        assertThat(1, is(getCountOfRepartitionTopicsFound(topologyString, 
repartitionTopicPattern)));
+        assertTrue(topologyString.contains("grouping-repartition"));
+    }
+
+    @Test
+    public void 
shouldNotFailWithSameRepartitionTopicNameUsingSameKGroupedTable() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KGroupedTable<String, String> kGroupedTable = builder.<String, 
String>table("topic")
+                                                                   
.groupBy(KeyValue::pair, Grouped.as("grouping"));
+        kGroupedTable.count().toStream().to("output-count");
+        kGroupedTable.reduce((v, v2) -> v2, (v, v2) -> 
v2).toStream().to("output-reduce");
+        final String topologyString = builder.build().describe().toString();
+        assertThat(1, is(getCountOfRepartitionTopicsFound(topologyString, 
repartitionTopicPattern)));
+        assertTrue(topologyString.contains("grouping-repartition"));
+    }
+
+    @Test
+    public void shouldNotReuseRepartitionNodeWithUnamedRepartitionTopics() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KGroupedStream<String, String> kGroupedStream = builder.<String, 
String>stream("topic")
+                                                                     
.selectKey((k, v) -> k)
+                                                                     
.groupByKey();
+        
kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(10L))).count().toStream().to("output-one");
+        
kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(30L))).count().toStream().to("output-two");
+        final String topologyString = builder.build().describe().toString();
+        assertThat(2, is(getCountOfRepartitionTopicsFound(topologyString, 
repartitionTopicPattern)));
+    }
+
+    @Test
+    public void 
shouldNotReuseRepartitionNodeWithUnamedRepartitionTopicsKGroupedTable() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KGroupedTable<String, String> kGroupedTable = builder.<String, 
String>table("topic").groupBy(KeyValue::pair);
+        kGroupedTable.count().toStream().to("output-count");
+        kGroupedTable.reduce((v, v2) -> v2, (v, v2) -> 
v2).toStream().to("output-reduce");
+        final String topologyString = builder.build().describe().toString();
+        assertThat(2, is(getCountOfRepartitionTopicsFound(topologyString, 
repartitionTopicPattern)));
     }
 
     @Test
     public void 
shouldNotFailWithSameRepartitionTopicNameUsingSameKGroupedStreamOptimizationsOn()
 {
         final StreamsBuilder builder = new StreamsBuilder();
-        final KGroupedStream<String, String> kGroupedStream = builder.<String, 
String>stream("topic").selectKey((k, v) -> 
k).groupByKey(Grouped.as("grouping"));
+        final KGroupedStream<String, String> kGroupedStream = builder.<String, 
String>stream("topic")
+                                                                     
.selectKey((k, v) -> k)
+                                                                     
.groupByKey(Grouped.as("grouping"));
         
kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(10L))).count();
         
kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(30L))).count();
         final Properties properties = new Properties();
@@ -134,8 +208,12 @@ public class RepartitionTopicNamingTest {
             final KStream<String, String> stream2 = builder.<String, 
String>stream("topic2").selectKey((k, v) -> k);
             final KStream<String, String> stream3 = builder.<String, 
String>stream("topic3").selectKey((k, v) -> k);
 
-            final KStream<String, String> joined = stream1.join(stream2, (v1, 
v2) -> v1 + v2, JoinWindows.of(Duration.ofMillis(30L)), 
Joined.named("join-repartition"));
-            joined.join(stream3, (v1, v2) -> v1 + v2, 
JoinWindows.of(Duration.ofMillis(30L)), Joined.named("join-repartition"));
+            final KStream<String, String> joined = stream1.join(stream2, (v1, 
v2) -> v1 + v2,
+                                                                
JoinWindows.of(Duration.ofMillis(30L)),
+                                                                
Joined.named("join-repartition"));
+
+            joined.join(stream3, (v1, v2) -> v1 + v2, 
JoinWindows.of(Duration.ofMillis(30L)),
+                                                      
Joined.named("join-repartition"));
             builder.build();
             fail("Should not build re-using repartition topic name");
         } catch (final TopologyException te) {
@@ -143,13 +221,14 @@ public class RepartitionTopicNamingTest {
         }
     }
 
-
     @Test
     public void 
shouldPassWithSameRepartitionTopicNameUsingSameKGroupedStreamOptimized() {
         final StreamsBuilder builder = new StreamsBuilder();
         final Properties properties = new Properties();
         properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, 
StreamsConfig.OPTIMIZE);
-        final KGroupedStream<String, String> kGroupedStream = builder.<String, 
String>stream("topic").selectKey((k, v) -> 
k).groupByKey(Grouped.as("grouping"));
+        final KGroupedStream<String, String> kGroupedStream = builder.<String, 
String>stream("topic")
+                                                                     
.selectKey((k, v) -> k)
+                                                                     
.groupByKey(Grouped.as("grouping"));
         
kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(10L))).count();
         
kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(30L))).count();
         builder.build(properties);

Reply via email to