Repository: kafka Updated Branches: refs/heads/trunk 2a58ba9fd -> f676cfeb8
MINOR: Improve JavaDoc for some public classes. Author: Guozhang Wang <[email protected]> Reviewers: Yasuhiro Mastuda <[email protected]> Closes #999 from guozhangwang/KJavaDoc Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f676cfeb Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f676cfeb Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f676cfeb Branch: refs/heads/trunk Commit: f676cfeb83195f032e3ad8908288a7e04011c830 Parents: 2a58ba9 Author: Guozhang Wang <[email protected]> Authored: Wed Mar 2 16:11:19 2016 -0800 Committer: Guozhang Wang <[email protected]> Committed: Wed Mar 2 16:11:19 2016 -0800 ---------------------------------------------------------------------- .../java/org/apache/kafka/streams/KafkaStreams.java | 16 ++++++++-------- .../org/apache/kafka/streams/StreamsConfig.java | 4 ++++ .../org/apache/kafka/streams/StreamsMetrics.java | 2 +- .../org/apache/kafka/streams/kstream/KStream.java | 8 +++----- .../org/apache/kafka/streams/kstream/KTable.java | 8 +++----- .../apache/kafka/streams/kstream/Transformer.java | 7 +++++++ .../kafka/streams/kstream/TransformerSupplier.java | 3 +++ .../kafka/streams/kstream/UnlimitedWindows.java | 3 +++ .../kafka/streams/kstream/ValueTransformer.java | 6 ++++++ .../streams/kstream/ValueTransformerSupplier.java | 3 +++ .../org/apache/kafka/streams/kstream/Windowed.java | 9 +++++++++ .../streams/processor/DefaultPartitionGrouper.java | 2 +- .../kafka/streams/processor/PartitionGrouper.java | 7 +++++++ .../kafka/streams/processor/ProcessorSupplier.java | 10 ++++++++++ .../kafka/streams/processor/TimestampExtractor.java | 3 ++- .../kafka/streams/state/KeyValueIterator.java | 2 +- .../apache/kafka/streams/state/KeyValueStore.java | 14 +++++++------- .../java/org/apache/kafka/streams/state/Serdes.java | 6 ++++++ .../java/org/apache/kafka/streams/state/Stores.java | 2 +- .../kafka/streams/state/WindowStoreIterator.java | 5 +++++ 
.../kafka/streams/state/WindowStoreUtils.java | 2 +- 21 files changed, 91 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/f676cfeb/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 724daac..15d6d8b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -55,26 +55,26 @@ import java.util.concurrent.atomic.AtomicInteger; * Internally the {@link KafkaStreams} instance contains a normal {@link org.apache.kafka.clients.producer.KafkaProducer KafkaProducer} * and {@link org.apache.kafka.clients.consumer.KafkaConsumer KafkaConsumer} instance that is used for reading input and writing output. 
* <p> + * * A simple example might look like this: * <pre> * Map<String, Object> props = new HashMap<>(); - * props.put("bootstrap.servers", "localhost:4242"); - * props.put("key.deserializer", StringDeserializer.class); - * props.put("value.deserializer", StringDeserializer.class); - * props.put("key.serializer", StringSerializer.class); - * props.put("value.serializer", IntegerSerializer.class); - * props.put("timestamp.extractor", MyTimestampExtractor.class); + * props.put(StreamsConfig.JOB_ID_CONFIG, "my-job"); + * props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + * props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + * props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + * props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + * props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); * StreamsConfig config = new StreamsConfig(props); * * KStreamBuilder builder = new KStreamBuilder(); - * builder.from("topic1").mapValue(value -> value.length()).to("topic2"); + * builder.from("my-input-topic").mapValue(value -> Integer.toString(value.length())).to("my-output-topic"); * * KafkaStreams streams = new KafkaStreams(builder, config); * streams.start(); * </pre> * */ -// TODO: about example may need to be updated after KAFKA-3153 @InterfaceStability.Unstable public class KafkaStreams { http://git-wip-us.apache.org/repos/asf/kafka/blob/f676cfeb/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 65ec969..c4b8ffe 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -35,6 +35,10 @@ import java.util.Map; import 
static org.apache.kafka.common.config.ConfigDef.Range.atLeast; import static org.apache.kafka.common.config.ConfigDef.ValidString.in; +/** + * Configuration for Kafka Streams. Documentation for these configurations can be found in the <a + * href="http://kafka.apache.org/documentation.html#streamsconfigs">Kafka documentation</a> + */ public class StreamsConfig extends AbstractConfig { private static final ConfigDef CONFIG; http://git-wip-us.apache.org/repos/asf/kafka/blob/f676cfeb/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java b/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java index d392eef..70c3320 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java @@ -20,7 +20,7 @@ package org.apache.kafka.streams; import org.apache.kafka.common.metrics.Sensor; /** - * The stream metrics interface for adding metric sensors and collecting metric values. + * The Kafka Streams metrics interface for adding metric sensors and collecting metric values. 
*/ public interface StreamsMetrics { http://git-wip-us.apache.org/repos/asf/kafka/blob/f676cfeb/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index 231eb22..6426af9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -22,13 +22,11 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorSupplier; - -// TODO: Javadoc needs to be updated /** - * KStream is an abstraction of a stream of key-value pairs. + * KStream is an abstraction of an event stream in key-value pairs. * - * @param <K> the type of keys - * @param <V> the type of values + * @param <K> Type of keys + * @param <V> Type of values */ public interface KStream<K, V> { http://git-wip-us.apache.org/repos/asf/kafka/blob/f676cfeb/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java index a2c6397..485bb20 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java @@ -21,13 +21,11 @@ import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.KeyValue; -// TODO: Javadoc needs to be updated. /** - * KTable is an abstraction of a change log stream. + * KTable is an abstraction of a change log stream from a primary-keyed table. 
* - * - * @param <K> the type of keys - * @param <V> the type of values + * @param <K> Type of primary keys + * @param <V> Type of value changes */ public interface KTable<K, V> { http://git-wip-us.apache.org/repos/asf/kafka/blob/f676cfeb/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java index b67f619..47198e4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java @@ -19,6 +19,13 @@ package org.apache.kafka.streams.kstream; import org.apache.kafka.streams.processor.ProcessorContext; +/** + * A stateful Transformer interface for transforming a key-value pair into a new value. + * + * @param <K> Key type. + * @param <V> Value type. + * @param <R> Return type. + */ public interface Transformer<K, V, R> { /** http://git-wip-us.apache.org/repos/asf/kafka/blob/f676cfeb/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java index 93d930d..fc7ba60 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java @@ -17,6 +17,9 @@ package org.apache.kafka.streams.kstream; +/** + * A transformer supplier which can create one or more {@link Transformer} instances. 
+ */ public interface TransformerSupplier<K, V, R> { Transformer<K, V, R> get(); http://git-wip-us.apache.org/repos/asf/kafka/blob/f676cfeb/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java index 06882b3..7cadfb4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java @@ -22,6 +22,9 @@ import org.apache.kafka.streams.kstream.internals.UnlimitedWindow; import java.util.HashMap; import java.util.Map; +/** + * The unlimited window specifications. + */ public class UnlimitedWindows extends Windows<UnlimitedWindow> { private static final long DEFAULT_START_TIMESTAMP = 0L; http://git-wip-us.apache.org/repos/asf/kafka/blob/f676cfeb/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java index 5b9e2ff..b4d2b38 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java @@ -19,6 +19,12 @@ package org.apache.kafka.streams.kstream; import org.apache.kafka.streams.processor.ProcessorContext; +/** + * A stateful Value Transformer interface for transforming a value into a new value. + * + * @param <V> Value type. + * @param <R> Return type. 
+ */ public interface ValueTransformer<V, R> { /** http://git-wip-us.apache.org/repos/asf/kafka/blob/f676cfeb/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java index 04fa9eb..6bc86bc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java @@ -17,6 +17,9 @@ package org.apache.kafka.streams.kstream; +/** + * A value transformer supplier which can create one or more {@link ValueTransformer} instances. + */ public interface ValueTransformerSupplier<V, R> { ValueTransformer<V, R> get(); http://git-wip-us.apache.org/repos/asf/kafka/blob/f676cfeb/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java index 845f9e9..22d52aa 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java @@ -17,6 +17,15 @@ package org.apache.kafka.streams.kstream; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; + +/** + * The windowed key interface used in {@link KTable}, used for representing a windowed table result from windowed stream aggregations, + * i.e. 
{@link KStream#aggregateByKey(Initializer, Aggregator, Windows, Serializer, Serializer, Deserializer, Deserializer)} + * + * @param <T> Type of the key + */ public class Windowed<T> { private T value; http://git-wip-us.apache.org/repos/asf/kafka/blob/f676cfeb/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java index dad5c6f..06681ac 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java @@ -30,7 +30,7 @@ import java.util.Map; import java.util.Set; /** - * DefaultPartitionGrouper groups partitions by the partition id. This behavior is assumed by the join processing in KStream. + * Default implementation of the {@link PartitionGrouper} interface that groups partitions by the partition id. * * Join operations requires that topics of the joining entities are copartitoned, i.e., being partitioned by the same key and having the same * number of partitions. 
Copartitioning is ensured by having the same number of partitions on http://git-wip-us.apache.org/repos/asf/kafka/blob/f676cfeb/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java index f8311e7..ae9844d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java @@ -23,6 +23,13 @@ import org.apache.kafka.common.TopicPartition; import java.util.Map; import java.util.Set; +/** + * A partition grouper that generates partition groups given the list of topic-partitions. + * + * This grouper also acts as the stream task creation function along with partition distribution + * such that each generated partition group is assigned with a distinct {@link TaskId}; + * the created task ids will then be assigned to Kafka Streams instances that host the stream job. + */ public interface PartitionGrouper { /** http://git-wip-us.apache.org/repos/asf/kafka/blob/f676cfeb/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java index 719d3ac..6561899 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java @@ -17,6 +17,16 @@ package org.apache.kafka.streams.processor; +/** + * A processor supplier that can create one or more {@link Processor} instances. 
+ * + * It is used in {@link TopologyBuilder} for adding new processor operators, whose generated + * topology can then be replicated (and thus creating one or more {@link Processor} instances) + * and distributed to multiple stream threads. + * + * @param <K> the type of keys + * @param <V> the type of values + */ public interface ProcessorSupplier<K, V> { Processor<K, V> get(); http://git-wip-us.apache.org/repos/asf/kafka/blob/f676cfeb/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java index ce0ba70..224d580 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java @@ -20,7 +20,8 @@ package org.apache.kafka.streams.processor; import org.apache.kafka.clients.consumer.ConsumerRecord; /** - * An interface that allows the Kafka Streams framework to extract a timestamp from an instance of {@link ConsumerRecord} + * An interface that allows the Kafka Streams framework to extract a timestamp from an instance of {@link ConsumerRecord}. + * The extracted timestamp is defined as milliseconds. 
*/ public interface TimestampExtractor { http://git-wip-us.apache.org/repos/asf/kafka/blob/f676cfeb/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java index 55ec8cf..cdb3de5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java @@ -33,5 +33,5 @@ import java.util.Iterator; public interface KeyValueIterator<K, V> extends Iterator<KeyValue<K, V>>, Closeable { @Override - public void close(); + void close(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/f676cfeb/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java index f296230..3e7f6fb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java @@ -39,7 +39,7 @@ public interface KeyValueStore<K, V> extends StateStore { * @return The value or null if no value is found. * @throws NullPointerException If null is used for key. */ - abstract public V get(K key); + V get(K key); /** * Update the value associated with this key @@ -48,7 +48,7 @@ public interface KeyValueStore<K, V> extends StateStore { * @param value The value * @throws NullPointerException If null is used for key or value. 
*/ - abstract public void put(K key, V value); + void put(K key, V value); /** * Update the value associated with this key, unless a value @@ -59,7 +59,7 @@ public interface KeyValueStore<K, V> extends StateStore { * @return The old value or null if there is no such key. * @throws NullPointerException If null is used for key or value. */ - abstract public V putIfAbsent(K key, V value); + V putIfAbsent(K key, V value); /** * Update all the given key/value pairs @@ -67,7 +67,7 @@ public interface KeyValueStore<K, V> extends StateStore { * @param entries A list of entries to put into the store. * @throws NullPointerException If null is used for any key or value. */ - abstract public void putAll(List<KeyValue<K, V>> entries); + void putAll(List<KeyValue<K, V>> entries); /** * Delete the value from the store (if there is one) @@ -76,7 +76,7 @@ public interface KeyValueStore<K, V> extends StateStore { * @return The old value or null if there is no such key. * @throws NullPointerException If null is used for key. */ - abstract public V delete(K key); + V delete(K key); /** * Get an iterator over a given range of keys. This iterator MUST be closed after use. @@ -86,13 +86,13 @@ public interface KeyValueStore<K, V> extends StateStore { * @return The iterator for this range. * @throws NullPointerException If null is used for from or to. */ - abstract public KeyValueIterator<K, V> range(K from, K to); + KeyValueIterator<K, V> range(K from, K to); /** * Return an iterator over all keys in the database. This iterator MUST be closed after use. * * @return An iterator of all key/value pairs in the store. 
*/ - abstract public KeyValueIterator<K, V> all(); + KeyValueIterator<K, V> all(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/f676cfeb/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java b/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java index e1e78af..e925312 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java @@ -27,6 +27,12 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; +/** + * Factory for creating serializers / deserializers for state stores in Kafka Streams. + * + * @param <K> key type of serdes + * @param <V> value type of serdes + */ public final class Serdes<K, V> { public static <K, V> Serdes<K, V> withBuiltinTypes(String topic, Class<K> keyClass, Class<V> valueClass) { http://git-wip-us.apache.org/repos/asf/kafka/blob/f676cfeb/streams/src/main/java/org/apache/kafka/streams/state/Stores.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java index e9d82bc..e803832 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java @@ -33,7 +33,7 @@ import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier; import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier; /** - * Factory for creating key-value stores. + * Factory for creating state stores in Kafka Streams. 
*/ public class Stores { http://git-wip-us.apache.org/repos/asf/kafka/blob/f676cfeb/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java index 08cd049..7c474dd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java @@ -23,6 +23,11 @@ import org.apache.kafka.streams.KeyValue; import java.util.Iterator; +/** + * Iterator interface of {@link KeyValue} with key typed {@link Long} used for {@link WindowStore#fetch(Object, long, long)}. + * + * @param <E> Type of values + */ public interface WindowStoreIterator<E> extends Iterator<KeyValue<Long, E>> { void close(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/f676cfeb/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java index 3a3d585..c6bbb23 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java @@ -21,7 +21,7 @@ package org.apache.kafka.streams.state; import java.nio.ByteBuffer; -public class WindowStoreUtils<K, V> { +public class WindowStoreUtils { public static final int TIMESTAMP_SIZE = 8; public static final int SEQNUM_SIZE = 4;
