This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 247ccd7 KAFKA-7758: Reuse KGroupedStream/KGroupedTable with named repartition topics (#6265) 247ccd7 is described below commit 247ccd7ac97234d91ba8c6c58a54272e89cd81b4 Author: Bill Bejeck <bbej...@gmail.com> AuthorDate: Sat Feb 16 21:43:48 2019 -0500 KAFKA-7758: Reuse KGroupedStream/KGroupedTable with named repartition topics (#6265) Reviewers: John Roesler <j...@confluent.io>, Matthias J. Sax <matth...@confluent.io> --- .../internals/GroupedStreamAggregateBuilder.java | 46 +++++--- .../kstream/internals/KGroupedTableImpl.java | 18 ++- .../kstream/RepartitionTopicNamingTest.java | 123 +++++++++++++++++---- 3 files changed, 142 insertions(+), 45 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java index bb93a4d..46546f4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java @@ -20,13 +20,16 @@ import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.KTable; -import org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode; import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters; import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode; import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.StoreBuilder; +import static org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder; +import static 
org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode.optimizableRepartitionNodeBuilder; + + import java.util.Collections; import java.util.Set; @@ -36,10 +39,11 @@ class GroupedStreamAggregateBuilder<K, V> { private final Serde<K> keySerde; private final Serde<V> valueSerde; private final boolean repartitionRequired; - private final String userName; + private final String userProvidedRepartitionTopicName; private final Set<String> sourceNodes; private final String name; private final StreamsGraphNode streamsGraphNode; + private StreamsGraphNode repartitionNode; final Initializer<Long> countInitializer = () -> 0L; @@ -61,7 +65,7 @@ class GroupedStreamAggregateBuilder<K, V> { this.sourceNodes = sourceNodes; this.name = name; this.streamsGraphNode = streamsGraphNode; - this.userName = groupedInternal.name(); + this.userProvidedRepartitionTopicName = groupedInternal.name(); } <KR, VR> KTable<KR, VR> build(final String functionName, @@ -74,14 +78,22 @@ class GroupedStreamAggregateBuilder<K, V> { final String aggFunctionName = builder.newProcessorName(functionName); - final OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder<K, V> repartitionNodeBuilder = OptimizableRepartitionNode.optimizableRepartitionNodeBuilder(); + String sourceName = this.name; + StreamsGraphNode parentNode = streamsGraphNode; - final String sourceName = repartitionIfRequired(userName != null ? userName : storeBuilder.name(), repartitionNodeBuilder); + if (repartitionRequired) { + final OptimizableRepartitionNodeBuilder<K, V> repartitionNodeBuilder = optimizableRepartitionNodeBuilder(); + final String repartitionTopicPrefix = userProvidedRepartitionTopicName != null ? userProvidedRepartitionTopicName : storeBuilder.name(); + sourceName = createRepartitionSource(repartitionTopicPrefix, repartitionNodeBuilder); - StreamsGraphNode parentNode = streamsGraphNode; + // First time through we need to create a repartition node. 
+ // Any subsequent calls to GroupedStreamAggregateBuilder#build we check if + // the user has provided a name for the repartition topic, if so we re-use + // the existing repartition node, otherwise we create a new one. + if (repartitionNode == null || userProvidedRepartitionTopicName == null) { + repartitionNode = repartitionNodeBuilder.build(); + } - if (!sourceName.equals(this.name)) { - final StreamsGraphNode repartitionNode = repartitionNodeBuilder.build(); builder.addGraphNode(parentNode, repartitionNode); parentNode = repartitionNode; } @@ -108,16 +120,16 @@ class GroupedStreamAggregateBuilder<K, V> { } /** - * @return the new sourceName if repartitioned. Otherwise the name of this stream + * @return the new sourceName of the repartitioned source */ - private String repartitionIfRequired(final String repartitionTopicNamePrefix, - final OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder<K, V> optimizableRepartitionNodeBuilder) { - if (!repartitionRequired) { - return this.name; - } - // if repartition required the operation - // captured needs to be set in the graph - return KStreamImpl.createRepartitionedSource(builder, keySerde, valueSerde, repartitionTopicNamePrefix, optimizableRepartitionNodeBuilder); + private String createRepartitionSource(final String repartitionTopicNamePrefix, + final OptimizableRepartitionNodeBuilder<K, V> optimizableRepartitionNodeBuilder) { + + return KStreamImpl.createRepartitionedSource(builder, + keySerde, + valueSerde, + repartitionTopicNamePrefix, + optimizableRepartitionNodeBuilder); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java index 2eca84e..4675f56 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java @@ -47,7 +47,7 @@ public 
class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr private static final String REDUCE_NAME = "KTABLE-REDUCE-"; - private final String userSpecifiedName; + private final String userProvidedRepartitionTopicName; private final Initializer<Long> countInitializer = () -> 0L; @@ -55,6 +55,8 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr private final Aggregator<K, V, Long> countSubtractor = (aggKey, value, aggregate) -> aggregate - 1L; + private StreamsGraphNode repartitionGraphNode; + KGroupedTableImpl(final InternalStreamsBuilder builder, final String name, final Set<String> sourceNodes, @@ -62,22 +64,26 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr final StreamsGraphNode streamsGraphNode) { super(name, groupedInternal.keySerde(), groupedInternal.valueSerde(), sourceNodes, streamsGraphNode, builder); - this.userSpecifiedName = groupedInternal.name(); + this.userProvidedRepartitionTopicName = groupedInternal.name(); } private <T> KTable<K, T> doAggregate(final ProcessorSupplier<K, Change<V>> aggregateSupplier, final String functionName, final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materialized) { + final String sinkName = builder.newProcessorName(KStreamImpl.SINK_NAME); final String sourceName = builder.newProcessorName(KStreamImpl.SOURCE_NAME); final String funcName = builder.newProcessorName(functionName); - final String repartitionTopic = (userSpecifiedName != null ? userSpecifiedName : materialized.storeName()) + final String repartitionTopic = (userProvidedRepartitionTopicName != null ? 
userProvidedRepartitionTopicName : materialized.storeName()) + KStreamImpl.REPARTITION_TOPIC_SUFFIX; - final StreamsGraphNode repartitionNode = createRepartitionNode(sinkName, sourceName, repartitionTopic); + if (repartitionGraphNode == null || userProvidedRepartitionTopicName == null) { + repartitionGraphNode = createRepartitionNode(sinkName, sourceName, repartitionTopic); + } + // the passed in StreamsGraphNode must be the parent of the repartition node - builder.addGraphNode(this.streamsGraphNode, repartitionNode); + builder.addGraphNode(this.streamsGraphNode, repartitionGraphNode); final StatefulProcessorNode statefulProcessorNode = new StatefulProcessorNode<>( funcName, @@ -87,7 +93,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr ); // now the repartition node must be the parent of the StateProcessorNode - builder.addGraphNode(repartitionNode, statefulProcessorNode); + builder.addGraphNode(repartitionGraphNode, statefulProcessorNode); // return the KTable representation with the intermediate topic as the sources return new KTableImpl<>(funcName, diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java index 0b8627f..3c7e8c0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java @@ -85,8 +85,13 @@ public class RepartitionTopicNamingTest { public void shouldFailWithSameRepartitionTopicName() { try { final StreamsBuilder builder = new StreamsBuilder(); - builder.<String, String>stream("topic").selectKey((k, v) -> k).groupByKey(Grouped.as("grouping")).count().toStream(); - builder.<String, String>stream("topicII").selectKey((k, v) -> k).groupByKey(Grouped.as("grouping")).count().toStream(); + builder.<String, String>stream("topic").selectKey((k, v) -> k) + 
.groupByKey(Grouped.as("grouping")) + .count().toStream(); + + builder.<String, String>stream("topicII").selectKey((k, v) -> k) + .groupByKey(Grouped.as("grouping")) + .count().toStream(); builder.build(); fail("Should not build re-using repartition topic name"); } catch (final TopologyException te) { @@ -94,28 +99,97 @@ public class RepartitionTopicNamingTest { } } - // each KGroupedStream will result in repartition, can't reuse - // KGroupedStreams when specifying repartition topic names and Optimization is turned off - // need to have separate groupByKey calls when naming repartition topics - // see test shouldHandleUniqueGroupedInstances below for an example @Test - public void shouldFailWithSameRepartitionTopicNameUsingSameKGroupedStream() { - try { - final StreamsBuilder builder = new StreamsBuilder(); - final KGroupedStream<String, String> kGroupedStream = builder.<String, String>stream("topic").selectKey((k, v) -> k).groupByKey(Grouped.as("grouping")); - kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(10L))).count(); - kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(30L))).count(); - builder.build(); - fail("Should not build re-using repartition topic name"); - } catch (final TopologyException te) { - // ok - } + public void shouldNotFailWithSameRepartitionTopicNameUsingSameKGroupedStream() { + final StreamsBuilder builder = new StreamsBuilder(); + final KGroupedStream<String, String> kGroupedStream = builder.<String, String>stream("topic") + .selectKey((k, v) -> k) + .groupByKey(Grouped.as("grouping")); + + kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(10L))).count().toStream().to("output-one"); + kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(30L))).count().toStream().to("output-two"); + + final String topologyString = builder.build().describe().toString(); + assertThat(1, is(getCountOfRepartitionTopicsFound(topologyString, repartitionTopicPattern))); + 
assertTrue(topologyString.contains("grouping-repartition")); + } + + @Test + public void shouldNotFailWithSameRepartitionTopicNameUsingSameTimeWindowStream() { + final StreamsBuilder builder = new StreamsBuilder(); + final KGroupedStream<String, String> kGroupedStream = builder.<String, String>stream("topic") + .selectKey((k, v) -> k) + .groupByKey(Grouped.as("grouping")); + + final TimeWindowedKStream<String, String> timeWindowedKStream = kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(10L))); + + timeWindowedKStream.count().toStream().to("output-one"); + timeWindowedKStream.reduce((v, v2) -> v + v2).toStream().to("output-two"); + kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(30L))).count().toStream().to("output-two"); + + final String topologyString = builder.build().describe().toString(); + assertThat(1, is(getCountOfRepartitionTopicsFound(topologyString, repartitionTopicPattern))); + assertTrue(topologyString.contains("grouping-repartition")); + } + + @Test + public void shouldNotFailWithSameRepartitionTopicNameUsingSameSessionWindowStream() { + final StreamsBuilder builder = new StreamsBuilder(); + final KGroupedStream<String, String> kGroupedStream = builder.<String, String>stream("topic") + .selectKey((k, v) -> k) + .groupByKey(Grouped.as("grouping")); + + final SessionWindowedKStream<String, String> sessionWindowedKStream = kGroupedStream.windowedBy(SessionWindows.with(Duration.ofMillis(10L))); + + sessionWindowedKStream.count().toStream().to("output-one"); + sessionWindowedKStream.reduce((v, v2) -> v + v2).toStream().to("output-two"); + kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(30L))).count().toStream().to("output-two"); + + final String topologyString = builder.build().describe().toString(); + assertThat(1, is(getCountOfRepartitionTopicsFound(topologyString, repartitionTopicPattern))); + assertTrue(topologyString.contains("grouping-repartition")); + } + + @Test + public void 
shouldNotFailWithSameRepartitionTopicNameUsingSameKGroupedTable() { + final StreamsBuilder builder = new StreamsBuilder(); + final KGroupedTable<String, String> kGroupedTable = builder.<String, String>table("topic") + .groupBy(KeyValue::pair, Grouped.as("grouping")); + kGroupedTable.count().toStream().to("output-count"); + kGroupedTable.reduce((v, v2) -> v2, (v, v2) -> v2).toStream().to("output-reduce"); + final String topologyString = builder.build().describe().toString(); + assertThat(1, is(getCountOfRepartitionTopicsFound(topologyString, repartitionTopicPattern))); + assertTrue(topologyString.contains("grouping-repartition")); + } + + @Test + public void shouldNotReuseRepartitionNodeWithUnamedRepartitionTopics() { + final StreamsBuilder builder = new StreamsBuilder(); + final KGroupedStream<String, String> kGroupedStream = builder.<String, String>stream("topic") + .selectKey((k, v) -> k) + .groupByKey(); + kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(10L))).count().toStream().to("output-one"); + kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(30L))).count().toStream().to("output-two"); + final String topologyString = builder.build().describe().toString(); + assertThat(2, is(getCountOfRepartitionTopicsFound(topologyString, repartitionTopicPattern))); + } + + @Test + public void shouldNotReuseRepartitionNodeWithUnamedRepartitionTopicsKGroupedTable() { + final StreamsBuilder builder = new StreamsBuilder(); + final KGroupedTable<String, String> kGroupedTable = builder.<String, String>table("topic").groupBy(KeyValue::pair); + kGroupedTable.count().toStream().to("output-count"); + kGroupedTable.reduce((v, v2) -> v2, (v, v2) -> v2).toStream().to("output-reduce"); + final String topologyString = builder.build().describe().toString(); + assertThat(2, is(getCountOfRepartitionTopicsFound(topologyString, repartitionTopicPattern))); } @Test public void shouldNotFailWithSameRepartitionTopicNameUsingSameKGroupedStreamOptimizationsOn() { final 
StreamsBuilder builder = new StreamsBuilder(); - final KGroupedStream<String, String> kGroupedStream = builder.<String, String>stream("topic").selectKey((k, v) -> k).groupByKey(Grouped.as("grouping")); + final KGroupedStream<String, String> kGroupedStream = builder.<String, String>stream("topic") + .selectKey((k, v) -> k) + .groupByKey(Grouped.as("grouping")); kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(10L))).count(); kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(30L))).count(); final Properties properties = new Properties(); @@ -134,8 +208,12 @@ public class RepartitionTopicNamingTest { final KStream<String, String> stream2 = builder.<String, String>stream("topic2").selectKey((k, v) -> k); final KStream<String, String> stream3 = builder.<String, String>stream("topic3").selectKey((k, v) -> k); - final KStream<String, String> joined = stream1.join(stream2, (v1, v2) -> v1 + v2, JoinWindows.of(Duration.ofMillis(30L)), Joined.named("join-repartition")); - joined.join(stream3, (v1, v2) -> v1 + v2, JoinWindows.of(Duration.ofMillis(30L)), Joined.named("join-repartition")); + final KStream<String, String> joined = stream1.join(stream2, (v1, v2) -> v1 + v2, + JoinWindows.of(Duration.ofMillis(30L)), + Joined.named("join-repartition")); + + joined.join(stream3, (v1, v2) -> v1 + v2, JoinWindows.of(Duration.ofMillis(30L)), + Joined.named("join-repartition")); builder.build(); fail("Should not build re-using repartition topic name"); } catch (final TopologyException te) { @@ -143,13 +221,14 @@ public class RepartitionTopicNamingTest { } } - @Test public void shouldPassWithSameRepartitionTopicNameUsingSameKGroupedStreamOptimized() { final StreamsBuilder builder = new StreamsBuilder(); final Properties properties = new Properties(); properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE); - final KGroupedStream<String, String> kGroupedStream = builder.<String, String>stream("topic").selectKey((k, v) -> 
k).groupByKey(Grouped.as("grouping")); + final KGroupedStream<String, String> kGroupedStream = builder.<String, String>stream("topic") + .selectKey((k, v) -> k) + .groupByKey(Grouped.as("grouping")); kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(10L))).count(); kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(30L))).count(); builder.build(properties);