This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9fe89f3  MINOR: improve JavaDocs about auto-repartitioning in Streams 
DSL (#6269)
9fe89f3 is described below

commit 9fe89f357ced1f75d5c7053979e8ec6430041885
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Mon Feb 18 11:22:41 2019 -0800

    MINOR: improve JavaDocs about auto-repartitioning in Streams DSL (#6269)
---
 .../org/apache/kafka/streams/kstream/KStream.java  | 59 +++++++---------------
 1 file changed, 19 insertions(+), 40 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 44778f0..5138917 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -95,7 +95,6 @@ public interface KStream<K, V> {
      *     }
      * });
      * }</pre>
-     * <p>
      * Setting a new key might result in an internal data redistribution if a 
key based operator (like an aggregation or
      * join) is applied to the result {@code KStream}.
      *
@@ -128,7 +127,6 @@ public interface KStream<K, V> {
      *     }
      * });
      * }</pre>
-     * <p>
      * The provided {@link KeyValueMapper} must return a {@link KeyValue} type 
and must not return {@code null}.
      * <p>
      * Mapping records might result in an internal data redistribution if a 
key based operator (like an aggregation or
@@ -166,7 +164,6 @@ public interface KStream<K, V> {
      *     }
      * });
      * }</pre>
-     * <p>
      * Setting a new value preserves data co-location with respect to the key.
      * Thus, <em>no</em> internal data redistribution is required if a key 
based operator (like an aggregation or join)
      * is applied to the result {@code KStream}. (cf. {@link 
#map(KeyValueMapper)})
@@ -201,7 +198,6 @@ public interface KStream<K, V> {
      *     }
      * });
      * }</pre>
-     * <p>
      * Note that the key is read-only and should not be modified, as this can 
lead to corrupt partitioning.
      * So, setting a new value preserves data co-location with respect to the 
key.
      * Thus, <em>no</em> internal data redistribution is required if a key 
based operator (like an aggregation or join)
@@ -246,7 +242,6 @@ public interface KStream<K, V> {
      *     }
      * });
      * }</pre>
-     * <p>
      * The provided {@link KeyValueMapper} must return an {@link Iterable} 
(e.g., any {@link java.util.Collection} type)
      * and the return value must not be {@code null}.
      * <p>
@@ -289,7 +284,6 @@ public interface KStream<K, V> {
      *     }
      * });
      * }</pre>
-     * <p>
      * The provided {@link ValueMapper} must return an {@link Iterable} (e.g., 
any {@link java.util.Collection} type)
      * and the return value must not be {@code null}.
      * <p>
@@ -336,7 +330,6 @@ public interface KStream<K, V> {
      *     }
      * });
      * }</pre>
-     * <p>
      * The provided {@link ValueMapperWithKey} must return an {@link Iterable} 
(e.g., any {@link java.util.Collection} type)
      * and the return value must not be {@code null}.
      * <p>
