Repository: kafka Updated Branches: refs/heads/trunk ca0c071c1 -> a01b72236
MINOR: Update JavaDoc for DSL PAPI-API Author: Matthias J. Sax <[email protected]> Reviewers: Damian Guy, Guozhang Wang Closes #2413 from mjsax/javaDocImprovements6 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a01b7223 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a01b7223 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a01b7223 Branch: refs/heads/trunk Commit: a01b72236926d2bcc52dd7d0378147f8fae8117d Parents: ca0c071 Author: Matthias J. Sax <[email protected]> Authored: Thu Jan 26 21:25:26 2017 -0800 Committer: Guozhang Wang <[email protected]> Committed: Thu Jan 26 21:25:26 2017 -0800 ---------------------------------------------------------------------- .../apache/kafka/streams/kstream/KStream.java | 20 +++-- .../kafka/streams/kstream/KeyValueMapper.java | 3 +- .../apache/kafka/streams/kstream/Merger.java | 2 +- .../kafka/streams/kstream/Transformer.java | 76 +++++++++++++---- .../streams/kstream/TransformerSupplier.java | 15 +++- .../kafka/streams/kstream/ValueMapper.java | 7 +- .../kafka/streams/kstream/ValueTransformer.java | 86 ++++++++++++++++---- .../kstream/ValueTransformerSupplier.java | 18 +++- 8 files changed, 180 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/a01b7223/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index 3509523..8a18a5a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -105,7 +105,8 @@ public interface KStream<K, V> { * (both key and value type can be altered arbitrarily). 
* The provided {@link KeyValueMapper} is applied to each input record and computes a new output record. * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K':V'>}. - * This is a stateless record-by-record operation. + * This is a stateless record-by-record operation (cf. {@link #transform(TransformerSupplier, String...)} for + * stateful record transformation). * <p> * The example below normalizes the String key to upper-case letters and counts the number of token of the value string. * <pre>{@code @@ -117,7 +118,7 @@ public interface KStream<K, V> { * }); * }</pre> * <p> - * The provided {@link KeyValueMapper} must return a {@link KeyValue} type and the return value must not be {@code null}. + * The provided {@link KeyValueMapper} must return a {@link KeyValue} type and must not return {@code null}. * <p> * Mapping records might result in an internal data redistribution if a key based operator (like an aggregation or * join) is applied to the result {@link KStream}. (cf. {@link #mapValues(ValueMapper)}) @@ -130,6 +131,8 @@ public interface KStream<K, V> { * @see #flatMap(KeyValueMapper) * @see #mapValues(ValueMapper) * @see #flatMapValues(ValueMapper) + * @see #transform(TransformerSupplier, String...) + * @see #transformValues(ValueTransformerSupplier, String...) */ <KR, VR> KStream<KR, VR> map(KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper); @@ -137,7 +140,8 @@ public interface KStream<K, V> { * Transform the value of each input record into a new value (with possible new type) of the output record. * The provided {@link ValueMapper} is applied to each input record value and computes a new value for it. * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}. - * This is a stateless record-by-record operation (cf. {@link #transformValues(ValueTransformerSupplier, String...)}). + * This is a stateless record-by-record operation (cf. 
+ * {@link #transformValues(ValueTransformerSupplier, String...)} for stateful value transformation). * <p> * The example below counts the number of token of the value string. * <pre>{@code @@ -160,6 +164,7 @@ public interface KStream<K, V> { * @see #map(KeyValueMapper) * @see #flatMap(KeyValueMapper) * @see #flatMapValues(ValueMapper) + * @see #transform(TransformerSupplier, String...) * @see #transformValues(ValueTransformerSupplier, String...) */ <VR> KStream<K, VR> mapValues(ValueMapper<? super V, ? extends VR> mapper); @@ -169,7 +174,8 @@ public interface KStream<K, V> { * can be altered arbitrarily). * The provided {@link KeyValueMapper} is applied to each input record and computes zero or more output records. * Thus, an input record {@code <K,V>} can be transformed into output records {@code <K':V'>, <K'':V''>, ...}. - * This is a stateless record-by-record operation (cf. {@link #transform(TransformerSupplier, String...)}). + * This is a stateless record-by-record operation (cf. {@link #transform(TransformerSupplier, String...)} for + * stateful record transformation). * <p> * The example below splits input records {@code <null:String>} containing sentences as values into their words * and emit a record {@code <word:1>} for each word. @@ -204,6 +210,7 @@ public interface KStream<K, V> { * @see #mapValues(ValueMapper) * @see #flatMapValues(ValueMapper) * @see #transform(TransformerSupplier, String...) + * @see #transformValues(ValueTransformerSupplier, String...) */ <KR, VR> KStream<KR, VR> flatMap(final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper); @@ -214,7 +221,8 @@ public interface KStream<K, V> { * stream (value type can be altered arbitrarily). * The provided {@link ValueMapper} is applied to each input record and computes zero or more output values. * Thus, an input record {@code <K,V>} can be transformed into output records {@code <K:V'>, <K:V''>, ...}. 
- * This is a stateless record-by-record operation. + * This is a stateless record-by-record operation (cf. {@link #transformValues(ValueTransformerSupplier, String...)} + * for stateful value transformation). * <p> * The example below splits input records {@code <null:String>} containing sentences as values into their words. * <pre>{@code @@ -240,6 +248,8 @@ public interface KStream<K, V> { * @see #map(KeyValueMapper) * @see #flatMap(KeyValueMapper) * @see #mapValues(ValueMapper) + * @see #transform(TransformerSupplier, String...) + * @see #transformValues(ValueTransformerSupplier, String...) */ <VR> KStream<K, VR> flatMapValues(final ValueMapper<? super V, ? extends Iterable<? extends VR>> processor); http://git-wip-us.apache.org/repos/asf/kafka/blob/a01b7223/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java index d6d1def..b677de0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java @@ -28,13 +28,14 @@ import org.apache.kafka.common.annotation.InterfaceStability; * <li>map from an input record to a new key (with arbitrary key type as specified by {@code VR})</li> * </ul> * This is a stateless record-by-record operation, i.e, {@link #apply(Object, Object)} is invoked individually for each - * record of a stream. + * record of a stream (cf. {@link Transformer} for stateful record transformation). * {@link KeyValueMapper} is a generalization of {@link ValueMapper}. 
* * @param <K> key type * @param <V> value type * @param <VR> mapped value type * @see ValueMapper + * @see Transformer * @see KStream#map(KeyValueMapper) * @see KStream#flatMap(KeyValueMapper) * @see KStream#selectKey(KeyValueMapper) http://git-wip-us.apache.org/repos/asf/kafka/blob/a01b7223/streams/src/main/java/org/apache/kafka/streams/kstream/Merger.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Merger.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Merger.java index 5a70f21..f0df2c6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Merger.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Merger.java @@ -29,7 +29,7 @@ import org.apache.kafka.common.annotation.InterfaceStability; public interface Merger<K, V> { /** - * Compute a new aggregate from the key and two aggregates + * Compute a new aggregate from the key and two aggregates. * * @param aggKey the key of the record * @param aggOne the first aggregate http://git-wip-us.apache.org/repos/asf/kafka/blob/a01b7223/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java index 95d822a..d9a814b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java @@ -17,48 +17,96 @@ package org.apache.kafka.streams.kstream; +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import 
org.apache.kafka.streams.processor.TimestampExtractor; /** - * A stateful {@link Transformer} interface to transform a key-value pair into a new value. + * The {@link Transformer} interface for stateful mapping of an input record to zero, one, or multiple new output + * records (both key and value type can be altered arbitrarily). + * This is a stateful record-by-record operation, i.e., {@link #transform(Object, Object)} is invoked individually for + * each record of a stream and can access and modify a state that is available beyond a single call of + * {@link #transform(Object, Object)} (cf. {@link KeyValueMapper} for stateless record transformation). + * Additionally, the interface can be called in regular intervals based on the processing progress + * (cf. {@link #punctuate(long)}). + * <p> + * Use {@link TransformerSupplier} to provide new instances of {@link Transformer} to Kafka Streams' runtime. + * <p> + * If only a record's value should be modified {@link ValueTransformer} can be used. * - * @param <K> key type - * @param <V> value type - * @param <R> return type + * @param <K> key type + * @param <V> value type + * @param <R> {@link KeyValue} return type (both key and value type can be set + * arbitrarily) + * @see TransformerSupplier + * @see KStream#transform(TransformerSupplier, String...) + * @see ValueTransformer + * @see KStream#map(KeyValueMapper) + * @see KStream#flatMap(KeyValueMapper) */ [email protected] public interface Transformer<K, V, R> { /** - * Initialize this transformer with the given context. The framework ensures this is called once per processor when the topology - * that contains it is initialized. + * Initialize this transformer. + * This is called once per instance when the topology gets initialized. * <p> - * If this transformer is to be {@link #punctuate(long) called periodically} by the framework, then this method should - * {@link ProcessorContext#schedule(long) schedule itself} with the provided context. 
+ * The provided {@link ProcessorContext context} can be used to access topology and record meta data, to + * {@link ProcessorContext#schedule(long) schedule itself} for periodical calls (cf. {@link #punctuate(long)}), and + * to access attached {@link StateStore}s. + * <p> + * Note that {@link ProcessorContext} is updated in the background with the current record's meta data. + * Thus, it only contains valid record meta data when accessed within {@link #transform(Object, Object)}. * - * @param context the context; may not be null + * @param context the context */ void init(final ProcessorContext context); /** * Transform the record with the given key and value. + * Additionally, any {@link StateStore state} that is {@link KStream#transform(TransformerSupplier, String...) + * attached} to this operator can be accessed and modified + * arbitrarily (cf. {@link ProcessorContext#getStateStore(String)}). + * <p> + * If more than one output record should be forwarded downstream {@link ProcessorContext#forward(Object, Object)}, + * {@link ProcessorContext#forward(Object, Object, int)}, and + * {@link ProcessorContext#forward(Object, Object, String)} can be used. + * If no record should be forwarded downstream, {@code transform} can return {@code null}. * * @param key the key for the record * @param value the value for the record - * @return new value; if null no key-value pair will be forwarded to down stream + * @return new {@link KeyValue} pair—if {@code null} no key-value pair will + * be forwarded downstream */ R transform(final K key, final V value); /** - * Perform any periodic operations and possibly generate a key, if this processor {@link ProcessorContext#schedule(long) schedules itself} with the context - * during {@link #init(ProcessorContext) initialization}. 
+ * Perform any periodic operations and possibly generate new {@link KeyValue} pairs if this processor + * {@link ProcessorContext#schedule(long) schedules itself} with the context during + * {@link #init(ProcessorContext) initialization}. + * <p> + * To generate new {@link KeyValue} pairs {@link ProcessorContext#forward(Object, Object)}, + * {@link ProcessorContext#forward(Object, Object, int)}, and + * {@link ProcessorContext#forward(Object, Object, String)} can be used. + * <p> + * Note that {@code punctuate} is called based on <i>stream time</i> (i.e., time progresses with regard to + * timestamps returned by the used {@link TimestampExtractor}) + * and not based on wall-clock time. * - * @param timestamp the stream time when this method is being called - * @return new value; if null it will not be forwarded to down stream + * @param timestamp the stream time when {@code punctuate} is being called + * @return must return {@code null}—otherwise, a {@link StreamsException exception} will be thrown */ R punctuate(final long timestamp); /** * Close this processor and clean up any resources. + * <p> + * To generate new {@link KeyValue} pairs {@link ProcessorContext#forward(Object, Object)}, + * {@link ProcessorContext#forward(Object, Object, int)}, and + * {@link ProcessorContext#forward(Object, Object, String)} can be used. 
*/ void close(); http://git-wip-us.apache.org/repos/asf/kafka/blob/a01b7223/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java index 0341702..0a01489 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java @@ -17,15 +17,28 @@ package org.apache.kafka.streams.kstream; +import org.apache.kafka.common.annotation.InterfaceStability; + /** * A {@link TransformerSupplier} interface which can create one or more {@link Transformer} instances. + * + * @param <K> key type + * @param <V> value type + * @param <R> {@link org.apache.kafka.streams.KeyValue KeyValue} return type (both key and value type can be set + * arbitrarily) + * @see Transformer + * @see KStream#transform(TransformerSupplier, String...) + * @see ValueTransformer + * @see ValueTransformerSupplier + * @see KStream#transformValues(ValueTransformerSupplier, String...) */ [email protected] public interface TransformerSupplier<K, V, R> { /** * Return a new {@link Transformer} instance. 
* - * @return a new {@link Transformer} instance + * @return a new {@link Transformer} instance */ Transformer<K, V, R> get(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/a01b7223/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java index 5099ac7..71acc1d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java @@ -21,14 +21,15 @@ import org.apache.kafka.common.annotation.InterfaceStability; /** * The {@link ValueMapper} interface for mapping a value to a new value of arbitrary type. * This is a stateless record-by-record operation, i.e, {@link #apply(Object)} is invoked individually for each record - * of a stream. - * Thus, if {@link ValueMapper} is applied to a {@link org.apache.kafka.streams.KeyValue key-value pair} record the - * record's key is preserved. + * of a stream (cf. {@link ValueTransformer} for stateful value transformation). + * If {@link ValueMapper} is applied to a {@link org.apache.kafka.streams.KeyValue key-value pair} record the record's + * key is preserved. * If a record's key and value should be modified {@link KeyValueMapper} can be used. 
* * @param <V> value type * @param <VR> mapped value type * @see KeyValueMapper + * @see ValueTransformer * @see KStream#mapValues(ValueMapper) * @see KStream#flatMapValues(ValueMapper) * @see KTable#mapValues(ValueMapper) http://git-wip-us.apache.org/repos/asf/kafka/blob/a01b7223/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java index 063c352..02c3ba7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java @@ -17,46 +17,96 @@ package org.apache.kafka.streams.kstream; +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.TimestampExtractor; /** - * A stateful {@link ValueTransformer} interface to transform a value into a new value. + * The {@link ValueTransformer} interface for stateful mapping of a value to a new value (with possible new type). + * This is a stateful record-by-record operation, i.e., {@link #transform(Object)} is invoked individually for each + * record of a stream and can access and modify a state that is available beyond a single call of + * {@link #transform(Object)} (cf. {@link ValueMapper} for stateless value transformation). + * Additionally, the interface can be called in regular intervals based on the processing progress + * (cf. {@link #punctuate(long)}). + * If {@link ValueTransformer} is applied to a {@link KeyValue} pair record the record's key is preserved. 
+ * <p> + * Use {@link ValueTransformerSupplier} to provide new instances of {@link ValueTransformer} to Kafka Stream's runtime. + * <p> + * If a record's key and value should be modified {@link Transformer} can be used. * - * @param <V> value type - * @param <R> return type + * @param <V> value type + * @param <VR> transformed value type + * @see ValueTransformerSupplier + * @see KStream#transformValues(ValueTransformerSupplier, String...) + * @see Transformer */ -public interface ValueTransformer<V, R> { [email protected] +public interface ValueTransformer<V, VR> { /** - * Initialize this transformer with the given context. The framework ensures this is called once per processor when the topology - * that contains it is initialized. + * Initialize this transformer. + * This is called once per instance when the topology gets initialized. * <p> - * If this transformer is to be {@link #punctuate(long) called periodically} by the framework, then this method should - * {@link ProcessorContext#schedule(long) schedule itself} with the provided context. + * The provided {@link ProcessorContext context} can be used to access topology and record meta data, to + * {@link ProcessorContext#schedule(long) schedule itself} for periodical calls (cf. {@link #punctuate(long)}), and + * to access attached {@link StateStore}s. + * <p> + * Note that {@link ProcessorContext} is updated in the background with the current record's meta data. + * Thus, it only contains valid record meta data when accessed within {@link #transform(Object)}. + * <p> + * Note that using {@link ProcessorContext#forward(Object, Object)}, + * {@link ProcessorContext#forward(Object, Object, int)}, or + * {@link ProcessorContext#forward(Object, Object, String)} is not allowed within any method of + * {@code ValueTransformer} and will result in an {@link StreamsException exception}. 
* - * @param context the context; may not be null + * @param context the context */ void init(final ProcessorContext context); /** - * Transform the record with the given key and value. + * Transform the given value to a new value. + * Additionally, any {@link StateStore} that is {@link KStream#transformValues(ValueTransformerSupplier, String...) + * attached} to this operator can be accessed and modified arbitrarily (cf. + * {@link ProcessorContext#getStateStore(String)}). + * <p> + * Note that using {@link ProcessorContext#forward(Object, Object)}, + * {@link ProcessorContext#forward(Object, Object, int)}, and + * {@link ProcessorContext#forward(Object, Object, String)} is not allowed within {@code transform} and + * will result in an {@link StreamsException exception}. * - * @param value the value for the record - * @return new value + * @param value the value to be transformed + * @return the new value */ - R transform(final V value); + VR transform(final V value); /** - * Perform any periodic operations and possibly return a new value, if this processor {@link ProcessorContext#schedule(long) schedule itself} with the context - * during {@link #init(ProcessorContext) initialization}. + * Perform any periodic operations if this processor {@link ProcessorContext#schedule(long) schedules itself} with + * the context during {@link #init(ProcessorContext) initialization}. + * <p> + * It is not possible to return any new output records within {@code punctuate}. + * Using {@link ProcessorContext#forward(Object, Object)}, {@link ProcessorContext#forward(Object, Object, int)}, + * or {@link ProcessorContext#forward(Object, Object, String)} will result in an + * {@link StreamsException exception}. + * Furthermore, {@code punctuate} must return {@code null}. + * <p> + * Note that {@code punctuate} is called based on <i>stream time</i> (i.e., time progresses with regard to + * timestamps returned by the used {@link TimestampExtractor}) + * and not based on wall-clock time. 
* - * @param timestamp the stream time when this method is being called - * @return new value; if null it will not be forwarded to down stream + * @param timestamp the stream time when {@code punctuate} is being called + * @return must return {@code null}—otherwise, an {@link StreamsException exception} will be thrown */ - R punctuate(final long timestamp); + VR punctuate(final long timestamp); /** * Close this processor and clean up any resources. + * <p> + * It is not possible to return any new output records within {@code close()}. + * Using {@link ProcessorContext#forward(Object, Object)}, {@link ProcessorContext#forward(Object, Object, int)}, + * or {@link ProcessorContext#forward(Object, Object, String)} will result in an {@link StreamsException exception}. */ void close(); http://git-wip-us.apache.org/repos/asf/kafka/blob/a01b7223/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java index ecd454a..3d357ee 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java @@ -14,18 +14,28 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.kafka.streams.kstream; +import org.apache.kafka.common.annotation.InterfaceStability; + /** * A {@link ValueTransformerSupplier} interface which can create one or more {@link ValueTransformer} instances. + * + * @param <V> value type + * @param <VR> transformed value type + * @see ValueTransformer + * @see KStream#transformValues(ValueTransformerSupplier, String...) 
+ * @see Transformer + * @see TransformerSupplier + * @see KStream#transform(TransformerSupplier, String...) */ -public interface ValueTransformerSupplier<V, R> { [email protected] +public interface ValueTransformerSupplier<V, VR> { /** * Return a new {@link ValueTransformer} instance. * - * @return a new {@link ValueTransformer} instance. + * @return a new {@link ValueTransformer} instance. */ - ValueTransformer<V, R> get(); + ValueTransformer<V, VR> get(); }
