Repository: kafka Updated Branches: refs/heads/trunk a62eb5993 -> f75e33502
MINOR: complete built-in stream aggregate functions Author: Guozhang Wang <[email protected]> Reviewers: Yasuhiro Matsuda Closes #787 from guozhangwang/KBuiltInAgg Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f75e3350 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f75e3350 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f75e3350 Branch: refs/heads/trunk Commit: f75e3350259c6df11719ee0a55f5780c136f3d26 Parents: a62eb59 Author: Guozhang Wang <[email protected]> Authored: Mon Jan 18 13:43:52 2016 -0800 Committer: Guozhang Wang <[email protected]> Committed: Mon Jan 18 13:43:52 2016 -0800 ---------------------------------------------------------------------- .../apache/kafka/streams/kstream/KStream.java | 37 ------------ .../streams/kstream/internals/KStreamImpl.java | 63 +++++++++----------- 2 files changed, 27 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/f75e3350/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index 85d51e9..36741a8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -21,7 +21,6 @@ import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.processor.ProcessorSupplier; -import java.util.Collection; /** * KStream is an abstraction of a stream of key-value pairs. @@ -292,28 +291,6 @@ public interface KStream<K, V> { Deserializer<K> keyDeserializer); /** - * Sum extracted integer values of this stream by key on a window basis. - * - * @param valueSelector the class of KeyValueToIntMapper to extract the long integer from value - * @param windows the specification of the aggregation window - */ - <W extends Window> KTable<Windowed<K>, Integer> sumByKey(KeyValueToIntMapper<K, V> valueSelector, - Windows<W> windows, - Serializer<K> keySerializer, - Deserializer<K> keyDeserializer); - - /** - * Sum extracted double decimal values of this stream by key on a window basis. - * - * @param valueSelector the class of KeyValueToDoubleMapper to extract the long integer from value - * @param windows the specification of the aggregation window - */ - <W extends Window> KTable<Windowed<K>, Double> sumByKey(KeyValueToDoubleMapper<K, V> valueSelector, - Windows<W> windows, - Serializer<K> keySerializer, - Deserializer<K> keyDeserializer); - - /** * Count number of records of this stream by key on a window basis. * * @param windows the specification of the aggregation window @@ -322,18 +299,4 @@ public interface KStream<K, V> { Serializer<K> keySerializer, Deserializer<K> keyDeserializer); - /** - * Get the top-k values of this stream by key on a window basis. - * - * @param k parameter of the top-k computation - * @param valueSelector the class of KeyValueMapper to extract the comparable value - * @param windows the specification of the aggregation window - */ - <W extends Window, V1 extends Comparable<V1>> KTable<Windowed<K>, Collection<V1>> topKByKey(int k, - KeyValueMapper<K, V, V1> valueSelector, - Windows<W> windows, - Serializer<K> keySerializer, - Serializer<V1> aggValueSerializer, - Deserializer<K> keyDeserializer, - Deserializer<V1> aggValueDeserializer); } http://git-wip-us.apache.org/repos/asf/kafka/blob/f75e3350/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 7b634dc..691910b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -18,14 +18,14 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.kstream.AggregatorSupplier; import org.apache.kafka.streams.kstream.JoinWindows; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.KeyValue; -import org.apache.kafka.streams.kstream.KeyValueToDoubleMapper; -import org.apache.kafka.streams.kstream.KeyValueToIntMapper; import org.apache.kafka.streams.kstream.KeyValueToLongMapper; import org.apache.kafka.streams.kstream.TransformerSupplier; import org.apache.kafka.streams.kstream.ValueJoiner; @@ -43,7 +43,6 @@ import org.apache.kafka.streams.state.RocksDBWindowStoreSupplier; import org.apache.kafka.streams.state.Serdes; import java.lang.reflect.Array; -import java.util.Collection; import java.util.HashSet; import java.util.Set; @@ -71,6 +70,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V private static final String WINDOWED_NAME = "KSTREAM-WINDOWED-"; + private static final String SELECT_NAME = "KSTREAM-SELECT-"; + private static final String AGGREGATE_NAME = "KSTREAM-AGGREGATE-"; public static final String SINK_NAME = "KSTREAM-SINK-"; @@ -403,7 +404,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V // TODO: this agg window operator is only used for casting K to Windowed<K> for // KTableProcessorSupplier, which is a bit awkward and better be removed in the future String aggregateName = topology.newName(AGGREGATE_NAME); - String aggWindowName = topology.newName(WINDOWED_NAME); + String selectName = topology.newName(SELECT_NAME); ProcessorSupplier<K, V> aggWindowSupplier = new KStreamAggWindow<>(); ProcessorSupplier<Windowed<K>, Change<V>> aggregateSupplier = new KStreamAggregate<>(windows, windows.name(), aggregatorSupplier.get()); @@ -418,8 +419,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V null); // aggregate the values with the aggregator and local store - topology.addProcessor(aggWindowName, aggWindowSupplier, this.name); - topology.addProcessor(aggregateName, aggregateSupplier, aggWindowName); + topology.addProcessor(selectName, aggWindowSupplier, this.name); + topology.addProcessor(aggregateName, aggregateSupplier, selectName); topology.addStateStore(aggregateStore, aggregateName); // return the KTable representation with the intermediate topic as the sources @@ -427,47 +428,37 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V } @Override - public <W extends Window> KTable<Windowed<K>, Long> sumByKey(KeyValueToLongMapper<K, V> valueSelector, + public <W extends Window> KTable<Windowed<K>, Long> sumByKey(final KeyValueToLongMapper<K, V> valueSelector, Windows<W> windows, Serializer<K> keySerializer, Deserializer<K> keyDeserializer) { - // TODO - return null; - } - public <W extends Window> KTable<Windowed<K>, Integer> sumByKey(KeyValueToIntMapper<K, V> valueSelector, - Windows<W> windows, - Serializer<K> keySerializer, - Deserializer<K> keyDeserializer) { - // TODO - return null; + KStream<K, Long> selected = this.map(new KeyValueMapper<K, V, KeyValue<K, Long>>() { + @Override + public KeyValue<K, Long> apply(K key, V value) { + return new KeyValue<>(key, valueSelector.apply(key, value)); + } + }); + + return selected.<Long, W>aggregateByKey(new LongSumSupplier<K>(), + windows, + keySerializer, + new LongSerializer(), + keyDeserializer, + new LongDeserializer()); } - public <W extends Window> KTable<Windowed<K>, Double> sumByKey(KeyValueToDoubleMapper<K, V> valueSelector, - Windows<W> windows, - Serializer<K> keySerializer, - Deserializer<K> keyDeserializer) { - // TODO - return null; - } @Override public <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows, Serializer<K> keySerializer, Deserializer<K> keyDeserializer) { - // TODO - return null; - } - @Override - public <W extends Window, V1 extends Comparable<V1>> KTable<Windowed<K>, Collection<V1>> topKByKey(int k, - KeyValueMapper<K, V, V1> valueSelector, - Windows<W> windows, - Serializer<K> keySerializer, - Serializer<V1> aggValueSerializer, - Deserializer<K> keyDeserializer, - Deserializer<V1> aggValueDeserializer) { - // TODO - return null; + return this.<Long, W>aggregateByKey(new CountSupplier<K, V>(), + windows, + keySerializer, + new LongSerializer(), + keyDeserializer, + new LongDeserializer()); } }
