This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new 9fe89f3  MINOR: improve JavaDocs about auto-repartitioning in Streams DSL (#6269)

9fe89f3 is described below

commit 9fe89f357ced1f75d5c7053979e8ec6430041885
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Mon Feb 18 11:22:41 2019 -0800

    MINOR: improve JavaDocs about auto-repartitioning in Streams DSL (#6269)
---
 .../org/apache/kafka/streams/kstream/KStream.java | 59 +++++++---------------
 1 file changed, 19 insertions(+), 40 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 44778f0..5138917 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -95,7 +95,6 @@ public interface KStream<K, V> {
  * }
  * });
  * }</pre>
- * <p>
  * Setting a new key might result in an internal data redistribution if a key based operator (like an aggregation or
  * join) is applied to the result {@code KStream}.
  *
@@ -128,7 +127,6 @@ public interface KStream<K, V> {
  * }
  * });
  * }</pre>
- * <p>
  * The provided {@link KeyValueMapper} must return a {@link KeyValue} type and must not return {@code null}.
  * <p>
  * Mapping records might result in an internal data redistribution if a key based operator (like an aggregation or
@@ -166,7 +164,6 @@ public interface KStream<K, V> {
  * }
  * });
  * }</pre>
- * <p>
  * Setting a new value preserves data co-location with respect to the key.
  * Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
  * is applied to the result {@code KStream}. (cf. {@link #map(KeyValueMapper)})
@@ -201,7 +198,6 @@ public interface KStream<K, V> {
  * }
  * });
  * }</pre>
- * <p>
  * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
  * So, setting a new value preserves data co-location with respect to the key.
  * Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
@@ -246,7 +242,6 @@ public interface KStream<K, V> {
  * }
  * });
  * }</pre>
- * <p>
  * The provided {@link KeyValueMapper} must return an {@link Iterable} (e.g., any {@link java.util.Collection} type)
  * and the return value must not be {@code null}.
  * <p>
@@ -289,7 +284,6 @@ public interface KStream<K, V> {
  * }
  * });
  * }</pre>
- * <p>
  * The provided {@link ValueMapper} must return an {@link Iterable} (e.g., any {@link java.util.Collection} type)
  * and the return value must not be {@code null}.
  * <p>
@@ -336,7 +330,6 @@ public interface KStream<K, V> {
  * }
  * });
  * }</pre>
- * <p>
  * The provided {@link ValueMapperWithKey} must return an {@link Iterable} (e.g., any {@link java.util.Collection} type)
  * and the return value must not be {@code null}.
  * <p>
@@ -503,7 +496,6 @@ public interface KStream<K, V> {
  * This is a stateful record-by-record operation (cf. {@link #map(KeyValueMapper)}).
  * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)}, the processing progress
  * can be observed and additional periodic actions can be performed.
- *
  * <p>
  * In order to assign a state, the state must be created and registered beforehand:
  * <pre>{@code
@@ -517,7 +509,6 @@ public interface KStream<K, V> {
  *
  * KStream outputStream = inputStream.transform(new TransformerSupplier() { ... }, "myTransformState");
  * }</pre>
- * <p>
  * Within the {@link Transformer}, the state is obtained via the {@link ProcessorContext}.
  * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
  * a schedule must be registered.
@@ -549,12 +540,12 @@ public interface KStream<K, V> {
  * }
  * }
  * }</pre>
