Repository: kafka
Updated Branches:
  refs/heads/trunk 0dfeb31a1 -> 7429f4978


MINOR: reduce() javadocs: clarify position of arguments

Author: Michael G. Noll <mich...@confluent.io>

Reviewers: Matthias J. Sax <matth...@confluent.io>, Eno Thereska <eno.there...@gmail.com>, Damian Guy <damian....@gmail.com>

Closes #2651 from miguno/trunk-reduce-javadocs


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7429f497
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7429f497
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7429f497

Branch: refs/heads/trunk
Commit: 7429f49780a69d0c5eda8ef0c69a3209aecee11c
Parents: 0dfeb31
Author: Michael G. Noll <mich...@confluent.io>
Authored: Fri Jun 30 08:59:10 2017 +0100
Committer: Damian Guy <damian....@gmail.com>
Committed: Fri Jun 30 08:59:10 2017 +0100

----------------------------------------------------------------------
 .../kafka/streams/kstream/KGroupedStream.java   | 66 ++++++++++++++++++--
 .../kafka/streams/kstream/KGroupedTable.java    | 14 +++--
 2 files changed, 68 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7429f497/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index e02231a..e6faf8c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -409,7 +409,16 @@ public interface KGroupedStream<K, V> {
      * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
      * <p>
      * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
-     * aggregate and the record's value.
+     * aggregate (first argument) and the record's value (second argument):
+     * <pre>{@code
+     * // At the example of a Reducer<Long>
+     * new Reducer<Long>() {
+     *   @Override
+     *   public Long apply(Long aggValue, Long currValue) {
+     *     return aggValue + currValue;
+     *   }
+     * }</pre>
+     * <p>
      * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
      * value as-is.
      * Thus, {@code reduce(Reducer, String)} can be used to compute aggregate functions like sum, min, or max.
@@ -461,7 +470,16 @@ public interface KGroupedStream<K, V> {
      * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
      * <p>
      * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
-     * aggregate and the record's value.
+     * aggregate (first argument) and the record's value (second argument):
+     * <pre>{@code
+     * // At the example of a Reducer<Long>
+     * new Reducer<Long>() {
+     *   @Override
+     *   public Long apply(Long aggValue, Long currValue) {
+     *     return aggValue + currValue;
+     *   }
+     * }</pre>
+     * <p>
      * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
      * value as-is.
      * Thus, {@code reduce(Reducer, StateStoreSupplier)} can be used to compute aggregate functions like sum, min, or
@@ -509,7 +527,16 @@ public interface KGroupedStream<K, V> {
      * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
      * <p>
      * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
-     * aggregate and the record's value.
+     * aggregate (first argument) and the record's value (second argument):
+     * <pre>{@code
+     * // At the example of a Reducer<Long>
+     * new Reducer<Long>() {
+     *   @Override
+     *   public Long apply(Long aggValue, Long currValue) {
+     *     return aggValue + currValue;
+     *   }
+     * }</pre>
+     * <p>
      * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
      * value as-is.
      * Thus, {@code reduce(Reducer, Windows, String)} can be used to compute aggregate functions like sum, min, or max.
@@ -610,7 +637,16 @@ public interface KGroupedStream<K, V> {
      * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
      * <p>
      * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
-     * aggregate and the record's value.
+     * aggregate (first argument) and the record's value (second argument):
+     * <pre>{@code
+     * // At the example of a Reducer<Long>
+     * new Reducer<Long>() {
+     *   @Override
+     *   public Long apply(Long aggValue, Long currValue) {
+     *     return aggValue + currValue;
+     *   }
+     * }</pre>
+     * <p>
      * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
      * value as-is.
      * Thus, {@code reduce(Reducer, Windows, StateStoreSupplier)} can be used to compute aggregate functions like sum,
@@ -660,7 +696,16 @@ public interface KGroupedStream<K, V> {
      * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
      * <p>
      * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
-     * aggregate and the record's value.
+     * aggregate (first argument) and the record's value (second argument):
+     * <pre>{@code
+     * // At the example of a Reducer<Long>
+     * new Reducer<Long>() {
+     *   @Override
+     *   public Long apply(Long aggValue, Long currValue) {
+     *     return aggValue + currValue;
+     *   }
+     * }</pre>
+     * <p>
      * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
      * value as-is.
      * Thus, {@code reduce(Reducer, SessionWindows, String)} can be used to compute aggregate functions like sum, min,
@@ -749,7 +794,16 @@ public interface KGroupedStream<K, V> {
      * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
      * <p>
      * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
-     * aggregate and the record's value.
+     * aggregate (first argument) and the record's value (second argument):
+     * <pre>{@code
+     * // At the example of a Reducer<Long>
+     * new Reducer<Long>() {
+     *   @Override
+     *   public Long apply(Long aggValue, Long currValue) {
+     *     return aggValue + currValue;
+     *   }
+     * }</pre>
+     * <p>
      * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
      * value as-is.
      * Thus, {@code reduce(Reducer, SessionWindows, StateStoreSupplier)} can be used to compute aggregate functions like
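
Illustration (not part of the commit): the Javadoc change above states that the current aggregate is the first Reducer argument and the record's value is the second. The order only becomes visible with a non-commutative reducer, so the sketch below uses string concatenation instead of a sum. SketchReducer, ReduceArgumentOrderSketch, and reduceAll are hypothetical names introduced for this example; SketchReducer merely mirrors the two-argument shape of Reducer so that the code compiles without the kafka-streams dependency, and reduceAll is a simplified single-key model of the documented behavior, not the Kafka Streams implementation.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in with the same two-argument shape as Reducer<V>.
interface SketchReducer<V> {
    V apply(V aggValue, V currValue);
}

class ReduceArgumentOrderSketch {

    // Simplified model for a single key: the first value becomes the aggregate
    // as-is (the reducer is not applied); every later value is combined via
    // reducer.apply(currentAggregate, recordValue).
    // Assumption: null marks "no current aggregate" and inputs are non-null.
    static <V> V reduceAll(List<V> values, SketchReducer<V> reducer) {
        V aggregate = null;
        for (V value : values) {
            aggregate = (aggregate == null) ? value : reducer.apply(aggregate, value);
        }
        return aggregate;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("a", "b", "c");

        // Aggregate first, record value second: values are appended in arrival order.
        SketchReducer<String> append = (aggValue, currValue) -> aggValue + currValue;

        // Same arguments used in the opposite order: the result is reversed.
        SketchReducer<String> prepend = (aggValue, currValue) -> currValue + aggValue;

        System.out.println(reduceAll(records, append));  // prints "abc"
        System.out.println(reduceAll(records, prepend)); // prints "cba"
    }
}
```

For commutative operations such as the Long sum in the Javadoc example the order does not change the result, which is presumably why the position of the arguments needed to be spelled out.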

http://git-wip-us.apache.org/repos/asf/kafka/blob/7429f497/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
index 5d019c4..bf0df55 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
@@ -163,10 +163,11 @@ public interface KGroupedTable<K, V> {
      * <p>
      * Each update to the original {@link KTable} results in a two step update of the result {@link KTable}.
      * The specified {@link Reducer adder} is applied for each update record and computes a new aggregate using the
-     * current aggregate and the record's value by adding the new record to the aggregate.
+     * current aggregate (first argument) and the record's value (second argument) by adding the new record to the
+     * aggregate.
      * The specified {@link Reducer substractor} is applied for each "replaced" record of the original {@link KTable}
-     * and computes a new aggregate using the current aggregate and the record's value by "removing" the "replaced"
-     * record from the aggregate.
+     * and computes a new aggregate using the current aggregate (first argument) and the record's value (second
+     * argument) by "removing" the "replaced" record from the aggregate.
      * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
      * value as-is.
      * Thus, {@code reduce(Reducer, Reducer, String)} can be used to compute aggregate functions like sum.
@@ -290,10 +291,11 @@ public interface KGroupedTable<K, V> {
      * <p>
      * Each update to the original {@link KTable} results in a two step update of the result {@link KTable}.
      * The specified {@link Reducer adder} is applied for each update record and computes a new aggregate using the
-     * current aggregate and the record's value by adding the new record to the aggregate.
+     * current aggregate (first argument) and the record's value (second argument) by adding the new record to the
+     * aggregate.
      * The specified {@link Reducer substractor} is applied for each "replaced" record of the original {@link KTable}
-     * and computes a new aggregate using the current aggregate and the record's value by "removing" the "replaced"
-     * record from the aggregate.
+     * and computes a new aggregate using the current aggregate (first argument) and the record's value (second
+     * argument) by "removing" the "replaced" record from the aggregate.
      * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
      * value as-is.
      * Thus, {@code reduce(Reducer, Reducer, String)} can be used to compute aggregate functions like sum.
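
Illustration (not part of the commit): for KGroupedTable the argument position matters most for the subtractor, because subtraction is not commutative. The sketch below models the two-step update described in the Javadoc for a Long sum. TableSketchReducer, TableReduceSketch, and applyUpdate are hypothetical names introduced for this example; the order in which the two steps run is an assumption of the sketch (it does not affect the result for a sum), and the code is a simplified model rather than the Kafka Streams implementation.

```java
// Hypothetical stand-in with the same two-argument shape as Reducer<V>.
interface TableSketchReducer<V> {
    V apply(V aggValue, V currValue);
}

class TableReduceSketch {

    // Simplified two-step update for one key of the result table:
    // the "replaced" old value is removed via the subtractor, then the new
    // value is added via the adder. In both calls the current aggregate is
    // the first argument and the record's value is the second.
    // Assumption: null means "no current aggregate" or "no such value".
    static Long applyUpdate(Long aggregate, Long oldValue, Long newValue,
                            TableSketchReducer<Long> adder,
                            TableSketchReducer<Long> subtractor) {
        Long result = aggregate;
        if (oldValue != null && result != null) {
            result = subtractor.apply(result, oldValue);
        }
        if (newValue != null) {
            // Without a current aggregate the new value is taken as-is.
            result = (result == null) ? newValue : adder.apply(result, newValue);
        }
        return result;
    }

    public static void main(String[] args) {
        TableSketchReducer<Long> adder = (aggValue, newValue) -> aggValue + newValue;
        TableSketchReducer<Long> subtractor = (aggValue, oldValue) -> aggValue - oldValue;

        // Current sum is 12 (5 + 7); the record with value 5 is replaced by 8.
        System.out.println(applyUpdate(12L, 5L, 8L, adder, subtractor)); // prints 15

        // A subtractor that mixes up the argument positions computes 5 - 12 first.
        TableSketchReducer<Long> swapped = (aggValue, oldValue) -> oldValue - aggValue;
        System.out.println(applyUpdate(12L, 5L, 8L, adder, swapped)); // prints 1
    }
}
```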
