KAFKA-3598: Improve JavaDoc of public API Author: Matthias J. Sax <[email protected]>
Reviewers: Michael G. Noll, Guozhang Wang Closes #1250 from mjsax/JavaDoc-publicAPI Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4ab4e4af Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4ab4e4af Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4ab4e4af Branch: refs/heads/0.10.0 Commit: 4ab4e4af814fb791fe6e8c2bd3381da8ca80b0b5 Parents: 68433dc Author: Matthias J. Sax <[email protected]> Authored: Fri Apr 29 08:49:16 2016 -0700 Committer: Guozhang Wang <[email protected]> Committed: Fri Apr 29 08:49:16 2016 -0700 ---------------------------------------------------------------------- .../examples/pageview/PageViewTypedDemo.java | 4 +- .../examples/pageview/PageViewUntypedDemo.java | 2 +- .../org/apache/kafka/streams/KafkaStreams.java | 10 ++- .../java/org/apache/kafka/streams/KeyValue.java | 17 +++++ .../kafka/streams/kstream/Aggregator.java | 10 ++- .../kafka/streams/kstream/ForeachAction.java | 10 ++- .../kafka/streams/kstream/Initializer.java | 7 +- .../kafka/streams/kstream/JoinWindows.java | 21 +++--- .../apache/kafka/streams/kstream/KStream.java | 10 +-- .../kafka/streams/kstream/KStreamBuilder.java | 15 ++-- .../apache/kafka/streams/kstream/KTable.java | 19 ++--- .../kafka/streams/kstream/KeyValueMapper.java | 9 ++- .../apache/kafka/streams/kstream/Predicate.java | 9 ++- .../apache/kafka/streams/kstream/Reducer.java | 9 ++- .../kafka/streams/kstream/TimeWindows.java | 18 +++-- .../kafka/streams/kstream/Transformer.java | 8 +- .../streams/kstream/TransformerSupplier.java | 7 +- .../kafka/streams/kstream/UnlimitedWindows.java | 9 ++- .../kafka/streams/kstream/ValueJoiner.java | 9 ++- .../kafka/streams/kstream/ValueMapper.java | 8 +- .../kafka/streams/kstream/ValueTransformer.java | 8 +- .../kstream/ValueTransformerSupplier.java | 7 +- .../apache/kafka/streams/kstream/Window.java | 18 ++++- .../apache/kafka/streams/kstream/Windowed.java | 30 +++++--- 
.../apache/kafka/streams/kstream/Windows.java | 37 ++++----- .../internals/KStreamWindowAggregate.java | 2 +- .../kstream/internals/KStreamWindowReduce.java | 2 +- .../kstream/internals/WindowedSerializer.java | 4 +- .../internals/WindowedStreamPartitioner.java | 6 +- .../ConsumerRecordTimestampExtractor.java | 3 +- .../kafka/streams/processor/Processor.java | 9 ++- .../streams/processor/ProcessorSupplier.java | 5 ++ .../streams/processor/StateStoreSupplier.java | 10 +++ .../streams/processor/StreamPartitioner.java | 20 ++--- .../apache/kafka/streams/processor/TaskId.java | 6 +- .../streams/processor/TimestampExtractor.java | 8 +- .../streams/processor/TopologyBuilder.java | 64 ++++++++-------- .../processor/WallclockTimestampExtractor.java | 9 ++- .../processor/internals/StreamThread.java | 2 +- .../apache/kafka/streams/state/StateSerdes.java | 79 ++++++++++++++++++-- .../apache/kafka/streams/state/WindowStore.java | 2 +- .../streams/smoketest/SmokeTestClient.java | 2 +- .../kafka/streams/smoketest/SmokeTestUtil.java | 2 +- 43 files changed, 383 insertions(+), 163 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java ---------------------------------------------------------------------- diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java index 39ec41f..e53b037 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java @@ -168,10 +168,10 @@ public class PageViewTypedDemo { public KeyValue<WindowedPageViewByRegion, RegionCount> apply(Windowed<String> key, Long value) { 
WindowedPageViewByRegion wViewByRegion = new WindowedPageViewByRegion(); wViewByRegion.windowStart = key.window().start(); - wViewByRegion.region = key.value(); + wViewByRegion.region = key.key(); RegionCount rCount = new RegionCount(); - rCount.region = key.value(); + rCount.region = key.key(); rCount.count = value; return new KeyValue<>(wViewByRegion, rCount); http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java ---------------------------------------------------------------------- diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java index 9a41b9e..8a0af6c 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java @@ -107,7 +107,7 @@ public class PageViewUntypedDemo { public KeyValue<JsonNode, JsonNode> apply(Windowed<String> key, Long value) { ObjectNode keyNode = JsonNodeFactory.instance.objectNode(); keyNode.put("window-start", key.window().start()) - .put("region", key.value()); + .put("region", key.key()); ObjectNode valueNode = JsonNodeFactory.instance.objectNode(); valueNode.put("count", value); http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 4d1306d..45024f2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -97,6 +97,12 @@ public class KafkaStreams 
{ // usage only and should not be exposed to users at all. private final UUID processId; + /** + * Construct the stream instance. + * + * @param builder the processor topology builder specifying the computational logic + * @param props properties for the {@link StreamsConfig} + */ public KafkaStreams(TopologyBuilder builder, Properties props) { this(builder, new StreamsConfig(props)); } @@ -104,8 +110,8 @@ public class KafkaStreams { /** * Construct the stream instance. * - * @param builder The processor topology builder specifying the computational logic - * @param config The stream configs + * @param builder the processor topology builder specifying the computational logic + * @param config the stream configs */ public KafkaStreams(TopologyBuilder builder, StreamsConfig config) { // create the metrics http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/KeyValue.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/KeyValue.java b/streams/src/main/java/org/apache/kafka/streams/KeyValue.java index ca86fc4..58f2083 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KeyValue.java +++ b/streams/src/main/java/org/apache/kafka/streams/KeyValue.java @@ -29,14 +29,31 @@ import java.util.Objects; */ public class KeyValue<K, V> { + /** The key of the key-value pair. */ public final K key; + /** The value of the key-value pair. */ public final V value; + /** + * Create a new key-value pair. + * + * @param key the key + * @param value the value + */ public KeyValue(K key, V value) { this.key = key; this.value = value; } + /** + * Create a new key-value pair. 
+ * + * @param key the key + * @param value the value + * @param <K> the type of the key + * @param <V> the type of the value + * @return a new key value pair + */ public static <K, V> KeyValue<K, V> pair(K key, V value) { return new KeyValue<>(key, value); } http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java index 9ec9f96..989d89f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java @@ -18,7 +18,7 @@ package org.apache.kafka.streams.kstream; /** - * The Aggregator interface for aggregating values of the given key. + * The {@link Aggregator} interface for aggregating values of the given key. * * @param <K> key type * @param <V> original value type @@ -26,5 +26,13 @@ package org.apache.kafka.streams.kstream; */ public interface Aggregator<K, V, T> { + /** + * Compute a new aggregate from the key and value of a record and the current aggregate of the same key. 
+ * + * @param aggKey the key of the record + * @param value the value of the record + * @param aggregate the current aggregate value + * @return the new aggregate value + */ T apply(K aggKey, V value, T aggregate); } http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java index 83064e8..b3e3169 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java @@ -18,9 +18,8 @@ package org.apache.kafka.streams.kstream; - /** - * The ForeachAction interface for performing an action on a key-value pair. + * The {@link ForeachAction} interface for performing an action on a key-value pair. * Note that this action is stateless. If stateful processing is required, consider * using {@link KStream#transform(TransformerSupplier, String...)} or * {@link KStream#process(ProcessorSupplier, String...)} instead. @@ -29,6 +28,13 @@ package org.apache.kafka.streams.kstream; * @param <V> original value type */ public interface ForeachAction<K, V> { + + /** + * Perform an action for each record of a stream. 
+ * + * @param key the key of the record + * @param value the value of the record + */ void apply(K key, V value); } http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java index 67c1c21..39bc40d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java @@ -18,11 +18,16 @@ package org.apache.kafka.streams.kstream; /** - * The Initializer interface for creating an initial value in aggregations. + * The {@link Initializer} interface for creating an initial value in aggregations. * * @param <T> aggregate value type */ public interface Initializer<T> { + /** + * Return the initial value for an aggregation. + * + * @return the initial value for an aggregation + */ T apply(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java index a74984a..a6d5603 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java @@ -26,7 +26,9 @@ import java.util.Map; */ public class JoinWindows extends Windows<TimeWindow> { + /** Maximum time difference for tuples that are before the join tuple. */ public final long before; + /** Maximum time difference for tuples that are after the join tuple. 
*/ public final long after; private JoinWindows(String name, long before, long after) { @@ -41,40 +43,41 @@ public class JoinWindows extends Windows<TimeWindow> { } /** - * Specifies that records of the same key are joinable if their timestamp stamps are within - * timeDifference. + * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference}. * - * @param timeDifference join window interval in milliseconds + * @param timeDifference join window interval */ public JoinWindows within(long timeDifference) { return new JoinWindows(this.name, timeDifference, timeDifference); } /** - * Specifies that records of the same key are joinable if their timestamp stamps are within + * Specifies that records of the same key are joinable if their timestamps are within * the join window interval, and if the timestamp of a record from the secondary stream is * earlier than or equal to the timestamp of a record from the first stream. * - * @param timeDifference join window interval in milliseconds + * @param timeDifference join window interval */ public JoinWindows before(long timeDifference) { return new JoinWindows(this.name, timeDifference, this.after); } /** - * Specifies that records of the same key are joinable if their timestamp stamps are within + * Specifies that records of the same key are joinable if their timestamps are within * the join window interval, and if the timestamp of a record from the secondary stream * is later than or equal to the timestamp of a record from the first stream. * - * @param timeDifference join window interval in milliseconds + * @param timeDifference join window interval */ public JoinWindows after(long timeDifference) { return new JoinWindows(this.name, this.before, timeDifference); } + /** + * Not supported by {@link JoinWindows}. Throws {@link UnsupportedOperationException}. 
+ */ @Override public Map<Long, TimeWindow> windowsFor(long timestamp) { - // this function should never be called throw new UnsupportedOperationException("windowsFor() is not supported in JoinWindows"); } @@ -98,4 +101,4 @@ public class JoinWindows extends Windows<TimeWindow> { return result; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index 7e3562c..6df2deb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -24,7 +24,7 @@ import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.processor.StreamPartitioner; /** - * KStream is an abstraction of a <i>record stream</i> of key-value pairs. + * {@link KStream} is an abstraction of a <i>record stream</i> of key-value pairs. * * @param <K> Type of keys * @param <V> Type of values @@ -510,7 +510,7 @@ public interface KStream<K, V> { String name); /** - * Count number of messages of this stream by key on a window basis into a new instance of windowed {@link KTable}. + * Count number of records of this stream by key on a window basis into a new instance of windowed {@link KTable}. 
* * @param windows the specification of the aggregation {@link Windows} * @param keySerde key serdes for materializing the counting table, @@ -519,7 +519,7 @@ public interface KStream<K, V> { <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows, Serde<K> keySerde); /** - * Count number of messages of this stream by key on a window basis into a new instance of windowed {@link KTable} + * Count number of records of this stream by key on a window basis into a new instance of windowed {@link KTable} * with default serializers and deserializers. * * @param windows the specification of the aggregation {@link Windows} @@ -527,7 +527,7 @@ public interface KStream<K, V> { <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows); /** - * Count number of messages of this stream by key into a new instance of ever-updating {@link KTable}. + * Count number of records of this stream by key into a new instance of ever-updating {@link KTable}. * * @param keySerde key serdes for materializing the counting table, * if not specified the default serdes defined in the configs will be used @@ -536,7 +536,7 @@ public interface KStream<K, V> { KTable<K, Long> countByKey(Serde<K> keySerde, String name); /** - * Count number of messages of this stream by key into a new instance of ever-updating {@link KTable} + * Count number of records of this stream by key into a new instance of ever-updating {@link KTable} * with default serializers and deserializers. 
* * @param name the name of the resulted {@link KTable} http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java index 6b770b4..159876c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java @@ -28,19 +28,22 @@ import java.util.Collections; import java.util.concurrent.atomic.AtomicInteger; /** - * KStreamBuilder is a subclass of {@link TopologyBuilder} that provides the Kafka Streams DSL + * {@link KStreamBuilder} is a subclass of {@link TopologyBuilder} that provides the Kafka Streams DSL * for users to specify computational logic and translates the given logic to a {@link org.apache.kafka.streams.processor.internals.ProcessorTopology}. */ public class KStreamBuilder extends TopologyBuilder { private final AtomicInteger index = new AtomicInteger(0); + /** + * Create a new {@link KStreamBuilder} instance. + */ public KStreamBuilder() { super(); } /** - * Creates a {@link KStream} instance from the specified topics. + * Create a {@link KStream} instance from the specified topics. * The default deserializers specified in the config are used. * * @param topics the topic names; must contain at least one topic name @@ -50,7 +53,7 @@ public class KStreamBuilder extends TopologyBuilder { } /** - * Creates a {@link KStream} instance for the specified topics. + * Create a {@link KStream} instance for the specified topics. 
* * @param keySerde key serde used to read this source {@link KStream}, * if not specified the default serde defined in the configs will be used @@ -67,7 +70,7 @@ public class KStreamBuilder extends TopologyBuilder { } /** - * Creates a {@link KTable} instance for the specified topic. + * Create a {@link KTable} instance for the specified topic. * The default deserializers specified in the config are used. * * @param topic the topic name; cannot be null @@ -77,7 +80,7 @@ public class KStreamBuilder extends TopologyBuilder { } /** - * Creates a {@link KTable} instance for the specified topic. + * Create a {@link KTable} instance for the specified topic. * * @param keySerde key serde used to send key-value pairs, * if not specified the default key serde defined in the configuration will be used @@ -98,7 +101,7 @@ public class KStreamBuilder extends TopologyBuilder { } /** - * Creates a new instance of {@link KStream} by merging the given streams + * Create a new instance of {@link KStream} by merging the given streams. * * @param streams the instances of {@link KStream} to be merged */ http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java index 8414279..4ff9b48 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java @@ -23,7 +23,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.StreamPartitioner; /** - * KTable is an abstraction of a <i>changelog stream</i> from a primary-keyed table. + * {@link KTable} is an abstraction of a <i>changelog stream</i> from a primary-keyed table. 
* * @param <K> Type of primary keys * @param <V> Type of value changes @@ -39,7 +39,7 @@ public interface KTable<K, V> { KTable<K, V> filter(Predicate<K, V> predicate); /** - * Create a new instance of {@link KTable} that consists all elements of this stream which do not satisfy a predicate + * Create a new instance of {@link KTable} that consists all elements of this stream which do not satisfy a predicate. * * @param predicate the instance of {@link Predicate} */ @@ -55,7 +55,7 @@ public interface KTable<K, V> { /** - * Print the elements of this stream to System.out + * Print the elements of this stream to {@code System.out} * * Implementors will need to override toString for keys and values that are not of * type String, Integer etc to get meaningful information. @@ -63,7 +63,7 @@ public interface KTable<K, V> { void print(); /** - * Print the elements of this stream to System.out + * Print the elements of this stream to {@code System.out} * @param keySerde key serde used to send key-value pairs, * if not specified the default serde defined in the configs will be used * @param valSerde value serde used to send key-value pairs, @@ -75,15 +75,16 @@ public interface KTable<K, V> { void print(Serde<K> keySerde, Serde<V> valSerde); /** - * Write the elements of this stream to a file at the given path. + * Write the elements of this stream to a file at the given path using default serializers and deserializers. * @param filePath name of file to write to * - * Implementors will need to override toString for keys and values that are not of - * type String, Integer etc to get meaningful information. + * Implementors will need to override {@code toString} for keys and values that are not of + * type {@link String}, {@link Integer} etc. to get meaningful information. */ void writeAsText(String filePath); /** + * Write the elements of this stream to a file at the given path. 
* * @param filePath name of file to write to * @param keySerde key serde used to send key-value pairs, @@ -91,8 +92,8 @@ public interface KTable<K, V> { * @param valSerde value serde used to send key-value pairs, * if not specified the default serde defined in the configs will be used * - * Implementors will need to override toString for keys and values that are not of - * type String, Integer etc to get meaningful information. + * Implementors will need to override {@code toString} for keys and values that are not of + * type {@link String}, {@link Integer} etc. to get meaningful information. */ void writeAsText(String filePath, Serde<K> keySerde, Serde<V> valSerde); http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java index a4aed91..b36ed63 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java @@ -18,7 +18,7 @@ package org.apache.kafka.streams.kstream; /** - * The KeyValueMapper interface for mapping a key-value pair to a new value (could be another key-value pair). + * The {@link KeyValueMapper} interface for mapping a key-value pair to a new value (could be another key-value pair). * * @param <K> original key type * @param <V> original value type @@ -26,5 +26,12 @@ package org.apache.kafka.streams.kstream; */ public interface KeyValueMapper<K, V, R> { + /** + * Map a record with the given key and value to a new value. 
+ * + * @param key the key of the record + * @param value the value of the record + * @return the new value + */ R apply(K key, V value); } http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java index c90554b..2df2d5f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java @@ -18,12 +18,19 @@ package org.apache.kafka.streams.kstream; /** - * The Predicate interface represents a predicate (boolean-valued function) of a key-value pair. + * The {@link Predicate} interface represents a predicate (boolean-valued function) of a key-value pair. * * @param <K> key type * @param <V> value type */ public interface Predicate<K, V> { + /** + * Test if the record with the given key and value satisfies the predicate. 
+ * + * @param key the key of the record + * @param value the value of the record + * @return return {@code true} if the key-value pair satisfies the predicate—{@code false} otherwise + */ boolean test(K key, V value); } http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java index 551a672..e7cfa0e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java @@ -18,11 +18,18 @@ package org.apache.kafka.streams.kstream; /** - * The Reducer interface for combining two values of the same type into a new value. + * The {@link Reducer} interface for combining two values of the same type into a new value. * * @param <V> value type */ public interface Reducer<V> { + /** + * Aggregate the two given values into a single one. + * + * @param value1 the first value for the aggregation + * @param value2 the second value for the aggregation + * @return the aggregated value + */ V apply(V value1, V value2); } http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java index fa3a9d8..e4ce883 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java @@ -24,6 +24,14 @@ import java.util.Map; /** * The time-based window specifications used for aggregations. 
+ * <p> + * The semantics of a time-based window are: Every T1 (advance) time-units, compute the aggregate total for T2 (size) time-units. + * <ul> + * <li> If {@code advance < size} a hopping window is defined: <br /> + * it discretizes a stream into overlapping windows, which implies that a record may be contained in one or more "adjacent" windows.</li> + * <li> If {@code advance == size} a tumbling window is defined:<br /> + * it discretizes a stream into non-overlapping windows, which implies that a record is only ever contained in one and only one tumbling window.</li> + * </ul> */ public class TimeWindows extends Windows<TimeWindow> { @@ -36,7 +44,7 @@ public class TimeWindows extends Windows<TimeWindow> { /** * The size of the window's advance interval, i.e. by how much a window moves forward relative - * to the previous one. The interval's effective time unit is determined by the semantics of + * to the previous one. The interval's effective time unit is determined by the semantics of * the topology's configured {@link org.apache.kafka.streams.processor.TimestampExtractor}. */ public final long advance; @@ -56,13 +64,13 @@ public class TimeWindows extends Windows<TimeWindow> { /** * Returns a window definition with the given window size, and with the advance interval being - * equal to the window size. Think: [N * size, N * size + size), with N denoting the N-th + * equal to the window size. Think: [N * size, N * size + size), with N denoting the N-th * window. * * This provides the semantics of tumbling windows, which are fixed-sized, gap-less, - * non-overlapping windows. Tumbling windows are a specialization of hopping windows. + * non-overlapping windows. Tumbling windows are a specialization of hopping windows. * - * @param name The name of the window. Must not be null or empty. + * @param name The name of the window. Must not be null or empty. * @param size The size of the window, with the requirement that size > 0. 
* The window size's effective time unit is determined by the semantics of the * topology's configured {@link org.apache.kafka.streams.processor.TimestampExtractor}. @@ -80,7 +88,7 @@ public class TimeWindows extends Windows<TimeWindow> { * This provides the semantics of hopping windows, which are fixed-sized, overlapping windows. * * @param interval The advance interval ("hop") of the window, with the requirement that - * 0 < interval ≤ size. The interval's effective time unit is + * 0 < interval ≤ size. The interval's effective time unit is * determined by the semantics of the topology's configured * {@link org.apache.kafka.streams.processor.TimestampExtractor}. * @return a new window definition http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java index 5197e94..239854b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java @@ -20,7 +20,7 @@ package org.apache.kafka.streams.kstream; import org.apache.kafka.streams.processor.ProcessorContext; /** - * A stateful Transformer interface for transform a key-value pair into a new value. + * A stateful {@link Transformer} interface for transforming a key-value pair into a new value. * * @param <K> key type * @param <V> value type @@ -40,10 +40,10 @@ public interface Transformer<K, V, R> { void init(ProcessorContext context); /** - * Transform the message with the given key and value. + * Transform the record with the given key and value. 
* - * @param key the key for the message - * @param value the value for the message + * @param key the key for the record + * @param value the value for the record * @return new value; if null no key-value pair will be forwarded to down stream */ R transform(K key, V value); http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java index fc7ba60..0341702 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java @@ -18,9 +18,14 @@ package org.apache.kafka.streams.kstream; /** - * A transformer supplier which can create one or more {@link Transformer} instances. + * A {@link TransformerSupplier} interface which can create one or more {@link Transformer} instances. */ public interface TransformerSupplier<K, V, R> { + /** + * Return a new {@link Transformer} instance. 
+ * + * @return a new {@link Transformer} instance + */ Transformer<K, V, R> get(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java index bea3b57..f45f8c5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java @@ -29,6 +29,7 @@ public class UnlimitedWindows extends Windows<UnlimitedWindow> { private static final long DEFAULT_START_TIMESTAMP = 0L; + /** The start timestamp of the window. */ public final long start; private UnlimitedWindows(String name, long start) { @@ -41,12 +42,18 @@ public class UnlimitedWindows extends Windows<UnlimitedWindow> { } /** - * Returns an unlimited window definition + * Return an unlimited window starting at timestamp zero. */ public static UnlimitedWindows of(String name) { return new UnlimitedWindows(name, DEFAULT_START_TIMESTAMP); } + /** + * Return a new unlimited window for the specified start timestamp. 
+ * + * @param start the window start time + * @return a new unlimited window that starts at {@code start} + */ public UnlimitedWindows startOn(long start) { return new UnlimitedWindows(this.name, start); } http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java index 5f00a1a..8d4a8e7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java @@ -18,7 +18,7 @@ package org.apache.kafka.streams.kstream; /** - * The ValueJoiner interface for joining two values and return a the joined new value. + * The {@link ValueJoiner} interface for joining two values into a new value. * * @param <V1> first value type * @param <V2> second value type @@ -26,5 +26,12 @@ package org.apache.kafka.streams.kstream; */ public interface ValueJoiner<V1, V2, R> { + /** + * Return a joined value consisting of {@code value1} and {@code value2}. 
+ * + * @param value1 the first value for joining + * @param value2 the second value for joining + * @return the joined value + */ R apply(V1 value1, V2 value2); } http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java index 6e62a55..e168e37 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java @@ -18,12 +18,18 @@ package org.apache.kafka.streams.kstream; /** - * The KeyValueMapper interface for mapping an original value to a new value (could be another key-value pair). + * The {@link ValueMapper} interface for mapping an original value to a new value (could be another key-value pair). * * @param <V1> original value type * @param <V2> mapped value type */ public interface ValueMapper<V1, V2> { + /** + * Map the given value to a new value. 
+ * + * @param value the value to be mapped + * @return the new value + */ V2 apply(V1 value); } http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java index 63214fd..f92d9a1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java @@ -20,7 +20,7 @@ package org.apache.kafka.streams.kstream; import org.apache.kafka.streams.processor.ProcessorContext; /** - * A stateful Value Transformer interface for transform a value into a new value. + * A stateful {@link ValueTransformer} interface to transform a value into a new value. * * @param <V> value type * @param <R> return type @@ -31,7 +31,7 @@ public interface ValueTransformer<V, R> { * Initialize this transformer with the given context. The framework ensures this is called once per processor when the topology * that contains it is initialized. * <p> - * If this tranformer is to be {@link #punctuate(long) called periodically} by the framework, then this method should + * If this transformer is to be {@link #punctuate(long) called periodically} by the framework, then this method should * {@link ProcessorContext#schedule(long) schedule itself} with the provided context. * * @param context the context; may not be null @@ -39,9 +39,9 @@ public interface ValueTransformer<V, R> { void init(ProcessorContext context); /** - * Transform the message with the given key and value. + * Transform the given value to a new value. 
* - * @param value the value for the message + * @param value the value for the record * @return new value */ R transform(V value); http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java index 6bc86bc..ecd454a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java @@ -18,9 +18,14 @@ package org.apache.kafka.streams.kstream; /** - * A value transformer supplier which can create one or more {@link ValueTransformer} instances. + * A {@link ValueTransformerSupplier} interface which can create one or more {@link ValueTransformer} instances. */ public interface ValueTransformerSupplier<V, R> { + /** + * Return a new {@link ValueTransformer} instance. + * + * @return a new {@link ValueTransformer} instance. + */ ValueTransformer<V, R> get(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java index 784d5c3..e1ea9a0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java @@ -25,25 +25,37 @@ public abstract class Window { private long start; private long end; + /** + * Create a new window for the given start time (inclusive) and end time (exclusive). 
+ * + * @param start the start timestamp of the window (inclusive) + * @param end the end timestamp of the window (exclusive) + */ public Window(long start, long end) { this.start = start; this.end = end; } /** - * Returns the start timestamp of this window, inclusive + * Return the start timestamp of this window, inclusive */ public long start() { return start; } /** - * Returns the end timestamp of this window, exclusive + * Return the end timestamp of this window, exclusive */ public long end() { return end; } + /** + * Check if the given window overlaps with this window. + * + * @param other another window + * @return {@code true} if {@code other} overlaps with this window—{@code false} otherwise + */ public boolean overlap(Window other) { return this.start() < other.end() || other.start() < this.end(); } @@ -68,4 +80,4 @@ public abstract class Window { return (int) (n % 0xFFFFFFFFL); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java index 3691282..feaf6a3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java @@ -22,30 +22,40 @@ package org.apache.kafka.streams.kstream; * i.e. 
{@link KStream#aggregateByKey(Initializer, Aggregator, Windows, org.apache.kafka.common.serialization.Serde, * org.apache.kafka.common.serialization.Serde)} * - * @param <T> Type of the key + * @param <K> Type of the key */ -public class Windowed<T> { +public class Windowed<K> { - private T value; + private K key; private Window window; - public Windowed(T value, Window window) { - this.value = value; + public Windowed(K key, Window window) { + this.key = key; this.window = window; } - public T value() { - return value; + /** + * Return the key of the window. + * + * @return the key of the window + */ + public K key() { + return key; } + /** + * Return the window containing the values associated with this key. + * + * @return the window containing the values + */ public Window window() { return window; } @Override public String toString() { - return "[" + value + "@" + window.start() + "]"; + return "[" + key + "@" + window.start() + "]"; } @Override @@ -58,12 +68,12 @@ public class Windowed<T> { Windowed<?> that = (Windowed) obj; - return this.window.equals(that.window) && this.value.equals(that.value); + return this.window.equals(that.window) && this.key.equals(that.key); } @Override public int hashCode() { - long n = ((long) window.hashCode() << 32) | value.hashCode(); + long n = ((long) window.hashCode() << 32) | key.hashCode(); return (int) (n % 0xFFFFFFFFL); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java index 1406de6..06cacb4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java @@ -30,16 +30,12 @@ public abstract class Windows<W extends Window> { private 
static final int DEFAULT_NUM_SEGMENTS = 3; - private static final long DEFAULT_EMIT_DURATION = 1000L; - private static final long DEFAULT_MAINTAIN_DURATION = 24 * 60 * 60 * 1000L; // one day private static final AtomicInteger NAME_INDEX = new AtomicInteger(0); protected String name; - private long emitDurationMs; - private long maintainDurationMs; public int segments; @@ -50,7 +46,6 @@ public abstract class Windows<W extends Window> { } this.name = name; this.segments = DEFAULT_NUM_SEGMENTS; - this.emitDurationMs = DEFAULT_EMIT_DURATION; this.maintainDurationMs = DEFAULT_MAINTAIN_DURATION; } @@ -59,16 +54,9 @@ public abstract class Windows<W extends Window> { } /** - * Set the window emit duration in milliseconds of system time. - */ - public Windows emit(long durationMs) { - this.emitDurationMs = durationMs; - - return this; - } - - /** * Set the window maintain duration in milliseconds of system time. + * + * @return itself */ public Windows until(long durationMs) { this.maintainDurationMs = durationMs; @@ -79,6 +67,8 @@ public abstract class Windows<W extends Window> { /** * Specify the number of segments to be used for rolling the window store, * this function is not exposed to users but can be called by developers that extend this JoinWindows specs. + * + * @return itself */ protected Windows segments(int segments) { this.segments = segments; @@ -86,18 +76,21 @@ public abstract class Windows<W extends Window> { return this; } - public long emitEveryMs() { - return this.emitDurationMs; - } - + /** + * Return the window maintain duration in milliseconds of system time. + * + * @return the window maintain duration in milliseconds of system time + */ public long maintainMs() { return this.maintainDurationMs; } - protected String newName(String prefix) { - return prefix + String.format("%010d", NAME_INDEX.getAndIncrement()); - } - + /** + * Creates all windows that contain the provided timestamp. 
+ * + * @param timestamp the timestamp window should get created for + * @return a map of {@code windowStartTimestamp -> Window} entries + */ public abstract Map<Long, W> windowsFor(long timestamp); } http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java index f36cc8c..b4272f8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java @@ -163,7 +163,7 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStrea @SuppressWarnings("unchecked") @Override public T get(Windowed<K> windowedKey) { - K key = windowedKey.value(); + K key = windowedKey.key(); W window = (W) windowedKey.window(); // this iterator should contain at most one element http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java index 6c05ce3..3ed1499 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java @@ -157,7 +157,7 @@ public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggPr @SuppressWarnings("unchecked") @Override public V get(Windowed<K> 
windowedKey) { - K key = windowedKey.value(); + K key = windowedKey.key(); W window = (W) windowedKey.window(); // this iterator should only contain one element http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java index 0afcad1..2e19816 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java @@ -40,7 +40,7 @@ public class WindowedSerializer<T> implements Serializer<Windowed<T>> { @Override public byte[] serialize(String topic, Windowed<T> data) { - byte[] serializedKey = inner.serialize(topic, data.value()); + byte[] serializedKey = inner.serialize(topic, data.key()); ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + TIMESTAMP_SIZE); buf.put(serializedKey); @@ -55,7 +55,7 @@ public class WindowedSerializer<T> implements Serializer<Windowed<T>> { } public byte[] serializeBaseKey(String topic, Windowed<T> data) { - return inner.serialize(topic, data.value()); + return inner.serialize(topic, data.key()); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java index 10e69cc..1e30864 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java @@ -29,12 +29,12 @@ public class WindowedStreamPartitioner<K, V> implements StreamPartitioner<Window } /** - * WindowedStreamPartitioner determines the partition number for a message with the given windowed key and value + * WindowedStreamPartitioner determines the partition number for a record with the given windowed key and value * and the current number of partitions. The partition number is determined by the original key of the windowed key * using the same logic as DefaultPartitioner so that the topic is partitioned by the original key. * - * @param windowedKey the key of the message - * @param value the value of the message + * @param windowedKey the key of the record + * @param value the value of the record * @param numPartitions the total number of partitions * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used */ http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java index 61b1c98..0d3424e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java @@ -27,7 +27,8 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; * via this timestamp extractor. * * If <i>CreateTime</i> is used to define the built-in timestamps, using this extractor effectively provides - * <i>event-time</i> semantics. + * <i>event-time</i> semantics. 
If <i>LogAppendTime</i> is used to define the built-in timestamps, using + * this extractor effectively provides <i>ingestion-time</i> semantics. * * If you need <i>processing-time</i> semantics, use {@link WallclockTimestampExtractor}. */ http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java b/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java index fbd72f0..92fcf12 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java @@ -37,10 +37,10 @@ public interface Processor<K, V> { void init(ProcessorContext context); /** - * Process the message with the given key and value. + * Process the record with the given key and value. * - * @param key the key for the message - * @param value the value for the message + * @param key the key for the record + * @param value the value for the record */ void process(K key, V value); @@ -53,7 +53,8 @@ public interface Processor<K, V> { void punctuate(long timestamp); /** - * Close this processor and clean up any resources. + * Close this processor and clean up any resources. Be aware that {@link #close()} is called after an internal cleanup. + * Thus, it is not possible to write anything to Kafka as underlying clients are already closed. 
*/ void close(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java index 6561899..7976e16 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java @@ -29,5 +29,10 @@ package org.apache.kafka.streams.processor; */ public interface ProcessorSupplier<K, V> { + /** + * Return a new {@link Processor} instance. + * + * @return a new {@link Processor} instance + */ Processor<K, V> get(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java index 993500d..f2ae020 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java @@ -22,7 +22,17 @@ package org.apache.kafka.streams.processor; */ public interface StateStoreSupplier { + /** + * Return the name of this state store supplier. + * + * @return the name of this state store supplier + */ String name(); + /** + * Return a new {@link StateStore} instance. 
+ * + * @return a new {@link StateStore} instance + */ StateStore get(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java index f14d9d9..fbb0378 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java @@ -17,21 +17,21 @@ package org.apache.kafka.streams.processor; /** - * Determine how messages are distributed among the partitions in a Kafka topic. If not specified, the underlying producer's + * Determine how records are distributed among the partitions in a Kafka topic. If not specified, the underlying producer's * {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used to determine the partition. * <p> * Kafka topics are divided into one or more <i>partitions</i>. Since each partition must fit on the servers that host it, so * using multiple partitions allows the topic to scale beyond a size that will fit on a single machine. Partitions also enable you - * to use multiple instances of your topology to process in parallel all of the messages on the topology's source topics. + * to use multiple instances of your topology to process in parallel all of the records on the topology's source topics. * <p> * When a topology is instantiated, each of its sources are assigned a subset of that topic's partitions. That means that only - * those processors in that topology instance will consume the messages from those partitions. In many cases, Kafka Streams will + * those processors in that topology instance will consume the records from those partitions. 
In many cases, Kafka Streams will * automatically manage these instances, and adjust when new topology instances are added or removed. * <p> - * Some topologies, though, need more control over which messages appear in each partition. For example, some topologies that have - * stateful processors may want all messages within a range of keys to always be delivered to and handled by the same topology instance. - * An upstream topology producing messages to that topic can use a custom <i>stream partitioner</i> to precisely and consistently - * determine to which partition each message should be written. + * Some topologies, though, need more control over which records appear in each partition. For example, some topologies that have + * stateful processors may want all records within a range of keys to always be delivered to and handled by the same topology instance. + * An upstream topology producing records to that topic can use a custom <i>stream partitioner</i> to precisely and consistently + * determine to which partition each record should be written. * <p> * To do this, create a <code>StreamPartitioner</code> implementation, and when you build your topology specify that custom partitioner * when {@link TopologyBuilder#addSink(String, String, org.apache.kafka.common.serialization.Serializer, org.apache.kafka.common.serialization.Serializer, StreamPartitioner, String...) adding a sink} @@ -48,10 +48,10 @@ package org.apache.kafka.streams.processor; public interface StreamPartitioner<K, V> { /** - * Determine the partition number for a message with the given key and value and the current number of partitions. + * Determine the partition number for a record with the given key and value and the current number of partitions. 
* - * @param key the key of the message - * @param value the value of the message + * @param key the key of the record + * @param value the value of the record * @param numPartitions the total number of partitions * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used */ http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java index fa7c73c..7fc00d1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java @@ -25,11 +25,13 @@ import java.io.IOException; import java.nio.ByteBuffer; /** - * The task id representation composed as topic group id plus the assigned partition id. + * The task ID representation composed as topic group ID plus the assigned partition ID. */ public class TaskId implements Comparable<TaskId> { + /** The ID of the topic group. */ public final int topicGroupId; + /** The ID of the partition. 
*/ public final int partition; public TaskId(int topicGroupId, int partition) { @@ -42,7 +44,7 @@ public class TaskId implements Comparable<TaskId> { } /** - * @throws TaskIdFormatException if the string is not a valid TaskId + * @throws TaskIdFormatException if the string is not a valid {@link TaskId} */ public static TaskId parse(String string) { int index = string.indexOf('_'); http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java index 224d580..c872fa1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java @@ -26,10 +26,12 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; public interface TimestampExtractor { /** - * Extracts a timestamp from a message + * Extracts a timestamp from a record. + * <p> + * Typically, the timestamp represents the milliseconds since midnight, January 1, 1970 UTC. 
* - * @param record ConsumerRecord - * @return timestamp + * @param record a data record + * @return the timestamp of the record */ long extract(ConsumerRecord<Object, Object> record); } http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java index 487d5fe..5425149 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java @@ -43,11 +43,11 @@ import java.util.Set; /** * A component that is used to build a {@link ProcessorTopology}. A topology contains an acyclic graph of sources, processors, * and sinks. A {@link SourceNode source} is a node in the graph that consumes one or more Kafka topics and forwards them to - * its child nodes. A {@link Processor processor} is a node in the graph that receives input messages from upstream nodes, - * processes that message, and optionally forwarding new messages to one or all of its children. Finally, a {@link SinkNode sink} - * is a node in the graph that receives messages from upstream nodes and writes them to a Kafka topic. This builder allows you + * its child nodes. A {@link Processor processor} is a node in the graph that receives input records from upstream nodes, + * processes those records, and optionally forwards new records to one or all of its children. Finally, a {@link SinkNode sink} + * is a node in the graph that receives records from upstream nodes and writes them to a Kafka topic. 
This builder allows you * to construct an acyclic graph of these nodes, and the builder is then passed into a new {@link org.apache.kafka.streams.KafkaStreams} - * instance that will then {@link org.apache.kafka.streams.KafkaStreams#start() begin consuming, processing, and producing messages}. + * instance that will then {@link org.apache.kafka.streams.KafkaStreams#start() begin consuming, processing, and producing records}. */ public class TopologyBuilder { @@ -193,7 +193,7 @@ public class TopologyBuilder { public TopologyBuilder() {} /** - * Add a new source that consumes the named topics and forwards the messages to child processor and/or sink nodes. + * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes. * The source will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key deserializer} and * {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the * {@link org.apache.kafka.streams.StreamsConfig stream configuration}. @@ -208,15 +208,15 @@ public class TopologyBuilder { } /** - * Add a new source that consumes the named topics and forwards the messages to child processor and/or sink nodes. + * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes. * The source will use the specified key and value deserializers. * * @param name the unique name of the source used to reference this node when * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}. 
- * @param keyDeserializer the {@link Deserializer key deserializer} used when consuming messages; may be null if the source + * @param keyDeserializer the {@link Deserializer key deserializer} used when consuming records; may be null if the source * should use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key deserializer} specified in the * {@link org.apache.kafka.streams.StreamsConfig stream configuration} - * @param valDeserializer the {@link Deserializer value deserializer} used when consuming messages; may be null if the source + * @param valDeserializer the {@link Deserializer value deserializer} used when consuming records; may be null if the source * should use the {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the * {@link org.apache.kafka.streams.StreamsConfig stream configuration} * @param topics the name of one or more Kafka topics that this source is to consume @@ -242,14 +242,14 @@ public class TopologyBuilder { } /** - * Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic. + * Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic. * The sink will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key serializer} and * {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the * {@link org.apache.kafka.streams.StreamsConfig stream configuration}. 
* * @param name the unique name of the sink - * @param topic the name of the Kafka topic to which this sink should write its messages - * @param parentNames the name of one or more source or processor nodes whose output message this sink should consume + * @param topic the name of the Kafka topic to which this sink should write its records + * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume * and write to its topic * @return this builder instance so methods can be chained together; never null * @see #addSink(String, String, StreamPartitioner, String...) @@ -261,22 +261,22 @@ public class TopologyBuilder { } /** - * Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic, using + * Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic, using * the supplied partitioner. * The sink will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key serializer} and * {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the * {@link org.apache.kafka.streams.StreamsConfig stream configuration}. * <p> - * The sink will also use the specified {@link StreamPartitioner} to determine how messages are distributed among + * The sink will also use the specified {@link StreamPartitioner} to determine how records are distributed among * the named Kafka topic's partitions. Such control is often useful with topologies that use * {@link #addStateStore(StateStoreSupplier, String...) state stores} - * in its processors. In most other cases, however, a partitioner need not be specified and Kafka will automatically distribute - * messages among partitions using Kafka's default partitioning logic. + * in its processors. 
In most other cases, however, a partitioner need not be specified and Kafka will automatically distribute + * records among partitions using Kafka's default partitioning logic. * * @param name the unique name of the sink - * @param topic the name of the Kafka topic to which this sink should write its messages - * @param partitioner the function that should be used to determine the partition for each message processed by the sink - * @param parentNames the name of one or more source or processor nodes whose output message this sink should consume + * @param topic the name of the Kafka topic to which this sink should write its records + * @param partitioner the function that should be used to determine the partition for each record processed by the sink + * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume * and write to its topic * @return this builder instance so methods can be chained together; never null * @see #addSink(String, String, String...) @@ -288,18 +288,18 @@ public class TopologyBuilder { } /** - * Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic. + * Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic. * The sink will use the specified key and value serializers. 
* * @param name the unique name of the sink - * @param topic the name of the Kafka topic to which this sink should write its messages - * @param keySerializer the {@link Serializer key serializer} used when consuming messages; may be null if the sink + * @param topic the name of the Kafka topic to which this sink should write its records + * @param keySerializer the {@link Serializer key serializer} used when consuming records; may be null if the sink * should use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key serializer} specified in the * {@link org.apache.kafka.streams.StreamsConfig stream configuration} - * @param valSerializer the {@link Serializer value serializer} used when consuming messages; may be null if the sink + * @param valSerializer the {@link Serializer value serializer} used when consuming records; may be null if the sink * should use the {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the * {@link org.apache.kafka.streams.StreamsConfig stream configuration} - * @param parentNames the name of one or more source or processor nodes whose output message this sink should consume + * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume * and write to its topic * @return this builder instance so methods can be chained together; never null * @see #addSink(String, String, String...) @@ -311,19 +311,19 @@ public class TopologyBuilder { } /** - * Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic. + * Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic. * The sink will use the specified key and value serializers, and the supplied partitioner. 
* * @param name the unique name of the sink - * @param topic the name of the Kafka topic to which this sink should write its messages - * @param keySerializer the {@link Serializer key serializer} used when consuming messages; may be null if the sink + * @param topic the name of the Kafka topic to which this sink should write its records + * @param keySerializer the {@link Serializer key serializer} used when consuming records; may be null if the sink * should use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key serializer} specified in the * {@link org.apache.kafka.streams.StreamsConfig stream configuration} - * @param valSerializer the {@link Serializer value serializer} used when consuming messages; may be null if the sink + * @param valSerializer the {@link Serializer value serializer} used when consuming records; may be null if the sink * should use the {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the * {@link org.apache.kafka.streams.StreamsConfig stream configuration} - * @param partitioner the function that should be used to determine the partition for each message processed by the sink - * @param parentNames the name of one or more source or processor nodes whose output message this sink should consume + * @param partitioner the function that should be used to determine the partition for each record processed by the sink + * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume * and write to its topic * @return this builder instance so methods can be chained together; never null * @see #addSink(String, String, String...) @@ -354,11 +354,11 @@ public class TopologyBuilder { } /** - * Add a new processor node that receives and processes messages output by one or more parent source or processor node. 
- * Any new messages output by this processor will be forwarded to its child processor or sink nodes. + * Add a new processor node that receives and processes records output by one or more parent source or processor node. + * Any new record output by this processor will be forwarded to its child processor or sink nodes. * @param name the unique name of the processor node * @param supplier the supplier used to obtain this node's {@link Processor} instance - * @param parentNames the name of one or more source or processor nodes whose output messages this processor should receive + * @param parentNames the name of one or more source or processor nodes whose output records this processor should receive * and process * @return this builder instance so methods can be chained together; never null * @throws TopologyBuilderException if parent processor is not added yet, or if this processor's name is equal to the parent's name http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java index 81821ce..305573b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java @@ -25,9 +25,16 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; * Using this extractor effectively provides <i>processing-time</i> semantics. * * If you need <i>event-time</i> semantics, use {@link ConsumerRecordTimestampExtractor} with - * built-in <i>CreateTime</i> timestamp (see KIP-32: Add timestamps to Kafka message for details). 
+ * built-in <i>CreateTime</i> or <i>LogAppendTime</i> timestamp (see KIP-32: Add timestamps to Kafka message for details). */ public class WallclockTimestampExtractor implements TimestampExtractor { + + /** + * Return the current wall clock time as timestamp. + * + * @param record a data record + * @return the current wall clock time, expressed in milliseconds since midnight, January 1, 1970 UTC + */ @Override public long extract(ConsumerRecord<Object, Object> record) { return System.currentTimeMillis(); http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index eff90e8..d4cb78c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -286,7 +286,7 @@ public class StreamThread extends Thread { removeStandbyTasks(); // We need to first close the underlying clients before closing the state - // manager, for example we need to make sure producer's message sends + // manager, for example we need to make sure producer's record sends // have all been acked before the state manager records // changelog sent offsets try { http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java index 933bf72..b19510c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java @@ -24,13 +24,23 @@ import org.apache.kafka.common.serialization.Serializer; /** * Factory for creating serializers / deserializers for state stores in Kafka Streams. * - * @param <K> key type of serdes - * @param <V> value type of serdes + * @param <K> key type of serde + * @param <V> value type of serde */ public final class StateSerdes<K, V> { - public static <K, V> StateSerdes<K, V> withBuiltinTypes(String topic, Class<K> keyClass, Class<V> valueClass) { - return new StateSerdes<>(topic, Serdes.serdeFrom(keyClass), Serdes.serdeFrom(valueClass)); + /** + * Create a new instance of {@link StateSerdes} for the given state name and key-/value-type classes. + * + * @param stateName the name of the state + * @param keyClass the class of the key type + * @param valueClass the class of the value type + * @param <K> the key type + * @param <V> the value type + * @return a new instance of {@link StateSerdes} + */ + public static <K, V> StateSerdes<K, V> withBuiltinTypes(String stateName, Class<K> keyClass, Class<V> valueClass) { + return new StateSerdes<>(stateName, Serdes.serdeFrom(keyClass), Serdes.serdeFrom(valueClass)); } private final String stateName; @@ -63,46 +73,105 @@ public final class StateSerdes<K, V> { this.valueSerde = valueSerde; } + /** + * Return the key serde. + * + * @return the key serde + */ public Serde<K> keySerde() { return keySerde; } + /** + * Return the value serde. + * + * @return the value serde + */ public Serde<V> valueSerde() { return valueSerde; } + /** + * Return the key deserializer. + * + * @return the key deserializer + */ public Deserializer<K> keyDeserializer() { return keySerde.deserializer(); } + /** + * Return the key serializer. + * + * @return the key serializer + */ public Serializer<K> keySerializer() { return keySerde.serializer(); } + /** + * Return the value deserializer. 
+ * + * @return the value deserializer + */ public Deserializer<V> valueDeserializer() { return valueSerde.deserializer(); } + /** + * Return the value serializer. + * + * @return the value serializer + */ public Serializer<V> valueSerializer() { return valueSerde.serializer(); } - public String topic() { + /** + * Return the name of the state. + * + * @return the name of the state + */ + public String stateName() { return stateName; } + /** + * Deserialize the key from raw bytes. + * + * @param rawKey the key as raw bytes + * @return the key as typed object + */ public K keyFrom(byte[] rawKey) { return keySerde.deserializer().deserialize(stateName, rawKey); } + /** + * Deserialize the value from raw bytes. + * + * @param rawValue the value as raw bytes + * @return the value as typed object + */ public V valueFrom(byte[] rawValue) { return valueSerde.deserializer().deserialize(stateName, rawValue); } + /** + * Serialize the given key. + * + * @param key the key to be serialized + * @return the serialized key + */ public byte[] rawKey(K key) { return keySerde.serializer().serialize(stateName, key); } + /** + * Serialize the given value. 
+ * + * @param value the value to be serialized + * @return the serialized value + */ public byte[] rawValue(V value) { return valueSerde.serializer().serialize(stateName, value); } http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java index c7a882f..e400cef 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java @@ -23,7 +23,7 @@ import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.streams.processor.StateStore; /** - * A windowed store interface extending {@link StateStore} + * A windowed store interface extending {@link StateStore}. * * @param <K> Type of keys * @param <V> Type of values http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java index 733c1ea..fbe7754 100644 --- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java +++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java @@ -213,7 +213,7 @@ public class SmokeTestClient extends SmokeTestUtil { new KeyValueMapper<Windowed<String>, Long, KeyValue<String, Long>>() { @Override public KeyValue<String, Long> apply(Windowed<String> key, Long value) { - return new KeyValue<>(key.value() + "@" + key.window().start(), value); + return new KeyValue<>(key.key() + "@" + key.window().start(), value); } } 
).to(stringSerde, longSerde, "wcnt"); http://git-wip-us.apache.org/repos/asf/kafka/blob/4ab4e4af/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java index c5ded5e..b0d7a0b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java +++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java @@ -77,7 +77,7 @@ public class SmokeTestUtil { public static final class Unwindow<K, V> implements KeyValueMapper<Windowed<K>, V, KeyValue<K, V>> { public KeyValue<K, V> apply(Windowed<K> winKey, V value) { - return new KeyValue<K, V>(winKey.value(), value); + return new KeyValue<K, V>(winKey.key(), value); } }