+ * Even if any upstream operation was key-changing, no auto-repartition is triggered.
+ * If repartitioning is required, a call to {@link #through(String)} should be performed before {@code transform()}.
  * <p>
  * Transforming records might result in an internal data redistribution if a key based operator (like an aggregation
  * or join) is applied to the result {@code KStream}.
  * (cf. {@link #transformValues(ValueTransformerSupplier, String...)})
- * </p>
- *
  * <p>
  * Note that it is possible to emit multiple records for each input record by using
  * {@link ProcessorContext#forward(Object, Object) context#forward()} in {@link Transformer#transform(K, V)}.
@@ -563,7 +554,6 @@ public interface KStream<K, V> {
  * To ensure type-safety at compile-time, it is recommended to use
  * {@link #flatTransform(TransformerSupplier, String...)} if multiple records need to be emitted for each input
  * record.
- * </p>
  *
  * @param transformerSupplier an instance of {@link TransformerSupplier} that generates a {@link Transformer}
  * @param stateStoreNames the names of the state stores used by the processor
@@ -589,7 +579,6 @@ public interface KStream<K, V> {
  * transformation).
  * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress
  * can be observed and additional periodic actions can be performed.
- *
  * <p>
  * In order to assign a state, the state must be created and registered beforehand:
  * <pre>{@code
@@ -603,7 +592,6 @@ public interface KStream<K, V> {
  *
  * KStream outputStream = inputStream.flatTransform(new TransformerSupplier() { ... }, "myTransformState");
  * }</pre>
- * <p>
  * Within the {@link Transformer}, the state is obtained via the {@link ProcessorContext}.
  * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
  * a schedule must be registered.
@@ -637,6 +625,8 @@ public interface KStream<K, V> {
  * }
  * }
  * }</pre>
+ * Even if any upstream operation was key-changing, no auto-repartition is triggered.
+ * If repartitioning is required, a call to {@link #through(String)} should be performed before {@code transform()}.
  * <p>
  * Transforming records might result in an internal data redistribution if a key based operator (like an aggregation
  * or join) is applied to the result {@code KStream}.
@@ -677,7 +667,6 @@ public interface KStream<K, V> {
  *
  * KStream outputStream = inputStream.transformValues(new ValueTransformerSupplier() { ... }, "myValueTransformState");
  * }</pre>
- * <p>
  * Within the {@link ValueTransformer}, the state is obtained via the
  * {@link ProcessorContext}.
  * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()}, a schedule must be
@@ -708,6 +697,8 @@ public interface KStream<K, V> {
  * }
  * }
  * }</pre>
+ * Even if any upstream operation was key-changing, no auto-repartition is triggered.
+ * If repartitioning is required, a call to {@link #through(String)} should be performed before {@code transform()}.
  * <p>
  * Setting a new value preserves data co-location with respect to the key.
  * Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
@@ -746,7 +737,6 @@ public interface KStream<K, V> {
  *
  * KStream outputStream = inputStream.transformValues(new ValueTransformerWithKeySupplier() { ... }, "myValueTransformState");
  * }</pre>
- * <p>
  * Within the {@link ValueTransformerWithKey}, the state is obtained via the
  * {@link ProcessorContext}.
  * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
@@ -777,6 +767,8 @@ public interface KStream<K, V> {
  * }
  * }
  * }</pre>
+ * Even if any upstream operation was key-changing, no auto-repartition is triggered.
+ * If repartitioning is required, a call to {@link #through(String)} should be performed before {@code transform()}.
  * <p>
  * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
  * So, setting a new value preserves data co-location with respect to the key.
@@ -815,7 +807,6 @@ public interface KStream<K, V> {
  *
  * inputStream.process(new ProcessorSupplier() { ... }, "myProcessorState");
  * }</pre>
- * <p>
  * Within the {@link Processor}, the state is obtained via the
  * {@link ProcessorContext}.
  * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
@@ -842,6 +833,8 @@ public interface KStream<K, V> {
  * }
  * }
  * }</pre>
+ * Even if any upstream operation was key-changing, no auto-repartition is triggered.
+ * If repartitioning is required, a call to {@link #through(String)} should be performed before {@code transform()}.
  *
  * @param processorSupplier a instance of {@link ProcessorSupplier} that generates a {@link Processor}
  * @param stateStoreNames the names of the state store used by the processor
@@ -868,7 +861,6 @@ public interface KStream<K, V> {
  * an internally generated name, and "-repartition" is a fixed suffix.
  * <p>
  * You can retrieve all generated internal topic names via {@link Topology#describe()}.
- *
  * <p>
  * For this case, all data of this stream will be redistributed through the repartitioning topic by writing all
  * records to it, and rereading all records from it, such that the resulting {@link KGroupedStream} is partitioned
@@ -898,7 +890,6 @@ public interface KStream<K, V> {
  * an internally generated name, and "-repartition" is a fixed suffix.
  * <p>
  * You can retrieve all generated internal topic names via {@link Topology#describe()}.
- *
  * <p>
  * For this case, all data of this stream will be redistributed through the repartitioning topic by writing all
  * records to it, and rereading all records from it, such that the resulting {@link KGroupedStream} is partitioned
@@ -928,10 +919,8 @@ public interface KStream<K, V> {
  * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, <name> is
  * either provided via {@link org.apache.kafka.streams.kstream.Grouped#as(String)} or an internally generated name,
  * and "-repartition" is a fixed suffix.
- *
  * <p>
  * You can retrieve all generated internal topic names via {@link Topology#describe()}.
- *
  * <p>
  * For this case, all data of this stream will be redistributed through the repartitioning topic by writing all
  * records to it, and rereading all records from it, such that the resulting {@link KGroupedStream} is partitioned
@@ -959,7 +948,6 @@ public interface KStream<K, V> {
  * an internally generated name, and "-repartition" is a fixed suffix.
  * <p>
  * You can retrieve all generated internal topic names via {@link Topology#describe()}.
- *
  * <p>
  * All data of this stream will be redistributed through the repartitioning topic by writing all records to it,
  * and rereading all records from it, such that the resulting {@link KGroupedStream} is partitioned on the new key.
@@ -988,7 +976,6 @@ public interface KStream<K, V> {
  * an internally generated name, and "-repartition" is a fixed suffix.
  * <p>
  * You can retrieve all generated internal topic names via {@link Topology#describe()}.
- *
  * <p>
  * All data of this stream will be redistributed through the repartitioning topic by writing all records to it,
  * and rereading all records from it, such that the resulting {@link KGroupedStream} is partitioned on the new key.
@@ -1020,7 +1007,6 @@ public interface KStream<K, V> {
  * either provided via {@link org.apache.kafka.streams.kstream.Grouped#as(String)} or an internally generated name.
  * <p>
  * You can retrieve all generated internal topic names via {@link Topology#describe()}.
- *
  * <p>
  * All data of this stream will be redistributed through the repartitioning topic by writing all records to it,
  * and rereading all records from it, such that the resulting {@link KGroupedStream} is partitioned on the new key.
@@ -1036,8 +1022,6 @@ public interface KStream<K, V> {
 <KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> selector,
                                    final Grouped<KR, V> grouped);
 
-
-
 /**
  * Join records of this stream with another {@code KStream}'s records using windowed inner equi join with default
  * serializers and deserializers.
@@ -1085,7 +1069,6 @@ public interface KStream<K, V> {
  * user-specified in {@link StreamsConfig} via parameter
  * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated name, and
  * "-repartition" is a fixed suffix.
- *
  * <p>
  * Repartitioning can happen for one or both of the joining {@code KStream}s.
  * For this case, all data of the stream will be redistributed through the repartitioning topic by writing all
@@ -1175,7 +1158,7 @@ public interface KStream<K, V> {
  * in {@link StreamsConfig} via parameter
  * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is an
  * internally generated name, and "-changelog" is a fixed suffix.
- *
+ * <p>
  * You can retrieve all generated internal topic names via {@link Topology#describe()}.
  *
  * @param otherStream the {@code KStream} to be joined with this stream
@@ -1257,7 +1240,7 @@ public interface KStream<K, V> {
  * The changelog topic will be named "${applicationId}-storeName-changelog", where "applicationId" is user-specified
  * in {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
  * "storeName" is an internally generated name, and "-changelog" is a fixed suffix.
- *
+ * <p>
  * You can retrieve all generated internal topic names via {@link Topology#describe()}.
  *
  * @param otherStream the {@code KStream} to be joined with this stream
@@ -1338,7 +1321,7 @@ public interface KStream<K, V> {
  * The changelog topic will be named "${applicationId}-storeName-changelog", where "applicationId" is user-specified
  * in {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
  * "storeName" is an internally generated name, and "-changelog" is a fixed suffix.
- *
+ * <p>
  * You can retrieve all generated internal topic names via {@link Topology#describe()}.
  *
  * @param otherStream the {@code KStream} to be joined with this stream
@@ -1422,7 +1405,7 @@ public interface KStream<K, V> {
  * The changelog topic will be named "${applicationId}-storeName-changelog", where "applicationId" is user-specified
  * in {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
  * "storeName" is an internally generated name, and "-changelog" is a fixed suffix.
- *
+ * <p>
  * You can retrieve all generated internal topic names via {@link Topology#describe()}.
  *
  * @param otherStream the {@code KStream} to be joined with this stream
@@ -1504,7 +1487,7 @@ public interface KStream<K, V> {
  * The changelog topic will be named "${applicationId}-storeName-changelog", where "applicationId" is user-specified
  * in {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
  * "storeName" is an internally generated name, and "-changelog" is a fixed suffix.
- *
+ * <p>
  * You can retrieve all generated internal topic names via {@link Topology#describe()}.
  *
  * @param otherStream the {@code KStream} to be joined with this stream
@@ -1580,9 +1563,8 @@ public interface KStream<K, V> {
  * user-specified in {@link StreamsConfig} via parameter
  * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated name, and
  * "-repartition" is a fixed suffix.
- *
+ * <p>
  * You can retrieve all generated internal topic names via {@link Topology#describe()}.
- *
  * <p>
  * Repartitioning can happen only for this {@code KStream} but not for the provided {@link KTable}.
  * For this case, all data of the stream will be redistributed through the repartitioning topic by writing all
@@ -1656,9 +1638,8 @@ public interface KStream<K, V> {
  * user-specified in {@link StreamsConfig} via parameter
  * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated name, and
  * "-repartition" is a fixed suffix.
- *
+ * <p>
  * You can retrieve all generated internal topic names via {@link Topology#describe()}.
- *
  * <p>
  * Repartitioning can happen only for this {@code KStream} but not for the provided {@link KTable}.
  * For this case, all data of the stream will be redistributed through the repartitioning topic by writing all
@@ -1738,9 +1719,8 @@ public interface KStream<K, V> {
  * user-specified in {@link StreamsConfig} via parameter
  * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated name, and
  * "-repartition" is a fixed suffix.
- *
+ * <p>
  * You can retrieve all generated internal topic names via {@link Topology#describe()}.
- *
  * <p>
  * Repartitioning can happen only for this {@code KStream} but not for the provided {@link KTable}.
  * For this case, all data of the stream will be redistributed through the repartitioning topic by writing all
@@ -1817,9 +1797,8 @@ public interface KStream<K, V> {
  * user-specified in {@link StreamsConfig} via parameter
  * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated name, and
  * "-repartition" is a fixed suffix.
- *
+ * <p>
  * You can retrieve all generated internal topic names via {@link Topology#describe()}.
- *
  * <p>
  * Repartitioning can happen only for this {@code KStream} but not for the provided {@link KTable}.
  * For this case, all data of the stream will be redistributed through the repartitioning topic by writing all