KAFKA-3440: Update streams javadocs - add class doc for KTable, KStream, JoinWindows - add missing return tags
Author: Matthias J. Sax <[email protected]> Reviewers: Guozhang Wang <[email protected]>, Michael G. Noll <[email protected]>, Ewen Cheslack-Postava <[email protected]> Closes #1287 from mjsax/kafka-3440-JavaDoc Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3414d561 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3414d561 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3414d561 Branch: refs/heads/0.10.0 Commit: 3414d56121d8d2f66f8dd613453af71d5b3f0c5f Parents: 69d9a66 Author: Matthias J. Sax <[email protected]> Authored: Fri Apr 29 12:50:02 2016 -0700 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Fri Apr 29 12:50:02 2016 -0700 ---------------------------------------------------------------------- .../kafka/streams/kstream/JoinWindows.java | 18 ++++ .../kafka/streams/kstream/KGroupedTable.java | 14 ++- .../apache/kafka/streams/kstream/KStream.java | 95 +++++++++++++++++++- .../kafka/streams/kstream/KStreamBuilder.java | 14 ++- .../apache/kafka/streams/kstream/KTable.java | 47 +++++++++- .../kafka/streams/processor/StateStore.java | 4 +- .../apache/kafka/streams/state/WindowStore.java | 2 + 7 files changed, 189 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/3414d561/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java index a6d5603..f45c064 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java @@ -23,6 +23,24 @@ import java.util.Map; /** * The window specifications used for joins. 
+ * <p> + * A {@link JoinWindows} instance defines a join over two streams on the same key and a maximum time difference. + * In SQL-style you would express this join as + * <pre> + * SELECT * FROM stream1, stream2 + * WHERE + * stream1.key = stream2.key + * AND + * stream2.ts - before <= stream1.ts <= stream2.ts + after + * </pre> + * There are three different window configurations supported: + * <ul> + * <li>before = after = time-difference</li> + * <li>before = 0 and after = time-difference</li> + * <li>before = time-difference and after = 0</li> + * </ul> + * A join is symmetric in the sense that a join specification on the first stream returns the same result record as + * a join specification on the second stream with flipped before and after values. */ public class JoinWindows extends Windows<TimeWindow> { http://git-wip-us.apache.org/repos/asf/kafka/blob/3414d561/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java index 86c34b1..2ebad87 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java @@ -21,7 +21,11 @@ import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.serialization.Serde; /** - * {@link KGroupedTable} is an abstraction of a <i>grouped changelog stream</i> from a primary-keyed table. + * {@link KGroupedTable} is an abstraction of a <i>grouped changelog stream</i> from a primary-keyed table, + * usually on a different grouping key than the original primary key. + * <p> + * It is an intermediate representation after a re-grouping of a {@link KTable} before an aggregation is applied + * to the new partitions resulting in a new {@link KTable}. 
* * @param <K> Type of primary keys * @param <V> Type of value changes @@ -35,6 +39,8 @@ public interface KGroupedTable<K, V> { * @param adder the instance of {@link Reducer} for addition * @param subtractor the instance of {@link Reducer} for subtraction * @param name the name of the resulted {@link KTable} + * @return a {@link KTable} with the same key and value types as this {@link KGroupedTable}, + * containing aggregated values for each key */ KTable<K, V> reduce(Reducer<V> adder, Reducer<V> subtractor, @@ -50,6 +56,8 @@ public interface KGroupedTable<K, V> { * if not specified the default serdes defined in the configs will be used * @param name the name of the resulted table * @param <T> the value type of the aggregated {@link KTable} + * @return a {@link KTable} with same key and aggregated value type {@code T}, + * containing aggregated values for each key */ <T> KTable<K, T> aggregate(Initializer<T> initializer, Aggregator<K, V, T> adder, @@ -66,6 +74,8 @@ public interface KGroupedTable<K, V> { * @param substractor the instance of {@link Aggregator} for subtraction * @param name the name of the resulted {@link KTable} * @param <T> the value type of the aggregated {@link KTable} + * @return a {@link KTable} with same key and aggregated value type {@code T}, + * containing aggregated values for each key */ <T> KTable<K, T> aggregate(Initializer<T> initializer, Aggregator<K, V, T> adder, @@ -76,6 +86,8 @@ public interface KGroupedTable<K, V> { * Count number of records of this stream by the selected key into a new instance of {@link KTable}. 
* * @param name the name of the resulted {@link KTable} + * @return a {@link KTable} with same key and {@link Long} value type as this {@link KGroupedTable}, + * containing the number of values for each key */ KTable<K, Long> count(String name); http://git-wip-us.apache.org/repos/asf/kafka/blob/3414d561/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index 6df2deb..a1ecfa4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -25,9 +25,17 @@ import org.apache.kafka.streams.processor.StreamPartitioner; /** * {@link KStream} is an abstraction of a <i>record stream</i> of key-value pairs. + * <p> + * A {@link KStream} is either defined from one or multiple Kafka topics that are consumed message by message or + * the result of a {@link KStream} transformation. A {@link KTable} can also be converted into a {@link KStream}. + * <p> + * A {@link KStream} can be transformed record by record, joined with another {@link KStream} or {@link KTable}, or + * can be aggregated into a {@link KTable}. * * @param <K> Type of keys * @param <V> Type of values + * + * @see KTable */ @InterfaceStability.Unstable public interface KStream<K, V> { @@ -36,6 +44,8 @@ public interface KStream<K, V> { * Create a new instance of {@link KStream} that consists of all elements of this stream which satisfy a predicate. 
* * @param predicate the instance of {@link Predicate} + * + * @return a {@link KStream} that contains only those records that satisfy the given predicate */ KStream<K, V> filter(Predicate<K, V> predicate); @@ -43,6 +53,8 @@ public interface KStream<K, V> { * Create a new instance of {@link KStream} that consists all elements of this stream which do not satisfy a predicate. * * @param predicate the instance of {@link Predicate} + * + * @return a {@link KStream} that contains only those records that do not satisfy the given predicate */ KStream<K, V> filterNot(Predicate<K, V> predicate); @@ -52,6 +64,8 @@ public interface KStream<K, V> { * * @param mapper the instance of {@link KeyValueMapper} * @param <K1> the new key type on the stream + * + * @return a {@link KStream} that contains records with different key type and same value type */ <K1> KStream<K1, V> selectKey(KeyValueMapper<K, V, K1> mapper); @@ -61,6 +75,8 @@ public interface KStream<K, V> { * @param mapper the instance of {@link KeyValueMapper} * @param <K1> the key type of the new stream * @param <V1> the value type of the new stream + * + * @return a {@link KStream} that contains records with new key and value type */ <K1, V1> KStream<K1, V1> map(KeyValueMapper<K, V, KeyValue<K1, V1>> mapper); @@ -69,6 +85,8 @@ public interface KStream<K, V> { * * @param mapper the instance of {@link ValueMapper} * @param <V1> the value type of the new stream + * + * @return a {@link KStream} that contains records with unmodified keys and new values of different type */ <V1> KStream<K, V1> mapValues(ValueMapper<V, V1> mapper); @@ -124,6 +142,8 @@ public interface KStream<K, V> { * @param mapper the instance of {@link KeyValueMapper} * @param <K1> the key type of the new stream * @param <V1> the value type of the new stream + * + * @return a {@link KStream} that contains more or less records with new key and value type */ <K1, V1> KStream<K1, V1> flatMap(KeyValueMapper<K, V, Iterable<KeyValue<K1, V1>>> mapper); @@ -132,6 
+152,8 @@ public interface KStream<K, V> { * * @param processor the instance of {@link ValueMapper} * @param <V1> the value type of the new stream + * + * @return a {@link KStream} that contains more or less records with unmodified keys and new values of different type */ <V1> KStream<K, V1> flatMapValues(ValueMapper<V, Iterable<V1>> processor); @@ -143,6 +165,8 @@ public interface KStream<K, V> { * assigned to this stream only. An element will be dropped if none of the predicates evaluate to true. * * @param predicates the ordered list of {@link Predicate} instances + * + * @return multiple distinct substreams of this {@link KStream} */ KStream<K, V>[] branch(Predicate<K, V>... predicates); @@ -152,6 +176,8 @@ public interface KStream<K, V> { * This is equivalent to calling {@link #to(String)} and {@link org.apache.kafka.streams.kstream.KStreamBuilder#stream(String...)}. * * @param topic the topic name + * + * @return a {@link KStream} that contains the exact same records as this {@link KStream} */ KStream<K, V> through(String topic); @@ -159,7 +185,7 @@ public interface KStream<K, V> { * Perform an action on each element of {@link KStream}. * Note that this is a terminal operation that returns void. 
* - * @param action An action to perform on each element + * @param action an action to perform on each element */ void foreach(ForeachAction<K, V> action); @@ -171,6 +197,8 @@ public interface KStream<K, V> { * @param partitioner the function used to determine how records are distributed among partitions of the topic, * if not specified producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used * @param topic the topic name + * + * @return a {@link KStream} that contains the exact same records as this {@link KStream} */ KStream<K, V> through(StreamPartitioner<K, V> partitioner, String topic); @@ -187,6 +215,8 @@ public interface KStream<K, V> { * @param valSerde value serde used to send key-value pairs, * if not specified the default value serde defined in the configuration will be used * @param topic the topic name + * + * @return a {@link KStream} that contains the exact same records as this {@link KStream} */ KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic); @@ -205,6 +235,8 @@ public interface KStream<K, V> { * {@link org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner} will be used * — otherwise {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used * @param topic the topic name + * + * @return a {@link KStream} that contains the exact same records as this {@link KStream} */ KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic); @@ -260,6 +292,8 @@ public interface KStream<K, V> { * * @param transformerSupplier the instance of {@link TransformerSupplier} that generates {@link org.apache.kafka.streams.kstream.Transformer} * @param stateStoreNames the names of the state store used by the processor + * + * @return a new {@link KStream} with transformed key and value types */ <K1, V1> KStream<K1, V1> transform(TransformerSupplier<K, V, KeyValue<K1, V1>> transformerSupplier, String... 
stateStoreNames); @@ -268,6 +302,8 @@ public interface KStream<K, V> { * * @param valueTransformerSupplier the instance of {@link ValueTransformerSupplier} that generates {@link org.apache.kafka.streams.kstream.ValueTransformer} * @param stateStoreNames the names of the state store used by the processor + * + * @return a {@link KStream} that contains records with unmodified keys and transformed values with type {@code R} */ <R> KStream<K, R> transformValues(ValueTransformerSupplier<V, R> valueTransformerSupplier, String... stateStoreNames); @@ -293,6 +329,9 @@ public interface KStream<K, V> { * if not specified the default serdes defined in the configs will be used * @param <V1> the value type of the other stream * @param <R> the value type of the new stream + * + * @return a {@link KStream} that contains join-records for each key and values computed by the given {@link ValueJoiner}, + * one for each matched record-pair with the same key and within the joining window intervals */ <V1, R> KStream<K, R> join( KStream<K, V1> otherStream, @@ -311,6 +350,9 @@ public interface KStream<K, V> { * @param windows the specification of the {@link JoinWindows} * @param <V1> the value type of the other stream * @param <R> the value type of the new stream + * + * @return a {@link KStream} that contains join-records for each key and values computed by the given {@link ValueJoiner}, + * one for each matched record-pair with the same key and within the joining window intervals */ <V1, R> KStream<K, R> join( KStream<K, V1> otherStream, @@ -331,6 +373,9 @@ public interface KStream<K, V> { * if not specified the default serdes defined in the configs will be used * @param <V1> the value type of the other stream * @param <R> the value type of the new stream + * + * @return a {@link KStream} that contains join-records for each key and values computed by the given {@link ValueJoiner}, + * one for each matched record-pair with the same key and within the joining window intervals */ <V1, R> 
KStream<K, R> outerJoin( KStream<K, V1> otherStream, @@ -349,6 +394,9 @@ public interface KStream<K, V> { * @param windows the specification of the {@link JoinWindows} * @param <V1> the value type of the other stream * @param <R> the value type of the new stream + * + * @return a {@link KStream} that contains join-records for each key and values computed by the given {@link ValueJoiner}, + * one for each matched record-pair with the same key and within the joining window intervals */ <V1, R> KStream<K, R> outerJoin( KStream<K, V1> otherStream, @@ -367,6 +415,9 @@ public interface KStream<K, V> { * if not specified the default serdes defined in the configs will be used * @param <V1> the value type of the other stream * @param <R> the value type of the new stream + * + * @return a {@link KStream} that contains join-records for each key and values computed by the given {@link ValueJoiner}, + * one for each matched record-pair with the same key and within the joining window intervals */ <V1, R> KStream<K, R> leftJoin( KStream<K, V1> otherStream, @@ -384,6 +435,9 @@ public interface KStream<K, V> { * @param windows the specification of the {@link JoinWindows} * @param <V1> the value type of the other stream * @param <R> the value type of the new stream + * + * @return a {@link KStream} that contains join-records for each key and values computed by the given {@link ValueJoiner}, + * one for each matched record-pair with the same key and within the joining window intervals */ <V1, R> KStream<K, R> leftJoin( KStream<K, V1> otherStream, @@ -397,6 +451,9 @@ public interface KStream<K, V> { * @param joiner the instance of {@link ValueJoiner} * @param <V1> the value type of the table * @param <V2> the value type of the new stream + * + * @return a {@link KStream} that contains join-records for each key and values computed by the given {@link ValueJoiner}, + * one for each matched record-pair with the same key and within the joining window intervals */ <V1, V2> KStream<K, V2> 
leftJoin(KTable<K, V1> table, ValueJoiner<V, V1, V2> joiner); @@ -409,6 +466,10 @@ public interface KStream<K, V> { * if not specified the default serdes defined in the configs will be used * @param valueSerde value serdes for materializing the aggregated table, * if not specified the default serdes defined in the configs will be used + * + * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s + * where each table contains records with unmodified keys and values + * that represent the latest (rolling) aggregate for each key within that window */ <W extends Window> KTable<Windowed<K>, V> reduceByKey(Reducer<V> reducer, Windows<W> windows, @@ -421,6 +482,10 @@ public interface KStream<K, V> { * * @param reducer the instance of {@link Reducer} * @param windows the specification of the aggregation {@link Windows} + * + * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s + * where each table contains records with unmodified keys and values + * that represent the latest (rolling) aggregate for each key within that window */ <W extends Window> KTable<Windowed<K>, V> reduceByKey(Reducer<V> reducer, Windows<W> windows); @@ -433,6 +498,8 @@ public interface KStream<K, V> { * @param valueSerde value serdes for materializing the aggregated table, * if not specified the default serdes defined in the configs will be used * @param name the name of the resulted {@link KTable} + * + * @return a {@link KTable} that contains records with unmodified keys and values that represent the latest (rolling) aggregate for each key */ KTable<K, V> reduceByKey(Reducer<V> reducer, Serde<K> keySerde, @@ -444,6 +511,8 @@ public interface KStream<K, V> { * * @param reducer the instance of {@link Reducer} * @param name the name of the resulted {@link KTable} + * + * @return a {@link KTable} that contains records with unmodified keys and values that represent the latest (rolling) aggregate for each key */ KTable<K, V> 
reduceByKey(Reducer<V> reducer, String name); @@ -458,6 +527,10 @@ public interface KStream<K, V> { * @param aggValueSerde aggregate value serdes for materializing the aggregated table, * if not specified the default serdes defined in the configs will be used * @param <T> the value type of the resulted {@link KTable} + * + * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s + * where each table contains records with unmodified keys and values with type {@code T} + * that represent the latest (rolling) aggregate for each key within that window */ <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(Initializer<T> initializer, Aggregator<K, V, T> aggregator, @@ -473,6 +546,10 @@ public interface KStream<K, V> { * @param aggregator the instance of {@link Aggregator} * @param windows the specification of the aggregation {@link Windows} * @param <T> the value type of the resulted {@link KTable} + * + * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s + * where each table contains records with unmodified keys and values with type {@code T} + * that represent the latest (rolling) aggregate for each key within that window */ <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(Initializer<T> initializer, Aggregator<K, V, T> aggregator, @@ -489,6 +566,8 @@ public interface KStream<K, V> { * if not specified the default serdes defined in the configs will be used * @param name the name of the resulted {@link KTable} * @param <T> the value type of the resulted {@link KTable} + * + * @return a {@link KTable} that contains records with unmodified keys and values (of different type) that represent the latest (rolling) aggregate for each key */ <T> KTable<K, T> aggregateByKey(Initializer<T> initializer, Aggregator<K, V, T> aggregator, @@ -504,6 +583,8 @@ public interface KStream<K, V> { * @param aggregator the class of {@link Aggregator} * @param name the name of the resulted {@link KTable} * @param 
<T> the value type of the resulted {@link KTable} + * + * @return a {@link KTable} that contains records with unmodified keys and values (of different type) that represent the latest (rolling) aggregate for each key */ <T> KTable<K, T> aggregateByKey(Initializer<T> initializer, Aggregator<K, V, T> aggregator, @@ -515,6 +596,10 @@ public interface KStream<K, V> { * @param windows the specification of the aggregation {@link Windows} * @param keySerde key serdes for materializing the counting table, * if not specified the default serdes defined in the configs will be used + * + * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s + * where each table contains records with unmodified keys and values + * that represent the latest (rolling) count (i.e., number of records) for each key within that window */ <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows, Serde<K> keySerde); @@ -523,6 +608,10 @@ public interface KStream<K, V> { * with default serializers and deserializers. 
* * @param windows the specification of the aggregation {@link Windows} + * + * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s + * where each table contains records with unmodified keys and values + * that represent the latest (rolling) count (i.e., number of records) for each key within that window */ <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows); @@ -532,6 +621,8 @@ public interface KStream<K, V> { * @param keySerde key serdes for materializing the counting table, * if not specified the default serdes defined in the configs will be used * @param name the name of the resulted {@link KTable} + * + * @return a {@link KTable} that contains records with unmodified keys and values that represent the latest (rolling) count (i.e., number of records) for each key */ KTable<K, Long> countByKey(Serde<K> keySerde, String name); @@ -540,6 +631,8 @@ public interface KStream<K, V> { * with default serializers and deserializers. * * @param name the name of the resulted {@link KTable} + * + * @return a {@link KTable} that contains records with unmodified keys and values that represent the latest (rolling) count (i.e., number of records) for each key */ KTable<K, Long> countByKey(String name); http://git-wip-us.apache.org/repos/asf/kafka/blob/3414d561/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java index 159876c..9d90ba0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java @@ -45,21 +45,27 @@ public class KStreamBuilder extends TopologyBuilder { /** * Create a {@link KStream} instance from the specified topics. 
* The default deserializers specified in the config are used. + * <p> + * If multiple topics are specified there is no ordering guarantee for records from different topics. * * @param topics the topic names; must contain at least one topic name + * @return a {@link KStream} for the specified topics */ public <K, V> KStream<K, V> stream(String... topics) { return stream(null, null, topics); } /** - * Create a {@link KStream} instance for the specified topics. + * Create a {@link KStream} instance from the specified topics. + * <p> + * If multiple topics are specified there is no ordering guarantee for records from different topics. * * @param keySerde key serde used to read this source {@link KStream}, * if not specified the default serde defined in the configs will be used * @param valSerde value serde used to read this source {@link KStream}, * if not specified the default serde defined in the configs will be used * @param topics the topic names; must contain at least one topic name + * @return a {@link KStream} for the specified topics */ public <K, V> KStream<K, V> stream(Serde<K> keySerde, Serde<V> valSerde, String... topics) { String name = newName(KStreamImpl.SOURCE_NAME); @@ -74,6 +80,7 @@ public class KStreamBuilder extends TopologyBuilder { * The default deserializers specified in the config are used. 
* * @param topic the topic name; cannot be null + * @return a {@link KTable} for the specified topic */ public <K, V> KTable<K, V> table(String topic) { return table(null, null, topic); @@ -87,6 +94,7 @@ public class KStreamBuilder extends TopologyBuilder { * @param valSerde value serde used to send key-value pairs, * if not specified the default value serde defined in the configuration will be used * @param topic the topic name; cannot be null + * @return a {@link KTable} for the specified topic */ public <K, V> KTable<K, V> table(Serde<K> keySerde, Serde<V> valSerde, String topic) { String source = newName(KStreamImpl.SOURCE_NAME); @@ -102,8 +110,11 @@ public class KStreamBuilder extends TopologyBuilder { /** * Create a new instance of {@link KStream} by merging the given streams. + * <p> + * There is no ordering guarantee for records from different streams. * * @param streams the instances of {@link KStream} to be merged + * @return a {@link KStream} containing all records of the given streams */ public <K, V> KStream<K, V> merge(KStream<K, V>... streams) { return KStreamImpl.merge(this, streams); @@ -114,6 +125,7 @@ public class KStreamBuilder extends TopologyBuilder { * This function is only for internal usage. 
* * @param prefix processor name prefix + * @return a new unique name */ public String newName(String prefix) { return prefix + String.format("%010d", index.getAndIncrement()); http://git-wip-us.apache.org/repos/asf/kafka/blob/3414d561/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java index 4ff9b48..cc5a521 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java @@ -24,9 +24,18 @@ import org.apache.kafka.streams.processor.StreamPartitioner; /** * {@link KTable} is an abstraction of a <i>changelog stream</i> from a primary-keyed table. + * Each record in this stream is an update on the primary-keyed table with the record key as the primary key. + * <p> + * A {@link KTable} is either defined from one or multiple Kafka topics that are consumed message by message or + * the result of a {@link KTable} transformation. An aggregation of a {@link KStream} also yields a {@link KTable}. + * <p> + * A {@link KTable} can be transformed record by record, joined with another {@link KTable} or {@link KStream}, or + * can be re-partitioned and aggregated into a new {@link KTable}. * * @param <K> Type of primary keys * @param <V> Type of value changes + * + * @see KStream */ @InterfaceStability.Unstable public interface KTable<K, V> { @@ -35,6 +44,8 @@ public interface KTable<K, V> { * Create a new instance of {@link KTable} that consists of all elements of this stream which satisfy a predicate. 
* * @param predicate the instance of {@link Predicate} + * + * @return a {@link KTable} that contains only those records that satisfy the given predicate */ KTable<K, V> filter(Predicate<K, V> predicate); @@ -42,6 +53,8 @@ public interface KTable<K, V> { * Create a new instance of {@link KTable} that consists all elements of this stream which do not satisfy a predicate. * * @param predicate the instance of {@link Predicate} + * + * @return a {@link KTable} that contains only those records that do not satisfy the given predicate */ KTable<K, V> filterNot(Predicate<K, V> predicate); @@ -50,6 +63,8 @@ public interface KTable<K, V> { * * @param mapper the instance of {@link ValueMapper} * @param <V1> the value type of the new stream + * + * @return a {@link KTable} that contains records with unmodified keys and new values of different type */ <V1> KTable<K, V1> mapValues(ValueMapper<V, V1> mapper); @@ -103,6 +118,8 @@ public interface KTable<K, V> { * This is equivalent to calling {@link #to(String)} and {@link org.apache.kafka.streams.kstream.KStreamBuilder#table(String)}. 
* * @param topic the topic name + * + * @return a new {@link KTable} that contains the exact same records as this {@link KTable} */ KTable<K, V> through(String topic); @@ -114,6 +131,8 @@ public interface KTable<K, V> { * @param partitioner the function used to determine how records are distributed among partitions of the topic, * if not specified producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used * @param topic the topic name + * + * @return a new {@link KTable} that contains the exact same records as this {@link KTable} */ KTable<K, V> through(StreamPartitioner<K, V> partitioner, String topic); @@ -130,6 +149,8 @@ public interface KTable<K, V> { * @param valSerde value serde used to send key-value pairs, * if not specified the default value serde defined in the configuration will be used * @param topic the topic name + * + * @return a new {@link KTable} that contains the exact same records as this {@link KTable} */ KTable<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic); @@ -148,6 +169,8 @@ public interface KTable<K, V> { * {@link org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner} will be used * — otherwise {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used * @param topic the topic name + * + * @return a new {@link KTable} that contains the exact same records as this {@link KTable} */ KTable<K, V> through(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic); @@ -200,6 +223,10 @@ public interface KTable<K, V> { /** * Convert this stream to a new instance of {@link KStream}. 
+ * + * @return a {@link KStream} that contains the same records as this {@link KTable}; + * the records are no longer treated as updates on a primary-keyed table, + * but rather as normal key-value pairs in a record stream */ KStream<K, V> toStream(); @@ -209,6 +236,11 @@ public interface KTable<K, V> { * * @param mapper @param mapper the instance of {@link KeyValueMapper} * @param <K1> the new key type + * + * @return a {@link KStream} that contains records with new keys of different type for each update of this {@link KTable}; + * the stream contains the transformed records from this {@link KTable}, + * which are no longer treated as updates on a primary-keyed table, + * but rather as normal key-value pairs in a record stream */ <K1> KStream<K1, V> toStream(KeyValueMapper<K, V, K1> mapper); @@ -219,6 +251,9 @@ public interface KTable<K, V> { * @param joiner the instance of {@link ValueJoiner} * @param <V1> the value type of the other stream * @param <R> the value type of the new stream + * + * @return a {@link KTable} that contains join-records for each key and values computed by the given {@link ValueJoiner}, + * one for each matched record-pair with the same key */ <V1, R> KTable<K, R> join(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner); @@ -229,6 +264,9 @@ public interface KTable<K, V> { * @param joiner the instance of {@link ValueJoiner} * @param <V1> the value type of the other stream * @param <R> the value type of the new stream + * + * @return a {@link KTable} that contains join-records for each key and values computed by the given {@link ValueJoiner}, + * one for each matched record-pair with the same key */ <V1, R> KTable<K, R> outerJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner); @@ -239,6 +277,9 @@ public interface KTable<K, V> { * @param joiner the instance of {@link ValueJoiner} * @param <V1> the value type of the other stream * @param <R> the value type of the new stream + * + * @return a {@link KTable} that 
contains join-records for each key and values computed by the given {@link ValueJoiner}, + * one for each matched record-pair with the same key */ <V1, R> KTable<K, R> leftJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner); @@ -252,6 +293,8 @@ public interface KTable<K, V> { * if not specified the default serdes defined in the configs will be used * @param <K1> the key type of the {@link KGroupedTable} * @param <V1> the value type of the {@link KGroupedTable} + * + * @return a {@link KGroupedTable} that contains the re-partitioned records of this {@link KTable} */ <K1, V1> KGroupedTable<K1, V1> groupBy(KeyValueMapper<K, V, KeyValue<K1, V1>> selector, Serde<K1> keySerde, Serde<V1> valueSerde); @@ -261,6 +304,8 @@ public interface KTable<K, V> { * @param selector select the grouping key and value to be aggregated * @param <K1> the key type of the {@link KGroupedTable} * @param <V1> the value type of the {@link KGroupedTable} + * + * @return a {@link KGroupedTable} that contains the re-partitioned records of this {@link KTable} */ <K1, V1> KGroupedTable<K1, V1> groupBy(KeyValueMapper<K, V, KeyValue<K1, V1>> selector); @@ -268,7 +313,7 @@ public interface KTable<K, V> { * Perform an action on each element of {@link KTable}. * Note that this is a terminal operation that returns void. 
* - * @param action An action to perform on each element + * @param action an action to perform on each element */ void foreach(ForeachAction<K, V> action); } http://git-wip-us.apache.org/repos/asf/kafka/blob/3414d561/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java index b07e510..f79e6f6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java @@ -51,7 +51,9 @@ public interface StateStore { void close(); /** - * If the storage is persistent + * Return if the storage is persistent or not. + * + * @return {@code true} if the storage is persistent—{@code false} otherwise */ boolean persistent(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/3414d561/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java index e400cef..079a2b2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java @@ -45,6 +45,8 @@ public interface WindowStore<K, V> extends StateStore { /** * Get all the key-value pairs with the given key and the time range from all * the existing windows. + * + * @return an iterator over key-value pairs {@code <timestamp, value>} */ WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo); }
