This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.2 by this push:
     new 8179e8f  Improve API docs of (flatT|t)ransform (#6365)

8179e8f is described below

commit 8179e8fdc04d8968e2aa11d7959bdbbecea19141
Author: cadonna <cado...@users.noreply.github.com>
AuthorDate: Thu Mar 7 23:41:57 2019 +0100

    Improve API docs of (flatT|t)ransform (#6365)

    This commit is a follow-up of pull request #5273

    Reviewers: Matthias J. Sax <mj...@apache.org>, Bill Bejeck <bbej...@gmail.com>
---
 .../org/apache/kafka/streams/kstream/KStream.java | 61 +++++++++++++++-------
 1 file changed, 41 insertions(+), 20 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 5138917..c61b703 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -493,9 +493,9 @@ public interface KStream<K, V> {
      * A {@link Transformer} (provided by the given {@link TransformerSupplier}) is applied to each input record and
      * returns zero or one output record.
      * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K':V'>}.
-     * This is a stateful record-by-record operation (cf. {@link #map(KeyValueMapper)}).
-     * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)}, the processing progress
-     * can be observed and additional periodic actions can be performed.
+     * This is a stateful record-by-record operation (cf. {@link #map(KeyValueMapper) map()}).
+     * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()},
+     * the processing progress can be observed and additional periodic actions can be performed.
      * <p>
      * In order to assign a state, the state must be created and registered beforehand:
      * <pre>{@code
@@ -509,11 +509,13 @@ public interface KStream<K, V> {
      *
      * KStream outputStream = inputStream.transform(new TransformerSupplier() { ... }, "myTransformState");
      * }</pre>
-     * Within the {@link Transformer}, the state is obtained via the {@link ProcessorContext}.
+     * Within the {@link Transformer}, the state is obtained via the {@link ProcessorContext}.
      * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
      * a schedule must be registered.
      * The {@link Transformer} must return a {@link KeyValue} type in {@link Transformer#transform(Object, Object)
      * transform()} and {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()}.
+     * The return value of {@link Transformer#transform(Object, Object) Transformer#transform()} may be {@code null},
+     * in which case no record is emitted.
      * <pre>{@code
      * new TransformerSupplier() {
      *     Transformer get() {
@@ -541,19 +543,24 @@ public interface KStream<K, V> {
      * }
      * }</pre>
      * Even if any upstream operation was key-changing, no auto-repartition is triggered.
-     * If repartitioning is required, a call to {@link #through(String)} should be performed before {@code transform()}.
+     * If repartitioning is required, a call to {@link #through(String) through()} should be performed before
+     * {@code transform()}.
      * <p>
      * Transforming records might result in an internal data redistribution if a key based operator (like an aggregation
      * or join) is applied to the result {@code KStream}.
-     * (cf. {@link #transformValues(ValueTransformerSupplier, String...)})
+     * (cf. {@link #transformValues(ValueTransformerSupplier, String...) transformValues()} )
      * <p>
      * Note that it is possible to emit multiple records for each input record by using
-     * {@link ProcessorContext#forward(Object, Object) context#forward()} in {@link Transformer#transform(K, V)}.
-     * However, a mismatch between the types of the emitted records and the type of the stream would only be detected
-     * at runtime.
-     * To ensure type-safety at compile-time, it is recommended to use
-     * {@link #flatTransform(TransformerSupplier, String...)} if multiple records need to be emitted for each input
-     * record.
+     * {@link ProcessorContext#forward(Object, Object) context#forward()} in {@link Transformer#transform(Object, Object) Transformer#transform()} and
+     * {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()}.
+     * Be aware that a mismatch between the types of the emitted records and the type of the stream would only be
+     * detected at runtime.
+     * To ensure type-safety at compile-time, {@link ProcessorContext#forward(Object, Object) context#forward()} should
+     * not be used in {@link Transformer#transform(Object, Object) Transformer#transform()} and
+     * {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()}.
+     * If in {@link Transformer#transform(Object, Object) Transformer#transform()} multiple records need to be emitted
+     * for each input record, it is recommended to use {@link #flatTransform(TransformerSupplier, String...)
+     * flatTransform()}.
      *
      * @param transformerSupplier an instance of {@link TransformerSupplier} that generates a {@link Transformer}
      * @param stateStoreNames the names of the state stores used by the processor
@@ -575,10 +582,10 @@ public interface KStream<K, V> {
      * A {@link Transformer} (provided by the given {@link TransformerSupplier}) is applied to each input record and
      * returns zero or more output records.
      * Thus, an input record {@code <K,V>} can be transformed into output records {@code <K':V'>, <K'':V''>, ...}.
-     * This is a stateful record-by-record operation (cf. {@link #flatMap(KeyValueMapper)} for stateless record
-     * transformation).
-     * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress
-     * can be observed and additional periodic actions can be performed.
+     * This is a stateful record-by-record operation (cf. {@link #flatMap(KeyValueMapper) flatMap()} for stateless
+     * record transformation).
+     * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()}
+     * the processing progress can be observed and additional periodic actions can be performed.
      * <p>
      * In order to assign a state, the state must be created and registered beforehand:
      * <pre>{@code
@@ -593,8 +600,12 @@ public interface KStream<K, V> {
      * KStream outputStream = inputStream.flatTransform(new TransformerSupplier() { ... }, "myTransformState");
      * }</pre>
      * Within the {@link Transformer}, the state is obtained via the {@link ProcessorContext}.
-     * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
-     * a schedule must be registered.
+     * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)
+     * punctuate()}, a schedule must be registered.
+     * The {@link Transformer} must return an {@link java.lang.Iterable} type (e.g., any {@link java.util.Collection}
+     * type) in {@link Transformer#transform(Object, Object) transform()}.
+     * The return value of {@link Transformer#transform(Object, Object) Transformer#transform()} may be {@code null},
+     * which is equal to returning an empty {@link java.lang.Iterable Iterable}, i.e., no records are emitted.
      * <pre>{@code
      * new TransformerSupplier() {
      *     Transformer get() {
@@ -626,11 +637,21 @@ public interface KStream<K, V> {
      * }
      * }</pre>
      * Even if any upstream operation was key-changing, no auto-repartition is triggered.
-     * If repartitioning is required, a call to {@link #through(String)} should be performed before {@code transform()}.
+     * If repartitioning is required, a call to {@link #through(String) through()} should be performed before
+     * {@code flatTransform()}.
      * <p>
      * Transforming records might result in an internal data redistribution if a key based operator (like an aggregation
      * or join) is applied to the result {@code KStream}.
-     * (cf. {@link #transformValues(ValueTransformerSupplier, String...)})
+     * (cf. {@link #transformValues(ValueTransformerSupplier, String...) transformValues()})
+     * <p>
+     * Note that it is possible to emit records by using {@link ProcessorContext#forward(Object, Object)
+     * context#forward()} in {@link Transformer#transform(Object, Object) Transformer#transform()} and
+     * {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()}.
+     * Be aware that a mismatch between the types of the emitted records and the type of the stream would only be
+     * detected at runtime.
+     * To ensure type-safety at compile-time, {@link ProcessorContext#forward(Object, Object) context#forward()} should
+     * not be used in {@link Transformer#transform(Object, Object) Transformer#transform()} and
+     * {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()}.
      *
      * @param transformerSupplier an instance of {@link TransformerSupplier} that generates a {@link Transformer}
      * @param stateStoreNames the names of the state stores used by the processor
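The return-value rule that this commit documents for transform() (a non-null KeyValue emits one record, null emits none) can be illustrated with a small, Kafka-free model. This is a sketch under stated assumptions, not Kafka Streams code: `KV`, `SimpleTransformer`, `applyTransform`, and `demo` are hypothetical stand-ins for `KeyValue`, `Transformer#transform()`, and the operator itself. State stores, punctuation, and `ProcessorContext` are deliberately left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TransformContractSketch {

    // Hypothetical stand-in for org.apache.kafka.streams.KeyValue.
    public static final class KV<K, V> {
        public final K key;
        public final V value;

        public KV(K key, V value) {
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return "<" + key + ":" + value + ">";
        }
    }

    // Hypothetical stand-in for Transformer#transform(): zero or one output record per input record.
    public interface SimpleTransformer<K, V, K2, V2> {
        KV<K2, V2> transform(K key, V value);
    }

    // Models transform(): a non-null result is emitted, a null result emits nothing.
    public static <K, V, K2, V2> List<KV<K2, V2>> applyTransform(
            List<KV<K, V>> input, SimpleTransformer<K, V, K2, V2> transformer) {
        List<KV<K2, V2>> output = new ArrayList<>();
        for (KV<K, V> in : input) {
            KV<K2, V2> result = transformer.transform(in.key, in.value);
            if (result != null) {
                output.add(result);
            }
        }
        return output;
    }

    // Example: upper-case the key, parse the value, and drop records whose value is empty.
    public static List<KV<String, Integer>> demo() {
        List<KV<String, String>> input = Arrays.asList(
                new KV<>("a", "1"), new KV<>("b", ""), new KV<>("c", "3"));
        SimpleTransformer<String, String, String, Integer> parseOrDrop = (key, value) ->
                value.isEmpty() ? null : new KV<>(key.toUpperCase(), Integer.valueOf(value));
        return applyTransform(input, parseOrDrop);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [<A:1>, <C:3>]
    }
}
```

The record with the empty value maps to `null` and simply disappears from the output, which is the behavior the new Javadoc sentence describes. In real code the equivalent would be a `Transformer<K, V, KeyValue<K1, V1>>` supplied to `KStream#transform()`.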
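For flatTransform(), the commit states that transform() must return an Iterable and that returning null is equal to returning an empty Iterable. The following Kafka-free sketch models only that rule; `KV`, `SimpleFlatTransformer`, `applyFlatTransform`, and `demo` are hypothetical names, not part of the Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FlatTransformContractSketch {

    // Hypothetical stand-in for org.apache.kafka.streams.KeyValue.
    public static final class KV<K, V> {
        public final K key;
        public final V value;

        public KV(K key, V value) {
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return "<" + key + ":" + value + ">";
        }
    }

    // Hypothetical stand-in for Transformer#transform() as used by flatTransform(): zero or more records.
    public interface SimpleFlatTransformer<K, V, K2, V2> {
        Iterable<KV<K2, V2>> transform(K key, V value);
    }

    // Models flatTransform(): every element of the Iterable is emitted; null behaves like an empty Iterable.
    public static <K, V, K2, V2> List<KV<K2, V2>> applyFlatTransform(
            List<KV<K, V>> input, SimpleFlatTransformer<K, V, K2, V2> transformer) {
        List<KV<K2, V2>> output = new ArrayList<>();
        for (KV<K, V> in : input) {
            Iterable<KV<K2, V2>> results = transformer.transform(in.key, in.value);
            if (results == null) {
                continue; // same effect as an empty Iterable: nothing is emitted
            }
            for (KV<K2, V2> result : results) {
                output.add(result);
            }
        }
        return output;
    }

    // Example: emit one record per comma-separated word; an empty value returns null.
    public static List<KV<String, String>> demo() {
        List<KV<String, String>> input = Arrays.asList(
                new KV<>("s1", "red,green"), new KV<>("s2", ""), new KV<>("s3", "blue"));
        SimpleFlatTransformer<String, String, String, String> splitWords = (key, value) -> {
            if (value.isEmpty()) {
                return null;
            }
            List<KV<String, String>> records = new ArrayList<>(); // any Collection is an Iterable
            for (String word : value.split(",")) {
                records.add(new KV<>(key, word));
            }
            return records;
        };
        return applyFlatTransform(input, splitWords);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [<s1:red>, <s1:green>, <s3:blue>]
    }
}
```

Because the multiple output records are returned as a typed collection rather than pushed through `forward()`, the compiler checks their key and value types, which is why the Javadoc recommends flatTransform() when one input record must produce several outputs.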
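The caveat that a type mismatch with context#forward() "would only be detected at runtime" follows from forward() accepting keys and values of any type (its Javadoc link is `forward(Object, Object)`), so nothing ties them to the stream's declared types. This is a hypothetical, Kafka-free model of that effect: `UntypedContext` and `sumValues` are illustrative names only, and in a real topology the failure would typically surface later, for example in a downstream operator or serializer.

```java
import java.util.ArrayList;
import java.util.List;

public class ForwardTypeSafetySketch {

    // Hypothetical stand-in for ProcessorContext: forward() takes any key and value type.
    public static final class UntypedContext {
        private final List<Object[]> forwarded = new ArrayList<>();

        public void forward(Object key, Object value) {
            forwarded.add(new Object[] {key, value});
        }

        public List<Object[]> forwarded() {
            return forwarded;
        }
    }

    // Hypothetical downstream operator that assumes the stream is typed <String, Integer>.
    public static int sumValues(UntypedContext context) {
        int sum = 0;
        for (Object[] kv : context.forwarded()) {
            sum += (Integer) kv[1]; // a wrongly typed value fails here, at runtime
        }
        return sum;
    }

    // Returns true if the type mismatch surfaced as a ClassCastException at runtime.
    public static boolean mismatchDetectedOnlyAtRuntime() {
        UntypedContext context = new UntypedContext();
        context.forward("a", 1);
        context.forward("b", "two"); // compiles fine: nothing ties this value to Integer
        try {
            sumValues(context);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(mismatchDetectedOnlyAtRuntime()); // prints true
    }
}
```

Returning a `KeyValue` (transform()) or an `Iterable` of them (flatTransform()) instead moves this check to compile time, since the return type carries the output key and value types.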