@@ -503,7 +496,6 @@ public interface KStream<K, V> {
      * This is a stateful record-by-record operation (cf. {@link 
#map(KeyValueMapper)}).
      * Furthermore, via {@link 
org.apache.kafka.streams.processor.Punctuator#punctuate(long)}, the processing 
progress
      * can be observed and additional periodic actions can be performed.
-     *
      * <p>
      * In order to assign a state, the state must be created and registered 
beforehand:
      * <pre>{@code
@@ -517,7 +509,6 @@ public interface KStream<K, V> {
      *
      * KStream outputStream = inputStream.transform(new TransformerSupplier() 
{ ... }, "myTransformState");
      * }</pre>
-     * <p>
      * Within the {@link Transformer}, the state is obtained via the {@link  
ProcessorContext}.
      * To trigger periodic actions via {@link 
org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
      * a schedule must be registered.
@@ -549,12 +540,12 @@ public interface KStream<K, V> {
      *     }
      * }
      * }</pre>
+     * Even if any upstream operation was key-changing, no auto-repartition is 
triggered.
+     * If repartitioning is required, a call to {@link #through(String)} 
should be performed before {@code transform()}.
      * <p>
      * Transforming records might result in an internal data redistribution if 
a key based operator (like an aggregation
      * or join) is applied to the result {@code KStream}.
      * (cf. {@link #transformValues(ValueTransformerSupplier, String...)})
-     * </p>
-     *
      * <p>
      * Note that it is possible to emit multiple records for each input record 
by using
      * {@link ProcessorContext#forward(Object, Object) context#forward()} in 
{@link Transformer#transform(K, V)}.
@@ -563,7 +554,6 @@ public interface KStream<K, V> {
      * To ensure type-safety at compile-time, it is recommended to use
      * {@link #flatTransform(TransformerSupplier, String...)} if multiple 
records need to be emitted for each input
      * record.
-     * </p>
      *
      * @param transformerSupplier an instance of {@link TransformerSupplier} 
that generates a {@link Transformer}
      * @param stateStoreNames     the names of the state stores used by the 
processor
@@ -589,7 +579,6 @@ public interface KStream<K, V> {
      * transformation).
      * Furthermore, via {@link 
org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing 
progress
      * can be observed and additional periodic actions can be performed.
-     *
      * <p>
      * In order to assign a state, the state must be created and registered 
beforehand:
      * <pre>{@code
@@ -603,7 +592,6 @@ public interface KStream<K, V> {
      *
      * KStream outputStream = inputStream.flatTransform(new 
TransformerSupplier() { ... }, "myTransformState");
      * }</pre>
-     * <p>
      * Within the {@link Transformer}, the state is obtained via the {@link 
ProcessorContext}.
      * To trigger periodic actions via {@link 
org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
      * a schedule must be registered.
@@ -637,6 +625,8 @@ public interface KStream<K, V> {
      *     }
      * }
      * }</pre>
+     * Even if any upstream operation was key-changing, no auto-repartition is 
triggered.
+     * If repartitioning is required, a call to {@link #through(String)} 
should be performed before {@code transform()}.
      * <p>
      * Transforming records might result in an internal data redistribution if 
a key based operator (like an aggregation
      * or join) is applied to the result {@code KStream}.
@@ -677,7 +667,6 @@ public interface KStream<K, V> {
      *
      * KStream outputStream = inputStream.transformValues(new 
ValueTransformerSupplier() { ... }, "myValueTransformState");
      * }</pre>
-     * <p>
      * Within the {@link ValueTransformer}, the state is obtained via the
      * {@link ProcessorContext}.
      * To trigger periodic actions via {@link 
org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()}, a 
schedule must be
@@ -708,6 +697,8 @@ public interface KStream<K, V> {
      *     }
      * }
      * }</pre>
+     * Even if any upstream operation was key-changing, no auto-repartition is 
triggered.
+     * If repartitioning is required, a call to {@link #through(String)} 
should be performed before {@code transform()}.
      * <p>
      * Setting a new value preserves data co-location with respect to the key.
      * Thus, <em>no</em> internal data redistribution is required if a key 
based operator (like an aggregation or join)
@@ -746,7 +737,6 @@ public interface KStream<K, V> {
      *
      * KStream outputStream = inputStream.transformValues(new 
ValueTransformerWithKeySupplier() { ... }, "myValueTransformState");
      * }</pre>
-     * <p>
      * Within the {@link ValueTransformerWithKey}, the state is obtained via 
the
      * {@link ProcessorContext}.
      * To trigger periodic actions via {@link 
org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
@@ -777,6 +767,8 @@ public interface KStream<K, V> {
      *     }
      * }
      * }</pre>
+     * Even if any upstream operation was key-changing, no auto-repartition is 
triggered.
+     * If repartitioning is required, a call to {@link #through(String)} 
should be performed before {@code transform()}.
      * <p>
      * Note that the key is read-only and should not be modified, as this can 
lead to corrupt partitioning.
      * So, setting a new value preserves data co-location with respect to the 
key.
@@ -815,7 +807,6 @@ public interface KStream<K, V> {
      *
      * inputStream.process(new ProcessorSupplier() { ... }, 
"myProcessorState");
      * }</pre>
-     * <p>
      * Within the {@link Processor}, the state is obtained via the
      * {@link ProcessorContext}.
      * To trigger periodic actions via {@link 
org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
@@ -842,6 +833,8 @@ public interface KStream<K, V> {
      *     }
      * }
      * }</pre>
+     * Even if any upstream operation was key-changing, no auto-repartition is 
triggered.
+     * If repartitioning is required, a call to {@link #through(String)} 
should be performed before {@code transform()}.
      *
      * @param processorSupplier a instance of {@link ProcessorSupplier} that 
generates a {@link Processor}
      * @param stateStoreNames   the names of the state store used by the 
processor
@@ -868,7 +861,6 @@ public interface KStream<K, V> {
      * an internally generated name, and "-repartition" is a fixed suffix.
      * <p>
      * You can retrieve all generated internal topic names via {@link 
Topology#describe()}.
-     *
      * <p>
      * For this case, all data of this stream will be redistributed through 
the repartitioning topic by writing all
      * records to it, and rereading all records from it, such that the 
resulting {@link KGroupedStream} is partitioned
@@ -898,7 +890,6 @@ public interface KStream<K, V> {
      * an internally generated name, and "-repartition" is a fixed suffix.
      * <p>
      * You can retrieve all generated internal topic names via {@link 
Topology#describe()}.
-     *
      * <p>
      * For this case, all data of this stream will be redistributed through 
the repartitioning topic by writing all
      * records to it, and rereading all records from it, such that the 
resulting {@link KGroupedStream} is partitioned
@@ -928,10 +919,8 @@ public interface KStream<K, V> {
      * {@link StreamsConfig} via parameter {@link 
StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, &lt;name&gt; is
      * either provided via {@link 
org.apache.kafka.streams.kstream.Grouped#as(String)} or an internally generated 
name,
      * and "-repartition" is a fixed suffix.
-     *
      * <p>
      * You can retrieve all generated internal topic names via {@link 
Topology#describe()}.
-     *
      * <p>
      * For this case, all data of this stream will be redistributed through 
the repartitioning topic by writing all
      * records to it, and rereading all records from it, such that the 
resulting {@link KGroupedStream} is partitioned
@@ -959,7 +948,6 @@ public interface KStream<K, V> {
      * an internally generated name, and "-repartition" is a fixed suffix.
      * <p>
      * You can retrieve all generated internal topic names via {@link 
Topology#describe()}.
-     *
      * <p>
      * All data of this stream will be redistributed through the 
repartitioning topic by writing all records to it,
      * and rereading all records from it, such that the resulting {@link 
KGroupedStream} is partitioned on the new key.
@@ -988,7 +976,6 @@ public interface KStream<K, V> {
      * an internally generated name, and "-repartition" is a fixed suffix.
      * <p>
      * You can retrieve all generated internal topic names via {@link 
Topology#describe()}.
-     *
      * <p>
      * All data of this stream will be redistributed through the 
repartitioning topic by writing all records to it,
      * and rereading all records from it, such that the resulting {@link 
KGroupedStream} is partitioned on the new key.
@@ -1020,7 +1007,6 @@ public interface KStream<K, V> {
      * either provided via {@link 
org.apache.kafka.streams.kstream.Grouped#as(String)} or an internally generated 
name.
      * <p>
      * You can retrieve all generated internal topic names via {@link 
Topology#describe()}.
-     *
      * <p>
      * All data of this stream will be redistributed through the 
repartitioning topic by writing all records to it,
      * and rereading all records from it, such that the resulting {@link 
KGroupedStream} is partitioned on the new key.
@@ -1036,8 +1022,6 @@ public interface KStream<K, V> {
     <KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super 
V, KR> selector,
                                        final Grouped<KR, V> grouped);
 
-
-
     /**
      * Join records of this stream with another {@code KStream}'s records 
using windowed inner equi join with default
      * serializers and deserializers.
@@ -1085,7 +1069,6 @@ public interface KStream<K, V> {
      * user-specified in {@link  StreamsConfig} via parameter
      * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, 
"&lt;name&gt;" is an internally generated name, and
      * "-repartition" is a fixed suffix.
-     *
      * <p>
      * Repartitioning can happen for one or both of the joining {@code 
KStream}s.
      * For this case, all data of the stream will be redistributed through the 
repartitioning topic by writing all
@@ -1175,7 +1158,7 @@ public interface KStream<K, V> {
      * in {@link StreamsConfig} via parameter
      * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, 
"storeName" is an
      * internally generated name, and "-changelog" is a fixed suffix.
-     *
+     * <p>
      * You can retrieve all generated internal topic names via {@link 
Topology#describe()}.
      *
      * @param otherStream the {@code KStream} to be joined with this stream
@@ -1257,7 +1240,7 @@ public interface KStream<K, V> {
      * The changelog topic will be named 
"${applicationId}-storeName-changelog", where "applicationId" is user-specified
      * in {@link StreamsConfig} via parameter {@link 
StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
      * "storeName" is an internally generated name, and "-changelog" is a 
fixed suffix.
-     *
+     * <p>
      * You can retrieve all generated internal topic names via {@link 
Topology#describe()}.
      *
      * @param otherStream the {@code KStream} to be joined with this stream
@@ -1338,7 +1321,7 @@ public interface KStream<K, V> {
      * The changelog topic will be named 
"${applicationId}-storeName-changelog", where "applicationId" is user-specified
      * in {@link StreamsConfig} via parameter {@link 
StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
      * "storeName" is an internally generated name, and "-changelog" is a 
fixed suffix.
-     *
+     * <p>
      * You can retrieve all generated internal topic names via {@link 
Topology#describe()}.
      *
      * @param otherStream the {@code KStream} to be joined with this stream
@@ -1422,7 +1405,7 @@ public interface KStream<K, V> {
      * The changelog topic will be named 
"${applicationId}-storeName-changelog", where "applicationId" is user-specified
      * in {@link StreamsConfig} via parameter {@link 
StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
      * "storeName" is an internally generated name, and "-changelog" is a 
fixed suffix.
-     *
+     * <p>
      * You can retrieve all generated internal topic names via {@link 
Topology#describe()}.
      *
      * @param otherStream the {@code KStream} to be joined with this stream
@@ -1504,7 +1487,7 @@ public interface KStream<K, V> {
      * The changelog topic will be named 
"${applicationId}-storeName-changelog", where "applicationId" is user-specified
      * in {@link StreamsConfig} via parameter {@link 
StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
      * "storeName" is an internally generated name, and "-changelog" is a 
fixed suffix.
-     *
+     * <p>
      * You can retrieve all generated internal topic names via {@link 
Topology#describe()}.
      *
      * @param otherStream the {@code KStream} to be joined with this stream
@@ -1580,9 +1563,8 @@ public interface KStream<K, V> {
      * user-specified in {@link StreamsConfig} via parameter
      * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, 
"&lt;name&gt;" is an internally generated name, and
      * "-repartition" is a fixed suffix.
-     *
+     * <p>
      * You can retrieve all generated internal topic names via {@link 
Topology#describe()}.
-     *
      * <p>
      * Repartitioning can happen only for this {@code KStream} but not for the 
provided {@link KTable}.
      * For this case, all data of the stream will be redistributed through the 
repartitioning topic by writing all
@@ -1656,9 +1638,8 @@ public interface KStream<K, V> {
      * user-specified in {@link StreamsConfig} via parameter
      * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, 
"&lt;name&gt;" is an internally generated name, and
      * "-repartition" is a fixed suffix.
-     *
+     * <p>
      * You can retrieve all generated internal topic names via {@link 
Topology#describe()}.
-     *
      * <p>
      * Repartitioning can happen only for this {@code KStream} but not for the 
provided {@link KTable}.
      * For this case, all data of the stream will be redistributed through the 
repartitioning topic by writing all
@@ -1738,9 +1719,8 @@ public interface KStream<K, V> {
      * user-specified in {@link StreamsConfig} via parameter
      * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, 
"&lt;name&gt;" is an internally generated name, and
      * "-repartition" is a fixed suffix.
-     *
+     * <p>
      * You can retrieve all generated internal topic names via {@link 
Topology#describe()}.
-     *
      * <p>
      * Repartitioning can happen only for this {@code KStream} but not for the 
provided {@link KTable}.
      * For this case, all data of the stream will be redistributed through the 
repartitioning topic by writing all
@@ -1817,9 +1797,8 @@ public interface KStream<K, V> {
      * user-specified in {@link StreamsConfig} via parameter
      * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, 
"&lt;name&gt;" is an internally generated name, and
      * "-repartition" is a fixed suffix.
-     *
+     * <p>
      * You can retrieve all generated internal topic names via {@link 
Topology#describe()}.
-     *
      * <p>
      * Repartitioning can happen only for this {@code KStream} but not for the 
provided {@link KTable}.
      * For this case, all data of the stream will be redistributed through the 
repartitioning topic by writing all

Reply via email to