mjsax commented on code in PR #18800:
URL: https://github.com/apache/kafka/pull/18800#discussion_r2162780239

##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -100,14 +100,14 @@ public <K, V> KStream<K, V> stream(final Collection<String> topics,
         final String name = new NamedInternal(consumed.name()).orElseGenerateWithPrefix(this, KStreamImpl.SOURCE_NAME);
         final StreamSourceNode<K, V> streamSourceNode = new StreamSourceNode<>(name, topics, consumed);
+        streamSourceNode.requireRepartitionByKey();

Review Comment:
Not sure what this means? I.e., the name and semantics are not clear to me. When we add a new `KStream` from a topic, the assumption is that the `KStream` is partitioned by key. So this operator does not "require" any repartitioning (it just reads from a topic), and it also does not change the key (so downstream repartitioning is not required either, as it's not a key-changing operation).

##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphNode.java:
##########
@@ -27,12 +27,17 @@ import java.util.Optional;
 public abstract class GraphNode {
-
+    private enum Repartition {
+        NOT_REQUIRED,
+        BY_KEY_ONLY,

Review Comment:
Not sure what this means?

##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphNode.java:
##########
@@ -93,6 +98,14 @@ public String nodeName() {
         return nodeName;
     }
+    public boolean canResolveRepartition() {
+        return keyChangingOperation || repartition != Repartition.NOT_REQUIRED;

Review Comment:
The name of this method and its logic are not clear to me. What does "resolve" actually mean?
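Under the reading in the first comment (a stream read from a topic is already partitioned by key, a key-changing operator only invalidates that property, and only a later key-dependent operator actually needs a repartition step), the decision can be modeled as one pass over a linear pipeline. This is a minimal sketch under that assumption. `RepartitionScan`, `Op`, and `autoRepartitions()` are hypothetical names, not Kafka Streams classes, and real topologies are DAGs rather than lists.

```java
import java.util.List;

// Hypothetical model of the reviewer's semantics; not Kafka Streams code.
public class RepartitionScan {
    enum Op {
        SOURCE,         // stream() from a topic: partitioned by key
        KEY_CHANGING,   // e.g. selectKey(), map(), process()
        KEY_PRESERVING, // e.g. filter(), mapValues()
        KEY_DEPENDENT,  // e.g. groupByKey(), join()
        REPARTITION     // repartition(): repartitions by itself
    }

    // Counts how many automatic repartition steps a linear pipeline would need.
    static int autoRepartitions(List<Op> pipeline) {
        boolean partitionedByKey = false;
        int inserted = 0;
        for (Op op : pipeline) {
            switch (op) {
                case SOURCE:
                case REPARTITION:
                    partitionedByKey = true; // data is partitioned by its current key
                    break;
                case KEY_CHANGING:
                    partitionedByKey = false; // no repartition yet, only a marker
                    break;
                case KEY_PRESERVING:
                    break; // partitioning is unchanged
                case KEY_DEPENDENT:
                    if (!partitionedByKey) {
                        inserted++; // an automatic repartition topic would go here
                        partitionedByKey = true;
                    }
                    break;
            }
        }
        return inserted;
    }

    public static void main(String[] args) {
        System.out.println(autoRepartitions(List.of(Op.SOURCE, Op.KEY_DEPENDENT)));                   // 0
        System.out.println(autoRepartitions(List.of(Op.SOURCE, Op.KEY_CHANGING)));                    // 0
        System.out.println(autoRepartitions(List.of(Op.SOURCE, Op.KEY_CHANGING, Op.KEY_DEPENDENT)));  // 1
        System.out.println(autoRepartitions(
                List.of(Op.SOURCE, Op.KEY_CHANGING, Op.REPARTITION, Op.KEY_DEPENDENT)));             // 0
    }
}
```

In this model neither a source nor a key-changing step on its own ever adds a repartition. The flag only matters once a key-dependent operator consumes it.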

##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphNode.java:
##########
@@ -105,6 +118,14 @@ public boolean isMergeNode() {
         return mergeNode;
     }
+    public void requireRepartitionByKey() {
+        this.repartition = Repartition.BY_KEY_ONLY;
+    }
+
+    public void requireRepartitionAlways() {
+        this.repartition = Repartition.ALWAYS_REQUIRED;

Review Comment:
nit: avoid unnecessary `this.`

##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphNode.java:
##########
@@ -105,6 +118,14 @@ public boolean isMergeNode() {
         return mergeNode;
     }
+    public void requireRepartitionByKey() {
+        this.repartition = Repartition.BY_KEY_ONLY;

Review Comment:
nit: avoid unnecessary `this.`

##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##########
@@ -127,19 +127,19 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
     private static final String REPARTITION_NAME = "KSTREAM-REPARTITION-";
-    private final boolean repartitionRequired;
-
     private OptimizableRepartitionNode<K, V> repartitionNode;
     KStreamImpl(final String name,
                 final Serde<K> keySerde,
                 final Serde<V> valueSerde,
                 final Set<String> subTopologySourceNodes,
-                final boolean repartitionRequired,
                 final GraphNode graphNode,
                 final InternalStreamsBuilder builder) {
         super(name, keySerde, valueSerde, subTopologySourceNodes, graphNode, builder);
-        this.repartitionRequired = repartitionRequired;
+    }
+    private boolean isRepartitionRequired() {
+        return this.builder.isRepartitionRequired(this.graphNode);

Review Comment:
nit: avoid unnecessary `this.` prefix

##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphNode.java:
##########
@@ -27,12 +27,17 @@ import java.util.Optional;
 public abstract class GraphNode {
-
+    private enum Repartition {
+        NOT_REQUIRED,

Review Comment:
Just to make sure I understand correctly: does `NOT_REQUIRED` mean that this node is not a key-dependent
operation, and it will never trigger repartitioning?

##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java:
##########
@@ -45,28 +45,28 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K, V> implements KGroupedS
     static final String AGGREGATE_NAME = "KSTREAM-AGGREGATE-";
     private final GroupedStreamAggregateBuilder<K, V> aggregateBuilder;
-    final boolean repartitionRequired;
     final String userProvidedRepartitionTopicName;
     KGroupedStreamImpl(final String name,
                        final Set<String> subTopologySourceNodes,
                        final GroupedInternal<K, V> groupedInternal,
-                       final boolean repartitionRequired,
                        final GraphNode graphNode,
                        final InternalStreamsBuilder builder) {
         super(name, groupedInternal.keySerde(), groupedInternal.valueSerde(), subTopologySourceNodes, graphNode, builder);
-        this.repartitionRequired = repartitionRequired;
         this.userProvidedRepartitionTopicName = groupedInternal.name();
         this.aggregateBuilder = new GroupedStreamAggregateBuilder<>(
             builder,
             groupedInternal,
-            repartitionRequired,
             subTopologySourceNodes,
             name,
             graphNode
         );
     }
+    public boolean repartitionRequired() {
+        return builder.isRepartitionRequired(graphNode);

Review Comment:
Cf. my comment above. This could be `graphNode.repartitionRequired()` if we move the method. Or am I missing something?

##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -116,14 +116,14 @@ public <K, V> KStream<K, V> stream(final Pattern topicPattern,
                                        final ConsumedInternal<K, V> consumed) {
         final String name = new NamedInternal(consumed.name()).orElseGenerateWithPrefix(this, KStreamImpl.SOURCE_NAME);
         final StreamSourceNode<K, V> streamPatternSourceNode = new StreamSourceNode<>(name, topicPattern, consumed);
+        streamPatternSourceNode.requireRepartitionByKey();

Review Comment:
Same question as above.

##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##########
@@ -531,6 +522,7 @@ private KStream<K, V> doRepartition(final Repartitioned<K, V> repartitioned) {
         );
         final UnoptimizableRepartitionNode<K, V> unoptimizableRepartitionNode = unoptimizableRepartitionNodeBuilder.build();
+        unoptimizableRepartitionNode.requireRepartitionByKey();

Review Comment:
Not sure about the naming and semantics of this method. We have a repartition node at hand, which, well, always repartitions. So it does not care about upstream repartition requirements, and it returns a `KStream` partitioned by key (so downstream, it's not necessary to repartition again).

##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##########
@@ -1307,6 +1295,7 @@ public <KOut, VOut> KStream<KOut, VOut> process(
             name,
             new ProcessorParameters<>(processorSupplier, name),
             stateStoreNames);
+        processNode.requireRepartitionAlways();

Review Comment:
Why "always"? A `process()` step is a key-changing operation, so repartitioning might be required downstream, but only if a key-dependent operator like `groupByKey()` or `join()` follows. So should we just call `processNode.setKeyChangingOperation(true)` instead?

##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -614,6 +614,14 @@ private GraphNode getKeyChangingParentNode(final GraphNode repartitionNode) {
         return null;
     }
+    protected boolean isRepartitionRequired(final GraphNode node) {

Review Comment:
If we want to know whether `node` requires repartitioning, why do we add this method here? It seems to belong to the `GraphNode` class.
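Following the suggestion that this check belongs to the node itself (and the `graphNode.repartitionRequired()` idea on `KGroupedStreamImpl`), a node-local version only needs its parents plus a few flags. This is a minimal sketch under that assumption: `SketchNode`, `keyDependent`, `repartitions`, `keyChangedSinceLastRepartition()`, and `requiresRepartitioning()` are hypothetical names, not the actual `GraphNode` API (the last one borrows the rename proposed in this review). `process()` is modeled purely as a key-changing node, as proposed above, and a `repartition()` node resets the state for everything downstream of it.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: SketchNode and its members are illustrative names,
// not the real GraphNode API.
public class SketchNode {
    final String name;
    final List<SketchNode> parents;
    boolean keyChangingOperation; // e.g. selectKey(), map(), process()
    boolean keyDependent;         // e.g. groupByKey(), join()
    boolean repartitions;         // e.g. repartition(): output is partitioned by key

    SketchNode(String name, SketchNode... parents) {
        this.name = name;
        this.parents = Arrays.asList(parents);
    }

    // True if some upstream path reaches a key-changing node before it reaches
    // a node that repartitions by key. Source nodes have no parents, so a
    // stream read directly from a topic counts as partitioned by key.
    boolean keyChangedSinceLastRepartition() {
        for (SketchNode parent : parents) {
            if (parent.repartitions) {
                continue; // anything above this parent no longer matters
            }
            if (parent.keyChangingOperation || parent.keyChangedSinceLastRepartition()) {
                return true;
            }
        }
        return false;
    }

    // Only a key-dependent operator ever needs an automatic repartition step;
    // a key-changing operator on its own never does.
    boolean requiresRepartitioning() {
        return keyDependent && keyChangedSinceLastRepartition();
    }

    public static void main(String[] args) {
        SketchNode source = new SketchNode("source");
        SketchNode process = new SketchNode("process", source);
        process.keyChangingOperation = true;
        SketchNode groupByKey = new SketchNode("groupByKey", process);
        groupByKey.keyDependent = true;

        System.out.println(process.requiresRepartitioning());    // false
        System.out.println(groupByKey.requiresRepartitioning()); // true
    }
}
```

The recursive walk revisits shared ancestors. A real implementation would more likely compute the flag once per node while the graph is built.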

##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphNode.java:
##########
@@ -27,12 +27,17 @@ import java.util.Optional;
 public abstract class GraphNode {
-
+    private enum Repartition {
+        NOT_REQUIRED,
+        BY_KEY_ONLY,
+        ALWAYS_REQUIRED,

Review Comment:
What does "always" mean? We have key-dependent operators that require repartitioning (if the key was changed upstream) -- but there is no operator that would require auto-repartitioning if the key was not changed, is there? There is of course the `repartition()` operator, but it does not require an "auto repartition step" as it repartitions by itself.

##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphNode.java:
##########
@@ -93,6 +98,14 @@ public String nodeName() {
         return nodeName;
     }
+    public boolean canResolveRepartition() {
+        return keyChangingOperation || repartition != Repartition.NOT_REQUIRED;
+    }
+
+    public boolean isRepartitionRequired() {
+        return keyChangingOperation || repartition == Repartition.ALWAYS_REQUIRED;

Review Comment:
A key-changing operation by itself does not require repartitioning, so I'm not clear about the "or" condition. Can you elaborate?

##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java:
##########
@@ -516,11 +516,12 @@ public KStream<K, V> toStream(final Named named) {
             name,
             processorParameters
         );
+        toStreamNode.requireRepartitionByKey();

Review Comment:
If we convert a `KTable` into a `KStream`, we know that the `KStream` is partitioned by key (at least for non-windowed tables this is true, but that is a separate issue we might not want to address with this PR) -- so what information are we tracking here?
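To make the "or" question on `isRepartitionRequired()` concrete, the two predicates from the PR's `GraphNode` diff can be evaluated for every input combination. This sketch copies the `Repartition` enum and both return expressions from the diff into static methods. The wrapper class `PredicateTable` and the static-method form are hypothetical and only for illustration.

```java
// Hypothetical wrapper: the enum and both return expressions are copied from
// the PR's GraphNode diff; the static-method form is only for illustration.
public class PredicateTable {
    enum Repartition { NOT_REQUIRED, BY_KEY_ONLY, ALWAYS_REQUIRED }

    // Same expression as GraphNode.isRepartitionRequired() in the PR
    static boolean isRepartitionRequired(boolean keyChangingOperation, Repartition repartition) {
        return keyChangingOperation || repartition == Repartition.ALWAYS_REQUIRED;
    }

    // Same expression as GraphNode.canResolveRepartition() in the PR
    static boolean canResolveRepartition(boolean keyChangingOperation, Repartition repartition) {
        return keyChangingOperation || repartition != Repartition.NOT_REQUIRED;
    }

    public static void main(String[] args) {
        // Print one row per (keyChangingOperation, Repartition) combination
        for (boolean keyChanging : new boolean[] {false, true}) {
            for (Repartition r : Repartition.values()) {
                System.out.printf("keyChanging=%-5b %-15s required=%-5b canResolve=%b%n",
                        keyChanging, r,
                        isRepartitionRequired(keyChanging, r),
                        canResolveRepartition(keyChanging, r));
            }
        }
    }
}
```

With `keyChangingOperation = true`, `isRepartitionRequired` returns `true` for every `Repartition` value. A key-changing node is therefore reported as requiring repartitioning regardless of whether a key-dependent operator follows it, which is exactly the behavior the comment questions.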

##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphNode.java:
##########
@@ -93,6 +98,14 @@ public String nodeName() {
         return nodeName;
     }
+    public boolean canResolveRepartition() {
+        return keyChangingOperation || repartition != Repartition.NOT_REQUIRED;
+    }
+
+    public boolean isRepartitionRequired() {

Review Comment:
nit: rename to `repartitioningRequired()` or `requiresRepartitioning()`