Repository: kafka
Updated Branches:
  refs/heads/trunk 0dfeb31a1 -> 7429f4978

MINOR: reduce() javadocs: clarify position of arguments

Author: Michael G. Noll <mich...@confluent.io>

Reviewers: Matthias J. Sax <matth...@confluent.io>, Eno Thereska <eno.there...@gmail.com>, Damian Guy <damian....@gmail.com>

Closes #2651 from miguno/trunk-reduce-javadocs

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7429f497
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7429f497
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7429f497

Branch: refs/heads/trunk
Commit: 7429f49780a69d0c5eda8ef0c69a3209aecee11c
Parents: 0dfeb31
Author: Michael G. Noll <mich...@confluent.io>
Authored: Fri Jun 30 08:59:10 2017 +0100
Committer: Damian Guy <damian....@gmail.com>
Committed: Fri Jun 30 08:59:10 2017 +0100

----------------------------------------------------------------------
 .../kafka/streams/kstream/KGroupedStream.java   | 66 ++++++++++++++++++--
 .../kafka/streams/kstream/KGroupedTable.java    | 14 +++--
 2 files changed, 68 insertions(+), 12 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/7429f497/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index e02231a..e6faf8c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -409,7 +409,16 @@ public interface KGroupedStream<K, V> {
      * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
      * <p>
      * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
-     * aggregate and the record's value.
+     * aggregate (first argument) and the record's value (second argument):
+     * <pre>{@code
+     * // At the example of a Reducer<Long>
+     * new Reducer<Long>() {
+     *   @Override
+     *   public Long apply(Long aggValue, Long currValue) {
+     *     return aggValue + currValue;
+     *   }
+     * }</pre>
+     * <p>
      * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
      * value as-is.
      * Thus, {@code reduce(Reducer, String)} can be used to compute aggregate functions like sum, min, or max.
@@ -461,7 +470,16 @@ public interface KGroupedStream<K, V> {
      * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
      * <p>
      * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
-     * aggregate and the record's value.
+     * aggregate (first argument) and the record's value (second argument):
+     * <pre>{@code
+     * // At the example of a Reducer<Long>
+     * new Reducer<Long>() {
+     *   @Override
+     *   public Long apply(Long aggValue, Long currValue) {
+     *     return aggValue + currValue;
+     *   }
+     * }</pre>
+     * <p>
      * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
      * value as-is.
      * Thus, {@code reduce(Reducer, StateStoreSupplier)} can be used to compute aggregate functions like sum, min, or
@@ -509,7 +527,16 @@ public interface KGroupedStream<K, V> {
      * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
      * <p>
      * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
-     * aggregate and the record's value.
+     * aggregate (first argument) and the record's value (second argument):
+     * <pre>{@code
+     * // At the example of a Reducer<Long>
+     * new Reducer<Long>() {
+     *   @Override
+     *   public Long apply(Long aggValue, Long currValue) {
+     *     return aggValue + currValue;
+     *   }
+     * }</pre>
+     * <p>
      * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
      * value as-is.
      * Thus, {@code reduce(Reducer, Windows, String)} can be used to compute aggregate functions like sum, min, or max.
@@ -610,7 +637,16 @@ public interface KGroupedStream<K, V> {
      * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
      * <p>
      * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
-     * aggregate and the record's value.
+     * aggregate (first argument) and the record's value (second argument):
+     * <pre>{@code
+     * // At the example of a Reducer<Long>
+     * new Reducer<Long>() {
+     *   @Override
+     *   public Long apply(Long aggValue, Long currValue) {
+     *     return aggValue + currValue;
+     *   }
+     * }</pre>
+     * <p>
      * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
      * value as-is.
      * Thus, {@code reduce(Reducer, Windows, StateStoreSupplier)} can be used to compute aggregate functions like sum,
@@ -660,7 +696,16 @@ public interface KGroupedStream<K, V> {
      * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
      * <p>
      * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
-     * aggregate and the record's value.
+     * aggregate (first argument) and the record's value (second argument):
+     * <pre>{@code
+     * // At the example of a Reducer<Long>
+     * new Reducer<Long>() {
+     *   @Override
+     *   public Long apply(Long aggValue, Long currValue) {
+     *     return aggValue + currValue;
+     *   }
+     * }</pre>
+     * <p>
      * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
      * value as-is.
      * Thus, {@code reduce(Reducer, SessionWindows, String)} can be used to compute aggregate functions like sum, min,
@@ -749,7 +794,16 @@ public interface KGroupedStream<K, V> {
      * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
      * <p>
      * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
-     * aggregate and the record's value.
+     * aggregate (first argument) and the record's value (second argument):
+     * <pre>{@code
+     * // At the example of a Reducer<Long>
+     * new Reducer<Long>() {
+     *   @Override
+     *   public Long apply(Long aggValue, Long currValue) {
+     *     return aggValue + currValue;
+     *   }
+     * }</pre>
+     * <p>
      * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
      * value as-is.
      * Thus, {@code reduce(Reducer, SessionWindows, StateStoreSupplier)} can be used to compute aggregate functions like

http://git-wip-us.apache.org/repos/asf/kafka/blob/7429f497/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
index 5d019c4..bf0df55 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
@@ -163,10 +163,11 @@ public interface KGroupedTable<K, V> {
      * <p>
      * Each update to the original {@link KTable} results in a two step update of the result {@link KTable}.
      * The specified {@link Reducer adder} is applied for each update record and computes a new aggregate using the
-     * current aggregate and the record's value by adding the new record to the aggregate.
+     * current aggregate (first argument) and the record's value (second argument) by adding the new record to the
+     * aggregate.
      * The specified {@link Reducer substractor} is applied for each "replaced" record of the original {@link KTable}
-     * and computes a new aggregate using the current aggregate and the record's value by "removing" the "replaced"
-     * record from the aggregate.
+     * and computes a new aggregate using the current aggregate (first argument) and the record's value (second
+     * argument) by "removing" the "replaced" record from the aggregate.
      * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
      * value as-is.
      * Thus, {@code reduce(Reducer, Reducer, String)} can be used to compute aggregate functions like sum.
@@ -290,10 +291,11 @@ public interface KGroupedTable<K, V> {
      * <p>
      * Each update to the original {@link KTable} results in a two step update of the result {@link KTable}.
      * The specified {@link Reducer adder} is applied for each update record and computes a new aggregate using the
-     * current aggregate and the record's value by adding the new record to the aggregate.
+     * current aggregate (first argument) and the record's value (second argument) by adding the new record to the
+     * aggregate.
      * The specified {@link Reducer substractor} is applied for each "replaced" record of the original {@link KTable}
-     * and computes a new aggregate using the current aggregate and the record's value by "removing" the "replaced"
-     * record from the aggregate.
+     * and computes a new aggregate using the current aggregate (first argument) and the record's value (second
+     * argument) by "removing" the "replaced" record from the aggregate.
      * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
      * value as-is.
      * Thus, {@code reduce(Reducer, Reducer, String)} can be used to compute aggregate functions like sum.
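
The argument order documented in this change only becomes visible with a reducer that is not commutative. The following is a minimal, self-contained sketch of that point, not code from this commit. It does not depend on Kafka Streams: the nested Reducer interface is a local stand-in with the same apply(aggValue, currValue) shape as the javadoc example, and reduceAll is a hypothetical helper that imitates the documented behaviour for a single key (the first value becomes the aggregate as-is; afterwards the current aggregate is passed first and the record's value second).

```java
import java.util.Arrays;
import java.util.List;

public class ReducerArgumentOrderSketch {

    /** Local stand-in (assumption): same shape as the Reducer used in the javadoc example. */
    public interface Reducer<V> {
        V apply(V aggValue, V currValue);
    }

    /**
     * Hypothetical helper, not part of Kafka Streams. Folds the values of one key:
     * with no current aggregate the value is taken as-is, otherwise the reducer is
     * called with (current aggregate, record value), in that order.
     */
    public static <V> V reduceAll(List<V> values, Reducer<V> reducer) {
        V aggregate = null;
        for (V value : values) {
            aggregate = (aggregate == null) ? value : reducer.apply(aggregate, value);
        }
        return aggregate;
    }

    public static void main(String[] args) {
        // The Reducer<Long> from the javadoc example; for a sum the order is irrelevant.
        Reducer<Long> sum = new Reducer<Long>() {
            @Override
            public Long apply(Long aggValue, Long currValue) {
                return aggValue + currValue;
            }
        };
        System.out.println(reduceAll(Arrays.asList(1L, 2L, 3L), sum)); // prints 6

        // String concatenation is not commutative, so the order shows up in the result.
        Reducer<String> append = (aggValue, currValue) -> aggValue + ">" + currValue;
        System.out.println(reduceAll(Arrays.asList("a", "b", "c"), append)); // prints a>b>c

        // A subtractor in the KGroupedTable sense: aggregate first, replaced value second.
        Reducer<Long> subtractor = (aggValue, oldValue) -> aggValue - oldValue;
        System.out.println(subtractor.apply(20L, 5L)); // prints 15
    }
}
```

Swapping the two parameters in append would yield c>b>a instead of a>b>c, and swapping them in subtractor would yield -15 instead of 15, which is the kind of mistake the clarified javadocs are meant to prevent.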