LQXshane commented on code in PR #14446:
URL: https://github.com/apache/kafka/pull/14446#discussion_r1568199337


##########
streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:
##########
@@ -685,6 +685,47 @@ <VR> KStream<K, VR> flatMapValues(final ValueMapper<? super V, ? extends Iterabl
     <VR> KStream<K, VR> flatMapValues(final ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends VR>> mapper,
                                       final Named named);
 
+    /**
+     * Marking the {@code KStream} as partitioned signals that the stream is already partitioned as intended
+     * and does not require further repartitioning in downstream key-changing operations.
+     *
+     * <p><em>
+     *     Note that {@link KStream#markAsPartitioned()} SHOULD NOT be used with interactive queries (IQ) or {@link KStream#join}.
+     *     When a repartition happens, records are physically shuffled by the composite key defined in the stateful operation.
+     *     If the repartition is cancelled, however, records stay in their original partitions, as determined by their original keys,
+     *     while IQ and joins assume and use the composite key instead of the original key.
+     * </em></p>
+     *
+     *
+     * This method overrides the default behavior described below.
+     * <p>
+     *     By default, Kafka Streams automatically repartitions records before a stateful operation that follows a key-changing operation.
+     *     However, this is not always required when the input stream is already partitioned as intended. As an example,
+     *     if an input stream is partitioned by a String key1, the code below will trigger a repartition:
+     *     <pre>{@code
+     *     KStream<String, String> stream = builder.stream("topic");
+     *     stream
+     *       .selectKey( ... => (key1, metric))
+     *       .groupByKey()
+     *       .aggregate()
+     *     }</pre>
+     * </p>
+     *
+     * <p>
+     *     You can then override the default behavior by calling this method:
+     *     <pre>{@code
+     *     stream
+     *       .selectKey( ... => (key1, metric))
+     *       .markAsPartitioned()
+     *       .groupByKey()
+     *       .aggregate()
+     *     }</pre>
+     * </p>
+     *
+     * @return a new {@code KStream} instance that will not repartition in subsequent operations: {@link KStream#selectKey(KeyValueMapper)}, {@link KStream#map(KeyValueMapper)}, {@link KStream#flatTransform(TransformerSupplier, String...)}.

Review Comment:
   Removed for simplicity
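
   For context on the IQ/join caveat in the Javadoc above, here is a minimal plain-Java sketch of the reasoning. It does not use Kafka Streams at all: `PartitionSketch`, its methods, and the sample keys are hypothetical names made up for illustration, and the partitioner is a simple `hashCode` modulo, which is an assumption and not Kafka's default partitioner. It only shows that a record left in the partition of its original key may not be found by a reader that routes by the composite key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PartitionSketch {

    // Hypothetical partitioner for illustration only (assumption: hashCode modulo).
    public static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // One map per partition, standing in for per-partition state stores.
    public static List<Map<String, Long>> newStores(int numPartitions) {
        List<Map<String, Long>> stores = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            stores.add(new HashMap<>());
        }
        return stores;
    }

    // Counts one record under compositeKey, in the partition chosen by routingKey.
    public static void count(List<Map<String, Long>> stores, String routingKey, String compositeKey) {
        int partition = partitionFor(routingKey, stores.size());
        stores.get(partition).merge(compositeKey, 1L, Long::sum);
    }

    // Lookup as done by a reader that assumes data is partitioned by the composite key.
    public static Long lookup(List<Map<String, Long>> stores, String compositeKey) {
        int partition = partitionFor(compositeKey, stores.size());
        return stores.get(partition).get(compositeKey);
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        String key1 = "user-1";              // original key (sample value)
        String composite = key1 + "|cpu";    // stands in for (key1, metric)

        // With repartitioning: the record is routed by the composite key.
        List<Map<String, Long>> repartitioned = newStores(numPartitions);
        count(repartitioned, composite, composite);

        // Repartitioning skipped: the record stays in key1's partition.
        List<Map<String, Long>> skipped = newStores(numPartitions);
        count(skipped, key1, composite);

        System.out.println("repartitioned lookup: " + lookup(repartitioned, composite));
        // This lookup returns null whenever the two keys hash to different partitions.
        System.out.println("skipped lookup: " + lookup(skipped, composite));
    }
}
```

   The aggregation itself is unaffected in the skipped case, since every record for a given composite key still lands in one partition; only readers that locate data by the composite key are at risk.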



Review Comment:
   Removed for simplicity in the new PR


