Repository: kafka
Updated Branches:
  refs/heads/trunk ca0c071c1 -> a01b72236


MINOR: Update JavaDoc for DSL PAPI-API

Author: Matthias J. Sax <[email protected]>

Reviewers: Damian Guy, Guozhang Wang

Closes #2413 from mjsax/javaDocImprovements6


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a01b7223
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a01b7223
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a01b7223

Branch: refs/heads/trunk
Commit: a01b72236926d2bcc52dd7d0378147f8fae8117d
Parents: ca0c071
Author: Matthias J. Sax <[email protected]>
Authored: Thu Jan 26 21:25:26 2017 -0800
Committer: Guozhang Wang <[email protected]>
Committed: Thu Jan 26 21:25:26 2017 -0800

----------------------------------------------------------------------
 .../apache/kafka/streams/kstream/KStream.java   | 20 +++--
 .../kafka/streams/kstream/KeyValueMapper.java   |  3 +-
 .../apache/kafka/streams/kstream/Merger.java    |  2 +-
 .../kafka/streams/kstream/Transformer.java      | 76 +++++++++++++----
 .../streams/kstream/TransformerSupplier.java    | 15 +++-
 .../kafka/streams/kstream/ValueMapper.java      |  7 +-
 .../kafka/streams/kstream/ValueTransformer.java | 86 ++++++++++++++++----
 .../kstream/ValueTransformerSupplier.java       | 18 +++-
 8 files changed, 180 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a01b7223/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 3509523..8a18a5a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -105,7 +105,8 @@ public interface KStream<K, V> {
      * (both key and value type can be altered arbitrarily).
      * The provided {@link KeyValueMapper} is applied to each input record and 
computes a new output record.
      * Thus, an input record {@code <K,V>} can be transformed into an output 
record {@code <K':V'>}.
-     * This is a stateless record-by-record operation.
+     * This is a stateless record-by-record operation (cf. {@link 
#transform(TransformerSupplier, String...)} for
+     * stateful record transformation).
      * <p>
      * The example below normalizes the String key to upper-case letters and 
counts the number of token of the value string.
      * <pre>{@code
@@ -117,7 +118,7 @@ public interface KStream<K, V> {
      * });
      * }</pre>
      * <p>
-     * The provided {@link KeyValueMapper} must return a {@link KeyValue} type 
and the return value must not be {@code null}.
+     * The provided {@link KeyValueMapper} must return a {@link KeyValue} type 
and must not return {@code null}.
      * <p>
      * Mapping records might result in an internal data redistribution if a 
key based operator (like an aggregation or
      * join) is applied to the result {@link KStream}. (cf. {@link 
#mapValues(ValueMapper)})
@@ -130,6 +131,8 @@ public interface KStream<K, V> {
      * @see #flatMap(KeyValueMapper)
      * @see #mapValues(ValueMapper)
      * @see #flatMapValues(ValueMapper)
+     * @see #transform(TransformerSupplier, String...)
+     * @see #transformValues(ValueTransformerSupplier, String...)
      */
     <KR, VR> KStream<KR, VR> map(KeyValueMapper<? super K, ? super V, ? 
extends KeyValue<? extends KR, ? extends VR>> mapper);
 
@@ -137,7 +140,8 @@ public interface KStream<K, V> {
      * Transform the value of each input record into a new value (with 
possible new type) of the output record.
      * The provided {@link ValueMapper} is applied to each input record value 
and computes a new value for it.
      * Thus, an input record {@code <K,V>} can be transformed into an output 
record {@code <K:V'>}.
-     * This is a stateless record-by-record operation (cf. {@link 
#transformValues(ValueTransformerSupplier, String...)}).
+     * This is a stateless record-by-record operation (cf.
+     * {@link #transformValues(ValueTransformerSupplier, String...)} for 
stateful value transformation).
      * <p>
      * The example below counts the number of token of the value string.
      * <pre>{@code
@@ -160,6 +164,7 @@ public interface KStream<K, V> {
      * @see #map(KeyValueMapper)
      * @see #flatMap(KeyValueMapper)
      * @see #flatMapValues(ValueMapper)
+     * @see #transform(TransformerSupplier, String...)
      * @see #transformValues(ValueTransformerSupplier, String...)
      */
     <VR> KStream<K, VR> mapValues(ValueMapper<? super V, ? extends VR> mapper);
@@ -169,7 +174,8 @@ public interface KStream<K, V> {
      * can be altered arbitrarily).
      * The provided {@link KeyValueMapper} is applied to each input record and 
computes zero or more output records.
      * Thus, an input record {@code <K,V>} can be transformed into output 
records {@code <K':V'>, <K'':V''>, ...}.
-     * This is a stateless record-by-record operation (cf. {@link 
#transform(TransformerSupplier, String...)}).
+     * This is a stateless record-by-record operation (cf. {@link 
#transform(TransformerSupplier, String...)} for
+     * stateful record transformation).
      * <p>
      * The example below splits input records {@code <null:String>} containing 
sentences as values into their words
      * and emit a record {@code <word:1>} for each word.
@@ -204,6 +210,7 @@ public interface KStream<K, V> {
      * @see #mapValues(ValueMapper)
      * @see #flatMapValues(ValueMapper)
      * @see #transform(TransformerSupplier, String...)
+     * @see #transformValues(ValueTransformerSupplier, String...)
      */
     <KR, VR> KStream<KR, VR> flatMap(final KeyValueMapper<? super K, ? super 
V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper);
 
@@ -214,7 +221,8 @@ public interface KStream<K, V> {
      * stream (value type can be altered arbitrarily).
      * The provided {@link ValueMapper} is applied to each input record and 
computes zero or more output values.
      * Thus, an input record {@code <K,V>} can be transformed into output 
records {@code <K:V'>, <K:V''>, ...}.
-     * This is a stateless record-by-record operation.
+     * This is a stateless record-by-record operation (cf. {@link 
#transformValues(ValueTransformerSupplier, String...)}
+     * for stateful value transformation).
      * <p>
      * The example below splits input records {@code <null:String>} containing 
sentences as values into their words.
      * <pre>{@code
@@ -240,6 +248,8 @@ public interface KStream<K, V> {
      * @see #map(KeyValueMapper)
      * @see #flatMap(KeyValueMapper)
      * @see #mapValues(ValueMapper)
+     * @see #transform(TransformerSupplier, String...)
+     * @see #transformValues(ValueTransformerSupplier, String...)
      */
     <VR> KStream<K, VR> flatMapValues(final ValueMapper<? super V, ? extends 
Iterable<? extends VR>> processor);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a01b7223/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java
index d6d1def..b677de0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java
@@ -28,13 +28,14 @@ import 
org.apache.kafka.common.annotation.InterfaceStability;
  * <li>map from an input record to a new key (with arbitrary key type as 
specified by {@code VR})</li>
  * </ul>
  * This is a stateless record-by-record operation, i.e, {@link #apply(Object, 
Object)} is invoked individually for each
- * record of a stream.
+ * record of a stream (cf. {@link Transformer} for stateful record 
transformation).
  * {@link KeyValueMapper} is a generalization of {@link ValueMapper}.
  *
  * @param <K>  key type
  * @param <V>  value type
  * @param <VR> mapped value type
  * @see ValueMapper
+ * @see Transformer
  * @see KStream#map(KeyValueMapper)
  * @see KStream#flatMap(KeyValueMapper)
  * @see KStream#selectKey(KeyValueMapper)

http://git-wip-us.apache.org/repos/asf/kafka/blob/a01b7223/streams/src/main/java/org/apache/kafka/streams/kstream/Merger.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Merger.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/Merger.java
index 5a70f21..f0df2c6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Merger.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Merger.java
@@ -29,7 +29,7 @@ import org.apache.kafka.common.annotation.InterfaceStability;
 public interface Merger<K, V> {
 
     /**
-     * Compute a new aggregate from the key and two aggregates
+     * Compute a new aggregate from the key and two aggregates.
      *
      * @param aggKey    the key of the record
      * @param aggOne    the first aggregate

http://git-wip-us.apache.org/repos/asf/kafka/blob/a01b7223/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
index 95d822a..d9a814b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
@@ -17,48 +17,96 @@
 
 package org.apache.kafka.streams.kstream;
 
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TimestampExtractor;
 
 /**
- * A stateful {@link Transformer} interface to transform a key-value pair into 
a new value.
+ * The {@link Transformer} interface for stateful mapping of an input record 
to zero, one, or multiple new output
+ * records (both key and value type can be altered arbitrarily).
+ * This is a stateful record-by-record operation, i.e., {@link 
#transform(Object, Object)} is invoked individually for
+ * each record of a stream and can access and modify a state that is available 
beyond a single call of
+ * {@link #transform(Object, Object)} (cf. {@link KeyValueMapper} for 
stateless record transformation).
+ * Additionally, the interface can be called at regular intervals based on the 
processing progress
+ * (cf. {@link #punctuate(long)}).
+ * <p>
+ * Use {@link TransformerSupplier} to provide new instances of {@link 
Transformer} to Kafka Stream's runtime.
+ * <p>
+ * If only a record's value should be modified {@link ValueTransformer} can be 
used.
  *
- * @param <K>   key type
- * @param <V>   value type
- * @param <R>   return type
+ * @param <K> key type
+ * @param <V> value type
+ * @param <R> {@link KeyValue} return type (both key and value type can be set
+ *            arbitrarily)
+ * @see TransformerSupplier
+ * @see KStream#transform(TransformerSupplier, String...)
+ * @see ValueTransformer
+ * @see KStream#map(KeyValueMapper)
+ * @see KStream#flatMap(KeyValueMapper)
  */
[email protected]
 public interface Transformer<K, V, R> {
 
     /**
-     * Initialize this transformer with the given context. The framework 
ensures this is called once per processor when the topology
-     * that contains it is initialized.
+     * Initialize this transformer.
+     * This is called once per instance when the topology gets initialized.
      * <p>
-     * If this transformer is to be {@link #punctuate(long) called 
periodically} by the framework, then this method should
-     * {@link ProcessorContext#schedule(long) schedule itself} with the 
provided context.
+     * The provided {@link ProcessorContext context} can be used to access 
topology and record meta data, to
+     * {@link ProcessorContext#schedule(long) schedule itself} for periodical 
calls (cf. {@link #punctuate(long)}), and
+     * to access attached {@link StateStore}s.
+     * <p>
+     * Note, that {@link ProcessorContext} is updated in the background with 
the current record's meta data.
+     * Thus, it only contains valid record meta data when accessed within 
{@link #transform(Object, Object)}.
      *
-     * @param context the context; may not be null
+     * @param context the context
      */
     void init(final ProcessorContext context);
 
     /**
      * Transform the record with the given key and value.
+     * Additionally, any {@link StateStore state} that is {@link 
KStream#transform(TransformerSupplier, String...)
+     * attached} to this operator can be accessed and modified
+     * arbitrarily (cf. {@link ProcessorContext#getStateStore(String)}).
+     * <p>
+     * If more than one output record should be forwarded downstream {@link 
ProcessorContext#forward(Object, Object)},
+     * {@link ProcessorContext#forward(Object, Object, int)}, and
+     * {@link ProcessorContext#forward(Object, Object, String)} can be used.
+     * If no record should be forwarded downstream, {@code transform} can 
return {@code null}.
      *
      * @param key the key for the record
      * @param value the value for the record
-     * @return new value; if null no key-value pair will be forwarded to down 
stream
+     * @return new {@link KeyValue} pair&mdash;if {@code null} no key-value 
pair will
+     * be forwarded to down stream
      */
     R transform(final K key, final V value);
 
     /**
-     * Perform any periodic operations and possibly generate a key, if this 
processor {@link ProcessorContext#schedule(long) schedules itself} with the 
context
-     * during {@link #init(ProcessorContext) initialization}.
+     * Perform any periodic operations and possibly generate new {@link 
KeyValue} pairs if this processor
+     * {@link ProcessorContext#schedule(long) schedules itself} with the 
context during
+     * {@link #init(ProcessorContext) initialization}.
+     * <p>
+     * To generate new {@link KeyValue} pairs {@link 
ProcessorContext#forward(Object, Object)},
+     * {@link ProcessorContext#forward(Object, Object, int)}, and
+     * {@link ProcessorContext#forward(Object, Object, String)} can be used.
+     * <p>
+     * Note that {@code punctuate} is called based on <i>stream time</i> 
(i.e., time progresses with regard to
+     * timestamps returned by the used {@link TimestampExtractor})
+     * and not based on wall-clock time.
      *
-     * @param timestamp the stream time when this method is being called
-     * @return new value; if null it will not be forwarded to down stream
+     * @param timestamp the stream time when {@code punctuate} is being called
+     * @return must return {@code null}&mdash;otherwise, an {@link 
StreamsException exception} will be thrown
      */
     R punctuate(final long timestamp);
 
     /**
      * Close this processor and clean up any resources.
+     * <p>
+     * To generate new {@link KeyValue} pairs {@link 
ProcessorContext#forward(Object, Object)},
+     * {@link ProcessorContext#forward(Object, Object, int)}, and
+     * {@link ProcessorContext#forward(Object, Object, String)} can be used.
      */
     void close();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a01b7223/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java
index 0341702..0a01489 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java
@@ -17,15 +17,28 @@
 
 package org.apache.kafka.streams.kstream;
 
+import org.apache.kafka.common.annotation.InterfaceStability;
+
 /**
  * A {@link TransformerSupplier} interface which can create one or more {@link 
Transformer} instances.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ * @param <R> {@link org.apache.kafka.streams.KeyValue KeyValue} return type 
(both key and value type can be set
+ *            arbitrarily)
+ * @see Transformer
+ * @see KStream#transform(TransformerSupplier, String...)
+ * @see ValueTransformer
+ * @see ValueTransformerSupplier
+ * @see KStream#transformValues(ValueTransformerSupplier, String...)
  */
[email protected]
 public interface TransformerSupplier<K, V, R> {
 
     /**
      * Return a new {@link Transformer} instance.
      *
-     * @return  a new {@link Transformer} instance
+     * @return a new {@link Transformer} instance
      */
     Transformer<K, V, R> get();
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a01b7223/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
index 5099ac7..71acc1d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
@@ -21,14 +21,15 @@ import 
org.apache.kafka.common.annotation.InterfaceStability;
 /**
  * The {@link ValueMapper} interface for mapping a value to a new value of 
arbitrary type.
  * This is a stateless record-by-record operation, i.e, {@link #apply(Object)} 
is invoked individually for each record
- * of a stream.
- * Thus, if {@link ValueMapper} is applied to a {@link 
org.apache.kafka.streams.KeyValue key-value pair} record the
- * record's key is preserved.
+ * of a stream (cf. {@link ValueTransformer} for stateful value 
transformation).
+ * If {@link ValueMapper} is applied to a {@link 
org.apache.kafka.streams.KeyValue key-value pair} record the record's
+ * key is preserved.
  * If a record's key and value should be modified {@link KeyValueMapper} can 
be used.
  *
  * @param <V>  value type
  * @param <VR> mapped value type
  * @see KeyValueMapper
+ * @see ValueTransformer
  * @see KStream#mapValues(ValueMapper)
  * @see KStream#flatMapValues(ValueMapper)
  * @see KTable#mapValues(ValueMapper)

http://git-wip-us.apache.org/repos/asf/kafka/blob/a01b7223/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
index 063c352..02c3ba7 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
@@ -17,46 +17,96 @@
 
 package org.apache.kafka.streams.kstream;
 
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TimestampExtractor;
 
 /**
- * A stateful {@link ValueTransformer} interface to transform a value into a 
new value.
+ * The {@link ValueTransformer} interface for stateful mapping of a value to a 
new value (with possible new type).
+ * This is a stateful record-by-record operation, i.e., {@link 
#transform(Object)} is invoked individually for each
+ * record of a stream and can access and modify a state that is available 
beyond a single call of
+ * {@link #transform(Object)} (cf. {@link ValueMapper} for stateless value 
transformation).
+ * Additionally, the interface can be called at regular intervals based on the 
processing progress
+ * (cf. {@link #punctuate(long)}).
+ * If {@link ValueTransformer} is applied to a {@link KeyValue} pair record 
the record's key is preserved.
+ * <p>
+ * Use {@link ValueTransformerSupplier} to provide new instances of {@link 
ValueTransformer} to Kafka Stream's runtime.
+ * <p>
+ * If a record's key and value should be modified {@link Transformer} can be 
used.
  *
- * @param <V>   value type
- * @param <R>   return type
+ * @param <V>  value type
+ * @param <VR> transformed value type
+ * @see ValueTransformerSupplier
+ * @see KStream#transformValues(ValueTransformerSupplier, String...)
+ * @see Transformer
  */
-public interface ValueTransformer<V, R> {
[email protected]
+public interface ValueTransformer<V, VR> {
 
     /**
-     * Initialize this transformer with the given context. The framework 
ensures this is called once per processor when the topology
-     * that contains it is initialized.
+     * Initialize this transformer.
+     * This is called once per instance when the topology gets initialized.
      * <p>
-     * If this transformer is to be {@link #punctuate(long) called 
periodically} by the framework, then this method should
-     * {@link ProcessorContext#schedule(long) schedule itself} with the 
provided context.
+     * The provided {@link ProcessorContext context} can be used to access 
topology and record meta data, to
+     * {@link ProcessorContext#schedule(long) schedule itself} for periodical 
calls (cf. {@link #punctuate(long)}), and
+     * to access attached {@link StateStore}s.
+     * <p>
+     * Note that {@link ProcessorContext} is updated in the background with 
the current record's meta data.
+     * Thus, it only contains valid record meta data when accessed within 
{@link #transform(Object)}.
+     * <p>
+     * Note that using {@link ProcessorContext#forward(Object, Object)},
+     * {@link ProcessorContext#forward(Object, Object, int)}, or
+     * {@link ProcessorContext#forward(Object, Object, String)} is not allowed 
within any method of
+     * {@code ValueTransformer} and will result in an {@link StreamsException 
exception}.
      *
-     * @param context the context; may not be null
+     * @param context the context
      */
     void init(final ProcessorContext context);
 
     /**
-     * Transform the record with the given key and value.
+     * Transform the given value to a new value.
+     * Additionally, any {@link StateStore} that is {@link 
KStream#transformValues(ValueTransformerSupplier, String...)
+     * attached} to this operator can be accessed and modified arbitrarily (cf.
+     * {@link ProcessorContext#getStateStore(String)}).
+     * <p>
+     * Note, that using {@link ProcessorContext#forward(Object, Object)},
+     * {@link ProcessorContext#forward(Object, Object, int)}, and
+     * {@link ProcessorContext#forward(Object, Object, String)} is not allowed 
within {@code transform} and
+     * will result in an {@link StreamsException exception}.
      *
-     * @param value the value for the record
-     * @return new value
+     * @param value the value to be transformed
+     * @return the new value
      */
-    R transform(final V value);
+    VR transform(final V value);
 
     /**
-     * Perform any periodic operations and possibly return a new value, if 
this processor {@link ProcessorContext#schedule(long) schedule itself} with the 
context
-     * during {@link #init(ProcessorContext) initialization}.
+     * Perform any periodic operations if this processor {@link 
ProcessorContext#schedule(long) schedules itself} with
+     * the context during {@link #init(ProcessorContext) initialization}.
+     * <p>
+     * It is not possible to return any new output records within {@code 
punctuate}.
+     * Using {@link ProcessorContext#forward(Object, Object)}, {@link 
ProcessorContext#forward(Object, Object, int)},
+     * or {@link ProcessorContext#forward(Object, Object, String)} will result 
in an
+     * {@link StreamsException exception}.
+     * Furthermore, {@code punctuate} must return {@code null}.
+     * <p>
+     * Note, that {@code punctuate} is called based on <i>stream time</i> 
(i.e., time progresses with regard to
+     * timestamps returned by the used {@link TimestampExtractor})
+     * and not based on wall-clock time.
      *
-     * @param timestamp the stream time when this method is being called
-     * @return new value; if null it will not be forwarded to down stream
+     * @param timestamp the stream time when {@code punctuate} is being called
+     * @return must return {@code null}&mdash;otherwise, an {@link 
StreamsException exception} will be thrown
      */
-    R punctuate(final long timestamp);
+    VR punctuate(final long timestamp);
 
     /**
      * Close this processor and clean up any resources.
+     * <p>
+     * It is not possible to return any new output records within {@code 
close()}.
+     * Using {@link ProcessorContext#forward(Object, Object)}, {@link 
ProcessorContext#forward(Object, Object, int)},
+     * or {@link ProcessorContext#forward(Object, Object, String)} will result 
in an {@link StreamsException exception}.
      */
     void close();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a01b7223/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
index ecd454a..3d357ee 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
@@ -14,18 +14,28 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.streams.kstream;
 
+import org.apache.kafka.common.annotation.InterfaceStability;
+
 /**
  * A {@link ValueTransformerSupplier} interface which can create one or more 
{@link ValueTransformer} instances.
+ *
+ * @param <V>  value type
+ * @param <VR> transformed value type
+ * @see ValueTransformer
+ * @see KStream#transformValues(ValueTransformerSupplier, String...)
+ * @see Transformer
+ * @see TransformerSupplier
+ * @see KStream#transform(TransformerSupplier, String...)
  */
-public interface ValueTransformerSupplier<V, R> {
[email protected]
+public interface ValueTransformerSupplier<V, VR> {
 
     /**
      * Return a new {@link ValueTransformer} instance.
      *
-     * @return  a new {@link ValueTransformer} instance.
+     * @return a new {@link ValueTransformer} instance.
      */
-    ValueTransformer<V, R> get();
+    ValueTransformer<V, VR> get();
 }

Reply via email to