mjsax commented on code in PR #15740:
URL: https://github.com/apache/kafka/pull/15740#discussion_r1585886754


##########
streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:
##########
@@ -685,6 +685,41 @@ <VR> KStream<K, VR> flatMapValues(final ValueMapper<? 
super V, ? extends Iterabl
     <VR> KStream<K, VR> flatMapValues(final ValueMapperWithKey<? super K, ? 
super V, ? extends Iterable<? extends VR>> mapper,
                                       final Named named);
 
+    /**
+     * Marking the {@code KStream} as partitioned signals the stream is 
partitioned as intended,
+     * and does not require further repartitioning by downstream key changing 
operations.

Review Comment:
   ```suggestion
        * and does not require further repartitioning by downstream key 
depedent operations.
   ```



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##########
@@ -222,21 +226,21 @@ public <KR> KStream<KR, V> selectKey(final 
KeyValueMapper<? super K, ? super V,
                                          final Named named) {
         Objects.requireNonNull(mapper, "mapper can't be null");
         Objects.requireNonNull(named, "named can't be null");
-
+        final boolean repartitionRequired = !(graphNode instanceof 
PartitionPreservingNode);
         final ProcessorGraphNode<K, V> selectKeyProcessorNode = 
internalSelectKey(mapper, new NamedInternal(named));
-        selectKeyProcessorNode.keyChangingOperation(true);
+        selectKeyProcessorNode.keyChangingOperation(repartitionRequired);
 
         builder.addGraphNode(graphNode, selectKeyProcessorNode);
 
         // key serde cannot be preserved
         return new KStreamImpl<>(
-            selectKeyProcessorNode.nodeName(),
-            null,
-            valueSerde,
-            subTopologySourceNodes,
-            true,
-            selectKeyProcessorNode,
-            builder);
+                selectKeyProcessorNode.nodeName(),

Review Comment:
   nit: avoid unnecessary reformatting (ie, indention in this case) -- I assume 
you have some "auto format" feature enabled in your IDE. I would recommend to 
disable it, or adjust the setting to avoid noise like this.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:
##########
@@ -685,6 +685,41 @@ <VR> KStream<K, VR> flatMapValues(final ValueMapper<? 
super V, ? extends Iterabl
     <VR> KStream<K, VR> flatMapValues(final ValueMapperWithKey<? super K, ? 
super V, ? extends Iterable<? extends VR>> mapper,
                                       final Named named);
 
+    /**
+     * Marking the {@code KStream} as partitioned signals the stream is 
partitioned as intended,
+     * and does not require further repartitioning by downstream key changing 
operations.
+     * <p>
+     * Note that {@link KStream#markAsPartitioned()} SHOULD NOT be used with 
interactive query(IQ) or {@link KStream#join}.
+     * For reasons that when repartitions happen, records are physically 
shuffled by a composite key defined in the stateful operation.
+     * However, if the repartitions were cancelled, records stayed in their 
original partition by its original key. IQ or joins
+     * assumes and uses the composite key instead of the original key.
+     * <p>
+     * This method will overwrite a default behavior as described below.
+     * By default, Kafka Streams always automatically repartition the records 
to prepare for a stateful operation,
+     * however, it is not always required when input stream is partitioned as 
intended. As an example,
+     * if an input stream is partitioned by a String key1, calling the below 
function will trigger a repartition:
+     * <p>
+     * <pre>{@code
+     *     KStream<String, String> inputStream = builder.stream("topic");
+     *     stream
+     *       .selectKey( ... => (key1, metric))
+     *       .groupByKey()
+     *       .aggregate()
+     * }</pre>
+     * <p>
+     * You can then overwrite the default behavior by calling this method:
+     * <pre>{@code
+     *     stream
+     *       .selectKey( ... => (key1, metric))
+     *       .markAsPartitioned()
+     *       .groupByKey()
+     *       .aggregate()
+     * }</pre>
+     *  <p>

Review Comment:
   Do we need this tag?



##########
streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:
##########
@@ -685,6 +685,41 @@ <VR> KStream<K, VR> flatMapValues(final ValueMapper<? 
super V, ? extends Iterabl
     <VR> KStream<K, VR> flatMapValues(final ValueMapperWithKey<? super K, ? 
super V, ? extends Iterable<? extends VR>> mapper,
                                       final Named named);
 
+    /**
+     * Marking the {@code KStream} as partitioned signals the stream is 
partitioned as intended,
+     * and does not require further repartitioning by downstream key changing 
operations.
+     * <p>
+     * Note that {@link KStream#markAsPartitioned()} SHOULD NOT be used with 
interactive query(IQ) or {@link KStream#join}.
+     * For reasons that when repartitions happen, records are physically 
shuffled by a composite key defined in the stateful operation.
+     * However, if the repartitions were cancelled, records stayed in their 
original partition by its original key. IQ or joins
+     * assumes and uses the composite key instead of the original key.

Review Comment:
   Can you refresh my memory about joins? I cannot remember the details.
   
   We should add a section to the `docs/streams/developer-guide/dsl-api.html` 
and explain the "do" and "donts" of this operation.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##########
@@ -222,21 +226,21 @@ public <KR> KStream<KR, V> selectKey(final 
KeyValueMapper<? super K, ? super V,
                                          final Named named) {
         Objects.requireNonNull(mapper, "mapper can't be null");
         Objects.requireNonNull(named, "named can't be null");
-
+        final boolean repartitionRequired = !(graphNode instanceof 
PartitionPreservingNode);

Review Comment:
   Not sure if I understand this change? `graphNode` is the upstream node to 
the `selectKey()` node. Why would we care if the upstream node was doing 
`markAsRepartitioned()`:
   ```
   stream.map(...).markAsRepartition(...).selectKey(...);
   ```
   
   The `selectKey()` should still set `repartitionedRequired` flag to `true`, 
because it's downstream... And as we cannot look downstream, I believe this 
operator does not need any code change? Same for all other operators that this 
PR modifies atm.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##########
@@ -1616,4 +1623,25 @@ public <VOut> KStream<K, VOut> processValues(
             processNode,
             builder);
     }
+
+    @Override
+    public KStream<K, V> markAsPartitioned() {

Review Comment:
   I think we should update the KIP slightly and add an overload 
`markAsPartioned(Named)` variant, similar to other stateless operators like 
`filter()` etc.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##########
@@ -1616,4 +1623,25 @@ public <VOut> KStream<K, VOut> processValues(
             processNode,
             builder);
     }
+
+    @Override
+    public KStream<K, V> markAsPartitioned() {
+        final ProcessorParameters<? super K, ? super V, ?, ?> 
processorParameters =
+                new ProcessorParameters<>(new PassThrough<>(), 
PARTITION_PRESERVE_NAME + name);

Review Comment:
   If we add `name` this could lead to conflict:
   ```
   stream.markAsRepartition().filter();
   stream.markAsRepartition().map();
   ```
   This should be a valid program, however, both `filter` and `map` would 
generate the same processor name and thus we won't be able to compile it -- if 
we add the `Named` overload, we can use 
`NamedInternal(named).orElseGenerateWithPrefix` to generate a unique name like 
we do for other operator to avoid this issue.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##########
@@ -1616,4 +1623,25 @@ public <VOut> KStream<K, VOut> processValues(
             processNode,
             builder);
     }
+
+    @Override
+    public KStream<K, V> markAsPartitioned() {
+        final ProcessorParameters<? super K, ? super V, ?, ?> 
processorParameters =
+                new ProcessorParameters<>(new PassThrough<>(), 
PARTITION_PRESERVE_NAME + name);
+
+        final PartitionPreservingNode<? super K, ? super V> 
partitionPreservingNode = new PartitionPreservingNode<>(
+                processorParameters,
+                PARTITION_PRESERVE_NAME + name);
+
+        builder.addGraphNode(graphNode, partitionPreservingNode);
+        return new KStreamImpl<>(
+                partitionPreservingNode.nodeName(),

Review Comment:
   nit: just pass in `name` variable



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to