This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 96c4e32  Improve API docs of (flatT|t)ransform (#6365)
96c4e32 is described below

commit 96c4e323bfc886b654b198896e78a7d21c80e30a
Author: cadonna <cado...@users.noreply.github.com>
AuthorDate: Thu Mar 7 23:41:57 2019 +0100

    Improve API docs of (flatT|t)ransform (#6365)
    
    This commit is a follow-up of pull request #5273
    
    Reviewers: Matthias J. Sax <mj...@apache.org>, Bill Bejeck <bbej...@gmail.com>
---
 .../org/apache/kafka/streams/kstream/KStream.java  | 61 +++++++++++++++-------
 1 file changed, 41 insertions(+), 20 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index df001d0..d264390 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -494,9 +494,9 @@ public interface KStream<K, V> {
      * A {@link Transformer} (provided by the given {@link TransformerSupplier}) is applied to each input record and
      * returns zero or one output record.
      * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K':V'>}.
-     * This is a stateful record-by-record operation (cf. {@link #map(KeyValueMapper)}).
-     * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)}, the processing progress
-     * can be observed and additional periodic actions can be performed.
+     * This is a stateful record-by-record operation (cf. {@link #map(KeyValueMapper) map()}).
+     * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()},
+     * the processing progress can be observed and additional periodic actions can be performed.
      * <p>
      * In order to assign a state, the state must be created and registered beforehand (it's not required to connect
      * global state stores; read-only access to global state stores is available by default):
@@ -511,11 +511,13 @@ public interface KStream<K, V> {
      *
      * KStream outputStream = inputStream.transform(new TransformerSupplier() { ... }, "myTransformState");
      * }</pre>
-     * Within the {@link Transformer}, the state is obtained via the {@link  ProcessorContext}.
+     * Within the {@link Transformer}, the state is obtained via the {@link ProcessorContext}.
      * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
      * a schedule must be registered.
      * The {@link Transformer} must return a {@link KeyValue} type in {@link Transformer#transform(Object, Object)
      * transform()} and {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()}.
+     * The return value of {@link Transformer#transform(Object, Object) Transformer#transform()} may be {@code null},
+     * in which case no record is emitted.
      * <pre>{@code
      * new TransformerSupplier() {
      *     Transformer get() {
@@ -543,19 +545,24 @@ public interface KStream<K, V> {
      * }
      * }</pre>
      * Even if any upstream operation was key-changing, no auto-repartition is triggered.
-     * If repartitioning is required, a call to {@link #through(String)} should be performed before {@code transform()}.
+     * If repartitioning is required, a call to {@link #through(String) through()} should be performed before
+     * {@code transform()}.
      * <p>
      * Transforming records might result in an internal data redistribution if a key based operator (like an aggregation
      * or join) is applied to the result {@code KStream}.
-     * (cf. {@link #transformValues(ValueTransformerSupplier, String...)})
+     * (cf. {@link #transformValues(ValueTransformerSupplier, String...) transformValues()} )
      * <p>
      * Note that it is possible to emit multiple records for each input record by using
-     * {@link ProcessorContext#forward(Object, Object) context#forward()} in {@link Transformer#transform(K, V)}.
-     * However, a mismatch between the types of the emitted records and the type of the stream would only be detected
-     * at runtime.
-     * To ensure type-safety at compile-time, it is recommended to use
-     * {@link #flatTransform(TransformerSupplier, String...)} if multiple records need to be emitted for each input
-     * record.
+     * {@link ProcessorContext#forward(Object, Object) context#forward()} in {@link Transformer#transform(Object, Object) Transformer#transform()} and
+     * {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()}.
+     * Be aware that a mismatch between the types of the emitted records and the type of the stream would only be
+     * detected at runtime.
+     * To ensure type-safety at compile-time, {@link ProcessorContext#forward(Object, Object) context#forward()} should
+     * not be used in {@link Transformer#transform(Object, Object) Transformer#transform()} and
+     * {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()}.
+     * If in {@link Transformer#transform(Object, Object) Transformer#transform()} multiple records need to be emitted
+     * for each input record, it is recommended to use {@link #flatTransform(TransformerSupplier, String...)
+     * flatTransform()}.
      *
      * @param transformerSupplier an instance of {@link TransformerSupplier} that generates a {@link Transformer}
      * @param stateStoreNames     the names of the state stores used by the processor
@@ -577,10 +584,10 @@ public interface KStream<K, V> {
      * A {@link Transformer} (provided by the given {@link TransformerSupplier}) is applied to each input record and
      * returns zero or more output records.
      * Thus, an input record {@code <K,V>} can be transformed into output records {@code <K':V'>, <K'':V''>, ...}.
-     * This is a stateful record-by-record operation (cf. {@link #flatMap(KeyValueMapper)} for stateless record
-     * transformation).
-     * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress
-     * can be observed and additional periodic actions can be performed.
+     * This is a stateful record-by-record operation (cf. {@link #flatMap(KeyValueMapper) flatMap()} for stateless
+     * record transformation).
+     * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()}
+     * the processing progress can be observed and additional periodic actions can be performed.
      * <p>
      * In order to assign a state, the state must be created and registered beforehand (it's not required to connect
      * global state stores; read-only access to global state stores is available by default):
@@ -596,8 +603,12 @@ public interface KStream<K, V> {
      * KStream outputStream = inputStream.flatTransform(new TransformerSupplier() { ... }, "myTransformState");
      * }</pre>
      * Within the {@link Transformer}, the state is obtained via the {@link ProcessorContext}.
-     * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
-     * a schedule must be registered.
+     * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)
+     * punctuate()}, a schedule must be registered.
+     * The {@link Transformer} must return an {@link java.lang.Iterable} type (e.g., any {@link java.util.Collection}
+     * type) in {@link Transformer#transform(Object, Object) transform()}.
+     * The return value of {@link Transformer#transform(Object, Object) Transformer#transform()} may be {@code null},
+     * which is equal to returning an empty {@link java.lang.Iterable Iterable}, i.e., no records are emitted.
      * <pre>{@code
      * new TransformerSupplier() {
      *     Transformer get() {
@@ -629,11 +640,21 @@ public interface KStream<K, V> {
      * }
      * }</pre>
      * Even if any upstream operation was key-changing, no auto-repartition is triggered.
-     * If repartitioning is required, a call to {@link #through(String)} should be performed before {@code transform()}.
+     * If repartitioning is required, a call to {@link #through(String) through()} should be performed before
+     * {@code flatTransform()}.
      * <p>
      * Transforming records might result in an internal data redistribution if a key based operator (like an aggregation
      * or join) is applied to the result {@code KStream}.
-     * (cf. {@link #transformValues(ValueTransformerSupplier, String...)})
+     * (cf. {@link #transformValues(ValueTransformerSupplier, String...) transformValues()})
+     * <p>
+     * Note that it is possible to emit records by using {@link ProcessorContext#forward(Object, Object)
+     * context#forward()} in {@link Transformer#transform(Object, Object) Transformer#transform()} and
+     * {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()}.
+     * Be aware that a mismatch between the types of the emitted records and the type of the stream would only be
+     * detected at runtime.
+     * To ensure type-safety at compile-time, {@link ProcessorContext#forward(Object, Object) context#forward()} should
+     * not be used in {@link Transformer#transform(Object, Object) Transformer#transform()} and
+     * {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()}.
      *
      * @param transformerSupplier an instance of {@link TransformerSupplier} that generates a {@link Transformer}
      * @param stateStoreNames     the names of the state stores used by the processor
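
For illustration only (not part of the patch): a minimal sketch of how the transform() contract documented
above could be wired up. Topic names, serdes, and the counting logic are hypothetical; the store name
"myTransformState" mirrors the Javadoc example. The Transformer returns one KeyValue per input record (or
null to emit nothing) and registers a punctuation schedule in init(), without using context.forward().

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class TransformSketch {

    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        // The state store must be created and registered beforehand, as the Javadoc requires.
        builder.addStateStore(Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("myTransformState"),
            Serdes.String(),
            Serdes.Long()));

        final KStream<String, String> inputStream = builder.stream("input-topic");

        // transform() emits zero or one output record per input record.
        final KStream<String, Long> outputStream = inputStream.transform(
            new TransformerSupplier<String, String, KeyValue<String, Long>>() {
                @Override
                public Transformer<String, String, KeyValue<String, Long>> get() {
                    return new Transformer<String, String, KeyValue<String, Long>>() {
                        private KeyValueStore<String, Long> state;

                        @SuppressWarnings("unchecked")
                        @Override
                        public void init(final ProcessorContext context) {
                            // The state is obtained via the ProcessorContext.
                            state = (KeyValueStore<String, Long>) context.getStateStore("myTransformState");
                            // A schedule must be registered to trigger periodic actions via punctuate();
                            // context.forward() is deliberately not used, to keep compile-time type-safety.
                            context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
                                // periodic maintenance could go here
                            });
                        }

                        @Override
                        public KeyValue<String, Long> transform(final String key, final String value) {
                            final Long previous = state.get(key);
                            final long count = (previous == null ? 0L : previous) + 1L;
                            state.put(key, count);
                            // Returning null instead would emit no record for this input.
                            return KeyValue.pair(key, count);
                        }

                        @Override
                        public void close() { }
                    };
                }
            },
            "myTransformState");

        outputStream.to("output-topic");
    }
}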

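Similarly, a hypothetical sketch of the flatTransform() contract: in the topology above, flatTransform()
could be used instead of transform() when a single input record should produce zero or more output records.
The Transformer returns an Iterable of KeyValue pairs; an empty Iterable (or null) emits nothing.

        // Requires additionally: import java.util.ArrayList; import java.util.List;
        final KStream<String, Long> tokenCounts = inputStream.flatTransform(
            new TransformerSupplier<String, String, Iterable<KeyValue<String, Long>>>() {
                @Override
                public Transformer<String, String, Iterable<KeyValue<String, Long>>> get() {
                    return new Transformer<String, String, Iterable<KeyValue<String, Long>>>() {
                        private KeyValueStore<String, Long> state;

                        @SuppressWarnings("unchecked")
                        @Override
                        public void init(final ProcessorContext context) {
                            state = (KeyValueStore<String, Long>) context.getStateStore("myTransformState");
                        }

                        @Override
                        public Iterable<KeyValue<String, Long>> transform(final String key, final String value) {
                            final List<KeyValue<String, Long>> result = new ArrayList<>();
                            if (value == null) {
                                return result;  // empty Iterable: no records are emitted
                            }
                            // Emit one output record per whitespace-separated token of the value.
                            for (final String token : value.split("\\s+")) {
                                final Long previous = state.get(token);
                                final long count = (previous == null ? 0L : previous) + 1L;
                                state.put(token, count);
                                result.add(KeyValue.pair(token, count));
                            }
                            return result;  // zero or more output records per input record
                        }

                        @Override
                        public void close() { }
                    };
                }
            },
            "myTransformState");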