KAFKA-3121: Remove aggregatorSupplier and add Reduce functions Author: Guozhang Wang <[email protected]>
Reviewers: Yasuhiro Matsuda Closes #795 from guozhangwang/K3121s1 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/959cf09e Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/959cf09e Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/959cf09e Branch: refs/heads/trunk Commit: 959cf09e8653f4b8255f49c6f4c258ed1a5ec38e Parents: e4ef8e6 Author: Guozhang Wang <[email protected]> Authored: Wed Jan 20 16:10:43 2016 -0800 Committer: Guozhang Wang <[email protected]> Committed: Wed Jan 20 16:10:43 2016 -0800 ---------------------------------------------------------------------- .../kafka/streams/examples/KTableJob.java | 111 --- .../kafka/streams/kstream/Aggregator.java | 7 +- .../streams/kstream/AggregatorSupplier.java | 23 - .../org/apache/kafka/streams/kstream/Count.java | 36 + .../apache/kafka/streams/kstream/KStream.java | 38 +- .../apache/kafka/streams/kstream/KTable.java | 59 +- .../apache/kafka/streams/kstream/Reducer.java | 23 + .../apache/kafka/streams/kstream/SumAsLong.java | 36 + .../kstream/internals/CountSupplier.java | 52 -- .../kstream/internals/KStreamAggregate.java | 4 +- .../streams/kstream/internals/KStreamImpl.java | 122 ++-- .../kstream/internals/KStreamReduce.java | 167 +++++ .../kstream/internals/KTableAggregate.java | 4 +- .../streams/kstream/internals/KTableImpl.java | 128 ++-- .../streams/kstream/internals/KTableReduce.java | 120 ++++ .../kstream/internals/KTableStoreSupplier.java | 4 +- .../kstream/internals/LongSumSupplier.java | 52 -- .../streams/kstream/internals/TopKSupplier.java | 106 --- .../internals/ProcessorStateManager.java | 2 +- .../state/InMemoryKeyValueStoreSupplier.java | 155 ----- .../state/InMemoryLRUCacheStoreSupplier.java | 195 ------ .../streams/state/MeteredKeyValueStore.java | 250 ------- .../kafka/streams/state/MeteredWindowStore.java | 206 ------ .../kafka/streams/state/OffsetCheckpoint.java | 162 ----- .../state/RocksDBKeyValueStoreSupplier.java | 52 -- .../kafka/streams/state/RocksDBStore.java | 265 -------- .../kafka/streams/state/RocksDBWindowStore.java | 289 -------- .../state/RocksDBWindowStoreSupplier.java | 58 -- .../kafka/streams/state/StoreChangeLogger.java | 91 --- .../org/apache/kafka/streams/state/Stores.java | 3 + .../InMemoryKeyValueStoreSupplier.java | 159 +++++ .../InMemoryLRUCacheStoreSupplier.java | 199 ++++++ .../state/internals/MeteredKeyValueStore.java | 254 +++++++ .../state/internals/MeteredWindowStore.java | 209 ++++++ .../state/internals/OffsetCheckpoint.java | 162 +++++ .../internals/RocksDBKeyValueStoreSupplier.java | 53 ++ .../streams/state/internals/RocksDBStore.java | 269 ++++++++ .../state/internals/RocksDBWindowStore.java | 295 ++++++++ .../internals/RocksDBWindowStoreSupplier.java | 59 ++ .../state/internals/StoreChangeLogger.java | 92 +++ .../kstream/internals/KStreamAggregateTest.java | 36 +- .../kstream/internals/KTableAggregateTest.java | 36 +- .../internals/ProcessorStateManagerTest.java | 2 +- .../processor/internals/StandbyTaskTest.java | 2 +- .../state/AbstractKeyValueStoreTest.java | 191 ------ .../state/InMemoryKeyValueStoreTest.java | 48 -- .../state/InMemoryLRUCacheStoreTest.java | 156 ----- .../streams/state/KeyValueStoreTestDriver.java | 4 +- .../streams/state/RocksDBKeyValueStoreTest.java | 50 -- .../streams/state/RocksDBWindowStoreTest.java | 671 ------------------ .../internals/AbstractKeyValueStoreTest.java | 195 ++++++ .../internals/InMemoryKeyValueStoreTest.java | 50 ++ .../internals/InMemoryLRUCacheStoreTest.java | 159 +++++ .../internals/RocksDBKeyValueStoreTest.java | 52 ++ .../state/internals/RocksDBWindowStoreTest.java | 676 +++++++++++++++++++ 55 files changed, 3468 insertions(+), 3431 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/examples/KTableJob.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/KTableJob.java b/streams/src/main/java/org/apache/kafka/streams/examples/KTableJob.java deleted file mode 100644 index 45ff58e..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/examples/KTableJob.java +++ /dev/null @@ -1,111 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.examples; - -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.IntegerSerializer; -import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.streams.kstream.HoppingWindows; -import org.apache.kafka.streams.kstream.KStreamBuilder; -import org.apache.kafka.streams.StreamingConfig; -import org.apache.kafka.streams.KafkaStreaming; -import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.KTable; -import org.apache.kafka.streams.kstream.KeyValueMapper; -import org.apache.kafka.streams.kstream.KeyValueToLongMapper; -import org.apache.kafka.streams.kstream.ValueJoiner; -import org.apache.kafka.streams.kstream.Windowed; - -import java.util.Properties; - -public class KTableJob { - - public static void main(String[] args) throws Exception { - Properties props = new Properties(); - props.put(StreamingConfig.JOB_ID_CONFIG, "example-ktable"); - props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); - props.put(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - props.put(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - props.put(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class); - StreamingConfig config = new StreamingConfig(props); - - Serializer<String> stringSerializer = new StringSerializer(); - Deserializer<String> stringDeserializer = new StringDeserializer(); - - KStreamBuilder builder = new KStreamBuilder(); - - // stream aggregate - KStream<String, Long> stream1 = builder.stream("topic1"); - - @SuppressWarnings("unchecked") - KTable<Windowed<String>, Long> wtable1 = stream1.sumByKey(new KeyValueToLongMapper<String, Long>() { - @Override - public long apply(String key, Long value) { - return value; - } - }, HoppingWindows.of("window1").with(500L).every(500L).emit(1000L).until(1000L * 60 * 60 * 24 /* one day */), stringSerializer, stringDeserializer); - - // table aggregation - KTable<String, String> table1 = builder.table("topic2"); - - KTable<String, Long> table2 = table1.sum(new KeyValueMapper<String, String, String>() { - @Override - public String apply(String key, String value) { - return value; - } - }, new KeyValueToLongMapper<String, String>() { - @Override - public long apply(String key, String value) { - return Long.parseLong(value); - } - }, stringSerializer, stringDeserializer, "table2"); - - // stream-table join - KStream<String, Long> stream2 = stream1.leftJoin(table2, new ValueJoiner<Long, Long, Long>() { - @Override - public Long apply(Long value1, Long value2) { - if (value2 == null) - return 0L; - else - return value1 * value2; - } - }); - - // table-table join - KTable<String, String> table3 = table1.outerJoin(table2, new ValueJoiner<String, Long, String>() { - @Override - public String apply(String value1, Long value2) { - if (value2 == null) - return value1 + "-null"; - else if (value1 == null) - return "null-" + value2; - else - return value1 + "-" + value2; - } - }); - - wtable1.to("topic3"); - - KafkaStreaming kstream = new KafkaStreaming(builder, config); - kstream.start(); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java index d715fbd..c601024 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java @@ -21,7 +21,7 @@ public interface Aggregator<K, V, T> { /** * Set the initial aggregate value */ - T initialValue(); + T initialValue(K aggKey); /** * When a new record with the aggregate key is added, @@ -34,9 +34,4 @@ public interface Aggregator<K, V, T> { * updating the aggregate value for this key */ T remove(K aggKey, V value, T aggregate); - - /** - * Merge two aggregate values - */ - T merge(T aggr1, T aggr2); } http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/AggregatorSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/AggregatorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/AggregatorSupplier.java deleted file mode 100644 index 6ed9125..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/AggregatorSupplier.java +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.kstream; - -public interface AggregatorSupplier<K, V, T> { - - Aggregator<K, V, T> get(); -} http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/Count.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Count.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Count.java new file mode 100644 index 0000000..3c1ed46 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Count.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream; + +public class Count<K> implements Aggregator<K, Long, Long> { + + @Override + public Long initialValue(K aggKey) { + return 0L; + } + + @Override + public Long add(K aggKey, Long value, Long aggregate) { + return aggregate + 1L; + } + + @Override + public Long remove(K aggKey, Long value, Long aggregate) { + return aggregate - 1L; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index 36741a8..dfed661 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.processor.ProcessorSupplier; + /** * KStream is an abstraction of a stream of key-value pairs. * @@ -268,35 +269,28 @@ public interface KStream<K, V> { /** * Aggregate values of this stream by key on a window basis. * - * @param aggregatorSupplier the class of aggregatorSupplier + * @param reducer the class of Reducer + * @param windows the specification of the aggregation window + */ + <W extends Window> KTable<Windowed<K>, V> reduceByKey(Reducer<V> reducer, + Windows<W> windows, + Serializer<K> keySerializer, + Serializer<V> aggValueSerializer, + Deserializer<K> keyDeserializer, + Deserializer<V> aggValueDeserializer); + + /** + * Aggregate values of this stream by key on a window basis. + * + * @param aggregator the class of Aggregator * @param windows the specification of the aggregation window * @param <T> the value type of the aggregated table */ - <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(AggregatorSupplier<K, V, T> aggregatorSupplier, + <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(Aggregator<K, V, T> aggregator, Windows<W> windows, Serializer<K> keySerializer, Serializer<T> aggValueSerializer, Deserializer<K> keyDeserializer, Deserializer<T> aggValueDeserializer); - /** - * Sum extracted long integer values of this stream by key on a window basis. - * - * @param valueSelector the class of KeyValueToLongMapper to extract the long integer from value - * @param windows the specification of the aggregation window - */ - <W extends Window> KTable<Windowed<K>, Long> sumByKey(KeyValueToLongMapper<K, V> valueSelector, - Windows<W> windows, - Serializer<K> keySerializer, - Deserializer<K> keyDeserializer); - - /** - * Count number of records of this stream by key on a window basis. - * - * @param windows the specification of the aggregation window - */ - <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows, - Serializer<K> keySerializer, - Deserializer<K> keyDeserializer); - } http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java index 93eceec..87298d1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java @@ -139,49 +139,42 @@ public interface KTable<K, V> { <V1, R> KTable<K, R> leftJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner); /** - * Aggregate values of this table by the selected key. + * Reduce values of this table by the selected key. * - * @param aggregatorSupplier the class of AggregatorSupplier + * @param addReducer the class of Reducer + * @param removeReducer the class of Reducer * @param selector the KeyValue mapper that select the aggregate key * @param name the name of the resulted table * @param <K1> the key type of the aggregated table * @param <V1> the value type of the aggregated table * @return the instance of KTable */ - <K1, V1, T> KTable<K1, T> aggregate(AggregatorSupplier<K1, V1, T> aggregatorSupplier, - KeyValueMapper<K, V, KeyValue<K1, V1>> selector, - Serializer<K1> keySerializer, - Serializer<V1> valueSerializer, - Serializer<T> aggValueSerializer, - Deserializer<K1> keyDeserializer, - Deserializer<V1> valueDeserializer, - Deserializer<T> aggValueDeserializer, - String name); - - /** - * Sum extracted long integer values of this table by the selected aggregation key - * - * @param keySelector the class of KeyValueMapper to select the aggregation key - * @param valueSelector the class of KeyValueToLongMapper to extract the long integer from value - * @param name the name of the resulted table - */ - <K1> KTable<K1, Long> sum(KeyValueMapper<K, V, K1> keySelector, - KeyValueToLongMapper<K, V> valueSelector, - Serializer<K1> keySerializer, - Deserializer<K1> keyDeserializer, - String name); + <K1, V1> KTable<K1, V1> reduce(Reducer<V1> addReducer, + Reducer<V1> removeReducer, + KeyValueMapper<K, V, KeyValue<K1, V1>> selector, + Serializer<K1> keySerializer, + Serializer<V1> valueSerializer, + Deserializer<K1> keyDeserializer, + Deserializer<V1> valueDeserializer, + String name); /** - * Count number of records of this table by the selected aggregation key + * Aggregate values of this table by the selected key. * - * @param keySelector the class of KeyValueMapper to select the aggregation key + * @param aggregator the class of Aggregator + * @param selector the KeyValue mapper that select the aggregate key * @param name the name of the resulted table + * @param <K1> the key type of the aggregated table + * @param <V1> the value type of the aggregated table + * @return the instance of KTable */ - <K1> KTable<K1, Long> count(KeyValueMapper<K, V, K1> keySelector, - Serializer<K1> keySerializer, - Serializer<V> valueSerializer, - Deserializer<K1> keyDeserializer, - Deserializer<V> valueDeserializer, - String name); - + <K1, V1, T> KTable<K1, T> aggregate(Aggregator<K1, V1, T> aggregator, + KeyValueMapper<K, V, KeyValue<K1, V1>> selector, + Serializer<K1> keySerializer, + Serializer<V1> valueSerializer, + Serializer<T> aggValueSerializer, + Deserializer<K1> keyDeserializer, + Deserializer<V1> valueDeserializer, + Deserializer<T> aggValueDeserializer, + String name); } http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java new file mode 100644 index 0000000..418f442 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream; + +public interface Reducer<V> { + + V apply(V value1, V value2); +} http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/SumAsLong.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SumAsLong.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SumAsLong.java new file mode 100644 index 0000000..1f8df04 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SumAsLong.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream; + +public class SumAsLong<K> implements Aggregator<K, Long, Long> { + + @Override + public Long initialValue(K aggKey) { + return 0L; + } + + @Override + public Long add(K aggKey, Long value, Long aggregate) { + return aggregate + value; + } + + @Override + public Long remove(K aggKey, Long value, Long aggregate) { + return aggregate - value; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CountSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CountSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CountSupplier.java deleted file mode 100644 index b7dc5aa..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CountSupplier.java +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.kstream.internals; - -import org.apache.kafka.streams.kstream.Aggregator; -import org.apache.kafka.streams.kstream.AggregatorSupplier; - -public class CountSupplier<K, V> implements AggregatorSupplier<K, V, Long> { - - private class Count implements Aggregator<K, V, Long> { - - @Override - public Long initialValue() { - return 0L; - } - - @Override - public Long add(K aggKey, V value, Long aggregate) { - return aggregate + 1; - } - - @Override - public Long remove(K aggKey, V value, Long aggregate) { - return aggregate - 1; - } - - @Override - public Long merge(Long aggr1, Long aggr2) { - return aggr1 + aggr2; - } - } - - @Override - public Aggregator<K, V, Long> get() { - return new Count(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java index 5745a03..91bfa9e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java @@ -97,7 +97,7 @@ public class KStreamAggregate<K, V, T, W extends Window> implements KTableProces T oldAgg = entry.value; if (oldAgg == null) - oldAgg = aggregator.initialValue(); + oldAgg = aggregator.initialValue(key); // try to add the new new value (there will never be old value) T newAgg = aggregator.add(key, value, oldAgg); @@ -119,7 +119,7 @@ public class KStreamAggregate<K, V, T, W extends Window> implements KTableProces // create the new window for the rest of unmatched window that do not exist yet for (long windowStartMs : matchedWindows.keySet()) { - T oldAgg = aggregator.initialValue(); + T oldAgg = aggregator.initialValue(key); T newAgg = aggregator.add(key, value, oldAgg); windowStore.put(key, newAgg, windowStartMs); http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 691910b..ce89220 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -18,15 +18,13 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.LongDeserializer; -import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.streams.kstream.AggregatorSupplier; +import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.JoinWindows; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.KeyValue; -import org.apache.kafka.streams.kstream.KeyValueToLongMapper; +import org.apache.kafka.streams.kstream.Reducer; import org.apache.kafka.streams.kstream.TransformerSupplier; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.ValueTransformerSupplier; @@ -39,7 +37,7 @@ import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.Windows; import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.processor.StreamPartitioner; -import org.apache.kafka.streams.state.RocksDBWindowStoreSupplier; +import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier; import org.apache.kafka.streams.state.Serdes; import java.lang.reflect.Array; @@ -48,47 +46,50 @@ import java.util.Set; public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V> { - private static final String FILTER_NAME = "KSTREAM-FILTER-"; + private static final String AGGREGATE_NAME = "KSTREAM-AGGREGATE-"; - private static final String MAP_NAME = "KSTREAM-MAP-"; + private static final String BRANCH_NAME = "KSTREAM-BRANCH-"; - private static final String MAPVALUES_NAME = "KSTREAM-MAPVALUES-"; + private static final String BRANCHCHILD_NAME = "KSTREAM-BRANCHCHILD-"; + + private static final String FILTER_NAME = "KSTREAM-FILTER-"; private static final String FLATMAP_NAME = "KSTREAM-FLATMAP-"; private static final String FLATMAPVALUES_NAME = "KSTREAM-FLATMAPVALUES-"; - private static final String TRANSFORM_NAME = "KSTREAM-TRANSFORM-"; + public static final String JOINTHIS_NAME = "KSTREAM-JOINTHIS-"; - private static final String TRANSFORMVALUES_NAME = "KSTREAM-TRANSFORMVALUES-"; + public static final String JOINOTHER_NAME = "KSTREAM-JOINOTHER-"; - private static final String PROCESSOR_NAME = "KSTREAM-PROCESSOR-"; + public static final String LEFTJOIN_NAME = "KSTREAM-LEFTJOIN-"; - private static final String BRANCH_NAME = "KSTREAM-BRANCH-"; + private static final String MAP_NAME = "KSTREAM-MAP-"; - private static final String BRANCHCHILD_NAME = "KSTREAM-BRANCHCHILD-"; + private static final String MAPVALUES_NAME = "KSTREAM-MAPVALUES-"; - private static final String WINDOWED_NAME = "KSTREAM-WINDOWED-"; + public static final String MERGE_NAME = "KSTREAM-MERGE-"; - private static final String SELECT_NAME = "KSTREAM-SELECT-"; + public static final String OUTERTHIS_NAME = "KSTREAM-OUTERTHIS-"; - private static final String AGGREGATE_NAME = "KSTREAM-AGGREGATE-"; + public static final String OUTEROTHER_NAME = "KSTREAM-OUTEROTHER-"; - public static final String SINK_NAME = "KSTREAM-SINK-"; + private static final String PROCESSOR_NAME = "KSTREAM-PROCESSOR-"; - public static final String JOINTHIS_NAME = "KSTREAM-JOINTHIS-"; + private static final String REDUCE_NAME = "KSTREAM-REDUCE-"; - public static final String JOINOTHER_NAME = "KSTREAM-JOINOTHER-"; + private static final String SELECT_NAME = "KSTREAM-SELECT-"; - public static final String OUTERTHIS_NAME = "KSTREAM-OUTERTHIS-"; + public static final String SINK_NAME = "KSTREAM-SINK-"; - public static final String OUTEROTHER_NAME = "KSTREAM-OUTEROTHER-"; + public static final String SOURCE_NAME = "KSTREAM-SOURCE-"; - public static final String LEFTJOIN_NAME = "KSTREAM-LEFTJOIN-"; + private static final String TRANSFORM_NAME = "KSTREAM-TRANSFORM-"; - public static final String MERGE_NAME = "KSTREAM-MERGE-"; + private static final String TRANSFORMVALUES_NAME = "KSTREAM-TRANSFORMVALUES-"; + + private static final String WINDOWED_NAME = "KSTREAM-WINDOWED-"; - public static final String SOURCE_NAME = "KSTREAM-SOURCE-"; public KStreamImpl(KStreamBuilder topology, String name, Set<String> sourceNodes) { super(topology, name, sourceNodes); @@ -394,7 +395,41 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V } @Override - public <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(AggregatorSupplier<K, V, T> aggregatorSupplier, + public <W extends Window> KTable<Windowed<K>, V> reduceByKey(Reducer<V> reducer, + Windows<W> windows, + Serializer<K> keySerializer, + Serializer<V> aggValueSerializer, + Deserializer<K> keyDeserializer, + Deserializer<V> aggValueDeserializer) { + + // TODO: this agg window operator is only used for casting K to Windowed<K> for + // KTableProcessorSupplier, which is a bit awkward and better be removed in the future + String reduceName = topology.newName(REDUCE_NAME); + String selectName = topology.newName(SELECT_NAME); + + ProcessorSupplier<K, V> aggWindowSupplier = new KStreamAggWindow<>(); + ProcessorSupplier<Windowed<K>, Change<V>> aggregateSupplier = new KStreamReduce<>(windows, windows.name(), reducer); + + RocksDBWindowStoreSupplier<K, V> aggregateStore = + new RocksDBWindowStoreSupplier<>( + windows.name(), + windows.maintainMs(), + windows.segments, + false, + new Serdes<>("", keySerializer, keyDeserializer, aggValueSerializer, aggValueDeserializer), + null); + + // aggregate the values with the aggregator and local store + topology.addProcessor(selectName, aggWindowSupplier, this.name); + topology.addProcessor(reduceName, aggregateSupplier, selectName); + topology.addStateStore(aggregateStore, reduceName); + + // return the KTable representation with the intermediate topic as the sources + return new KTableImpl<>(topology, reduceName, aggregateSupplier, sourceNodes); + } + + @Override + public <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(Aggregator<K, V, T> aggregator, Windows<W> windows, Serializer<K> keySerializer, Serializer<T> aggValueSerializer, @@ -407,7 +442,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V String selectName = topology.newName(SELECT_NAME); ProcessorSupplier<K, V> aggWindowSupplier = new KStreamAggWindow<>(); - ProcessorSupplier<Windowed<K>, Change<V>> aggregateSupplier = new KStreamAggregate<>(windows, windows.name(), aggregatorSupplier.get()); + ProcessorSupplier<Windowed<K>, Change<V>> aggregateSupplier = new KStreamAggregate<>(windows, windows.name(), aggregator); RocksDBWindowStoreSupplier<K, T> aggregateStore = new RocksDBWindowStoreSupplier<>( @@ -426,39 +461,4 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V // return the KTable representation with the intermediate topic as the sources return new KTableImpl<>(topology, aggregateName, aggregateSupplier, sourceNodes); } - - @Override - public <W extends Window> KTable<Windowed<K>, Long> sumByKey(final KeyValueToLongMapper<K, V> valueSelector, - Windows<W> windows, - Serializer<K> keySerializer, - Deserializer<K> keyDeserializer) { - - KStream<K, Long> selected = this.map(new KeyValueMapper<K, V, KeyValue<K, Long>>() { - @Override - public KeyValue<K, Long> apply(K key, V value) { - return new KeyValue<>(key, valueSelector.apply(key, value)); - } - }); - - return selected.<Long, W>aggregateByKey(new LongSumSupplier<K>(), - windows, - keySerializer, - new LongSerializer(), - keyDeserializer, - new LongDeserializer()); - } - - - @Override - public <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows, - Serializer<K> keySerializer, - Deserializer<K> keyDeserializer) { - - return this.<Long, W>aggregateByKey(new CountSupplier<K, V>(), - windows, - keySerializer, - new LongSerializer(), - keyDeserializer, - new LongDeserializer()); - } } http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java new file mode 100644 index 0000000..7d6eb27 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.streams.kstream.KeyValue; +import org.apache.kafka.streams.kstream.Reducer; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.Windows; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.streams.state.WindowStoreIterator; + +import java.util.Iterator; +import java.util.Map; + +public class KStreamReduce<K, V, W extends Window> implements KTableProcessorSupplier<Windowed<K>, V, V> { + + private final String storeName; + private final Windows<W> windows; + private final Reducer<V> reducer; + + private boolean sendOldValues = false; + + public KStreamReduce(Windows<W> windows, String storeName, Reducer<V> reducer) { + this.windows = windows; + this.storeName = storeName; + this.reducer = reducer; + } + + @Override + public Processor<Windowed<K>, Change<V>> get() { + return new KStreamAggregateProcessor(); + } + + @Override + public void enableSendingOldValues() { + sendOldValues = true; + } + + private class KStreamAggregateProcessor extends AbstractProcessor<Windowed<K>, Change<V>> { + + private WindowStore<K, V> windowStore; + + @SuppressWarnings("unchecked") + @Override + public void init(ProcessorContext context) { + super.init(context); + + windowStore = (WindowStore<K, V>) context.getStateStore(storeName); + } + + @Override + public void process(Windowed<K> windowedKey, Change<V> change) { + // first get the matching windows + long timestamp = windowedKey.window().start(); + K key = windowedKey.value(); + V value = change.newValue; + + Map<Long, W> matchedWindows = windows.windowsFor(timestamp); + + long timeFrom = Long.MAX_VALUE; + long timeTo = Long.MIN_VALUE; + + // use range query on window store for efficient reads + for (long windowStartMs : matchedWindows.keySet()) { + timeFrom = windowStartMs < timeFrom ? windowStartMs : timeFrom; + timeTo = windowStartMs > timeTo ? windowStartMs : timeTo; + } + + WindowStoreIterator<V> iter = windowStore.fetch(key, timeFrom, timeTo); + + // for each matching window, try to update the corresponding key and send to the downstream + while (iter.hasNext()) { + KeyValue<Long, V> entry = iter.next(); + W window = matchedWindows.get(entry.key); + + if (window != null) { + + V oldAgg = entry.value; + V newAgg = oldAgg; + + // try to add the new new value (there will never be old value) + if (newAgg == null) { + newAgg = value; + } else { + newAgg = reducer.apply(newAgg, value); + } + + // update the store with the new value + windowStore.put(key, newAgg, window.start()); + + // forward the aggregated change pair + if (sendOldValues) + context().forward(new Windowed<>(key, window), new Change<>(newAgg, oldAgg)); + else + context().forward(new Windowed<>(key, window), new Change<>(newAgg, null)); + + matchedWindows.remove(entry.key); + } + } + + iter.close(); + + // create the new window for the rest of unmatched window that do not exist yet + for (long windowStartMs : matchedWindows.keySet()) { + windowStore.put(key, value, windowStartMs); + + // send the new aggregate pair (there will be no old value) + context().forward(new Windowed<>(key, matchedWindows.get(windowStartMs)), new Change<>(value, null)); + } + } + } + + @Override + public KTableValueGetterSupplier<Windowed<K>, V> view() { + + return new KTableValueGetterSupplier<Windowed<K>, V>() { + + public KTableValueGetter<Windowed<K>, V> get() { + return new KStreamAggregateValueGetter(); + } + + }; + } + + private class KStreamAggregateValueGetter implements KTableValueGetter<Windowed<K>, V> { + + private WindowStore<K, V> windowStore; + + @SuppressWarnings("unchecked") + @Override + public void init(ProcessorContext context) { + windowStore = (WindowStore<K, V>) context.getStateStore(storeName); + } + + @SuppressWarnings("unchecked") + @Override + public V get(Windowed<K> windowedKey) { + K key = windowedKey.value(); + W window = (W) windowedKey.window(); + + // this iterator should only contain one element + Iterator<KeyValue<Long, V>> iter = windowStore.fetch(key, window.start(), window.start()); + + return iter.next().value; + } + + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java index a5948f8..1730a8c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java @@ -30,7 +30,7 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T private boolean sendOldValues = false; - KTableAggregate(String storeName, Aggregator<K, V, T> aggregator) { + public KTableAggregate(String storeName, Aggregator<K, V, T> aggregator) { this.storeName = storeName; this.aggregator = aggregator; } @@ -62,7 +62,7 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T T oldAgg = store.get(key); if (oldAgg == null) - oldAgg = aggregator.initialValue(); + oldAgg = aggregator.initialValue(key); T newAgg = oldAgg; http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 9888dff..8ee557c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -18,17 +18,15 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.LongDeserializer; -import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.streams.kstream.AggregatorSupplier; +import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.KeyValue; import org.apache.kafka.streams.kstream.KeyValueMapper; -import org.apache.kafka.streams.kstream.KeyValueToLongMapper; import org.apache.kafka.streams.kstream.Predicate; +import org.apache.kafka.streams.kstream.Reducer; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.streams.processor.ProcessorSupplier; @@ -48,31 +46,34 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, private static final String REPARTITION_TOPIC_SUFFIX = "-repartition"; - private static final String FILTER_NAME = "KTABLE-FILTER-"; + private static final String AGGREGATE_NAME = "KTABLE-AGGREGATE-"; - private static final String MAPVALUES_NAME = "KTABLE-MAPVALUES-"; + private static final String FILTER_NAME = "KTABLE-FILTER-"; - private static final String TOSTREAM_NAME = "KTABLE-TOSTREAM-"; + public static final String JOINTHIS_NAME = "KTABLE-JOINTHIS-"; - private static final String SELECT_NAME = "KTABLE-SELECT-"; + public static final String JOINOTHER_NAME = "KTABLE-JOINOTHER-"; - private static final String AGGREGATE_NAME = "KTABLE-AGGREGATE-"; + public static final String LEFTTHIS_NAME = "KTABLE-LEFTTHIS-"; - public static final String SOURCE_NAME = "KTABLE-SOURCE-"; + public static final String LEFTOTHER_NAME = "KTABLE-LEFTOTHER-"; - public static final String JOINTHIS_NAME = "KTABLE-JOINTHIS-"; + private static final String MAPVALUES_NAME = "KTABLE-MAPVALUES-"; - public static final String JOINOTHER_NAME = "KTABLE-JOINOTHER-"; + public static final String MERGE_NAME = "KTABLE-MERGE-"; public static final String OUTERTHIS_NAME = "KTABLE-OUTERTHIS-"; public static final String OUTEROTHER_NAME = "KTABLE-OUTEROTHER-"; - public static final String LEFTTHIS_NAME = "KTABLE-LEFTTHIS-"; + private static final String REDUCE_NAME = "KTABLE-REDUCE-"; - public static final String LEFTOTHER_NAME = "KTABLE-LEFTOTHER-"; + private static final String SELECT_NAME = "KTABLE-SELECT-"; + + public static final String SOURCE_NAME = "KTABLE-SOURCE-"; + + private static final String TOSTREAM_NAME = "KTABLE-TOSTREAM-"; - public static final String MERGE_NAME = "KTABLE-MERGE-"; public final ProcessorSupplier<K, ?> processorSupplier; @@ -245,15 +246,15 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, } @Override - public <K1, V1, V2> KTable<K1, V2> aggregate(AggregatorSupplier<K1, V1, V2> aggregatorSupplier, - KeyValueMapper<K, V, KeyValue<K1, V1>> selector, - Serializer<K1> keySerializer, - Serializer<V1> valueSerializer, - Serializer<V2> aggValueSerializer, - Deserializer<K1> keyDeserializer, - Deserializer<V1> valueDeserializer, - Deserializer<V2> aggValueDeserializer, - String name) { + public <K1, V1, T> KTable<K1, T> aggregate(Aggregator<K1, V1, T> aggregator, + KeyValueMapper<K, V, KeyValue<K1, V1>> selector, + Serializer<K1> keySerializer, + Serializer<V1> valueSerializer, + Serializer<T> aggValueSerializer, + Deserializer<K1> keyDeserializer, + Deserializer<V1> valueDeserializer, + Deserializer<T> aggValueDeserializer, + String name) { String selectName = topology.newName(SELECT_NAME); String sinkName = topology.newName(KStreamImpl.SINK_NAME); @@ -267,7 +268,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, KTableProcessorSupplier<K, V, KeyValue<K1, V1>> selectSupplier = new KTableRepartitionMap<>(this, selector); - ProcessorSupplier<K1, Change<V1>> aggregateSupplier = new KTableAggregate<>(name, aggregatorSupplier.get()); + ProcessorSupplier<K1, Change<V1>> aggregateSupplier = new KTableAggregate<>(name, aggregator); StateStoreSupplier aggregateStore = Stores.create(name) .withKeys(keySerializer, keyDeserializer) @@ -295,55 +296,52 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, } @Override - public <K1> KTable<K1, Long> sum(final KeyValueMapper<K, V, K1> keySelector, - final KeyValueToLongMapper<K, V> valueSelector, - Serializer<K1> keySerializer, - Deserializer<K1> keyDeserializer, - String name) { + public <K1, V1> KTable<K1, V1> reduce(Reducer<V1> addReducer, + Reducer<V1> removeReducer, + KeyValueMapper<K, V, KeyValue<K1, V1>> selector, + Serializer<K1> keySerializer, + Serializer<V1> valueSerializer, + Deserializer<K1> keyDeserializer, + Deserializer<V1> valueDeserializer, + String name) { - Serializer<Long> longSerializer = new LongSerializer(); - Deserializer<Long> longDeserializer = new LongDeserializer(); + String selectName = topology.newName(SELECT_NAME); + String sinkName = topology.newName(KStreamImpl.SINK_NAME); + String sourceName = topology.newName(KStreamImpl.SOURCE_NAME); + String reduceName = topology.newName(REDUCE_NAME); - KeyValueMapper<K, V, KeyValue<K1, Long>> mapper = new KeyValueMapper<K, V, KeyValue<K1, Long>>() { - @Override - public KeyValue<K1, Long> apply(K key, V value) { - K1 aggKey = keySelector.apply(key, value); - Long aggValue = valueSelector.apply(key, value); + String topic = name + REPARTITION_TOPIC_SUFFIX; - return new KeyValue<>(aggKey, aggValue); - } - }; + ChangedSerializer<V1> changedValueSerializer = new ChangedSerializer<>(valueSerializer); + ChangedDeserializer<V1> changedValueDeserializer = new ChangedDeserializer<>(valueDeserializer); - return this.<K1, Long, Long>aggregate(new LongSumSupplier<K1>(), mapper, - keySerializer, longSerializer, longSerializer, - keyDeserializer, longDeserializer, longDeserializer, - name); - } + KTableProcessorSupplier<K, V, KeyValue<K1, V1>> selectSupplier = new KTableRepartitionMap<>(this, selector); - @Override - public <K1> KTable<K1, Long> count(final KeyValueMapper<K, V, K1> keySelector, - Serializer<K1> keySerializer, - Serializer<V> valueSerializer, - Deserializer<K1> keyDeserializer, - Deserializer<V> valueDeserializer, - String name) { + ProcessorSupplier<K1, Change<V1>> aggregateSupplier = new KTableReduce<>(name, addReducer, removeReducer); - Serializer<Long> longSerializer = new LongSerializer(); - Deserializer<Long> longDeserializer = new LongDeserializer(); + StateStoreSupplier aggregateStore = Stores.create(name) + .withKeys(keySerializer, keyDeserializer) + .withValues(valueSerializer, valueDeserializer) + .localDatabase() + .build(); - KeyValueMapper<K, V, KeyValue<K1, V>> mapper = new KeyValueMapper<K, V, KeyValue<K1, V>>() { - @Override - public KeyValue<K1, V> apply(K key, V value) { - K1 aggKey = keySelector.apply(key, value); + // select the aggregate key and values (old and new), it would require parent to send old values + topology.addProcessor(selectName, selectSupplier, this.name); + this.enableSendingOldValues(); - return new KeyValue<>(aggKey, value); - } - }; + // send the aggregate key-value pairs to the intermediate topic for partitioning + topology.addInternalTopic(topic); + topology.addSink(sinkName, topic, keySerializer, changedValueSerializer, selectName); - return this.<K1, V, Long>aggregate(new CountSupplier<K1, V>(), mapper, - keySerializer, valueSerializer, longSerializer, - keyDeserializer, valueDeserializer, longDeserializer, - name); + // read the intermediate topic + topology.addSource(sourceName, keyDeserializer, changedValueDeserializer, topic); + + // aggregate the values with the aggregator and local store + topology.addProcessor(reduceName, aggregateSupplier, sourceName); + topology.addStateStore(aggregateStore, reduceName); + + // return the KTable representation with the intermediate topic as the sources + return new KTableImpl<>(topology, reduceName, aggregateSupplier, Collections.singleton(sourceName)); } @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java new file mode 100644 index 0000000..0d1b55a --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.streams.kstream.Reducer; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.state.KeyValueStore; + +public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> { + + private final String storeName; + private final Reducer<V> addReducer; + private final Reducer<V> removeReducer; + + private boolean sendOldValues = false; + + public KTableReduce(String storeName, Reducer<V> addReducer, Reducer<V> removeReducer) { + this.storeName = storeName; + this.addReducer = addReducer; + this.removeReducer = removeReducer; + } + + @Override + public void enableSendingOldValues() { + sendOldValues = true; + } + + @Override + public Processor<K, Change<V>> get() { + return new KTableAggregateProcessor(); + } + + private class KTableAggregateProcessor extends AbstractProcessor<K, Change<V>> { + + private KeyValueStore<K, V> store; + + @SuppressWarnings("unchecked") + @Override + public void init(ProcessorContext context) { + super.init(context); + + store = (KeyValueStore<K, V>) context.getStateStore(storeName); + } + + @Override + public void process(K key, Change<V> value) { + V oldAgg = store.get(key); + V newAgg = oldAgg; + + // first try to add the new new value + if (value.newValue != null) { + if (newAgg == null) { + newAgg = value.newValue; + } else { + newAgg = addReducer.apply(newAgg, value.newValue); + } + } + + // then try to remove the old value + if (value.oldValue != null) { + newAgg = removeReducer.apply(newAgg, value.oldValue); + } + + // update the store with the new value + store.put(key, newAgg); + + // send the old / new pair + if (sendOldValues) + context().forward(key, new Change<>(newAgg, oldAgg)); + else + context().forward(key, new Change<>(newAgg, null)); + } + } + + @Override + public KTableValueGetterSupplier<K, V> view() { + + return new KTableValueGetterSupplier<K, V>() { + + public KTableValueGetter<K, V> get() { + return new KTableAggregateValueGetter(); + } + + }; + } + + private class KTableAggregateValueGetter implements KTableValueGetter<K, V> { + + private KeyValueStore<K, V> store; + + @SuppressWarnings("unchecked") + @Override + public void init(ProcessorContext context) { + store = (KeyValueStore<K, V>) context.getStateStore(storeName); + } + + @Override + public V get(K key) { + return store.get(key); + } + + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java index d07fc5d..c993512 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java @@ -22,8 +22,8 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreSupplier; -import org.apache.kafka.streams.state.MeteredKeyValueStore; -import org.apache.kafka.streams.state.RocksDBStore; +import org.apache.kafka.streams.state.internals.MeteredKeyValueStore; +import org.apache.kafka.streams.state.internals.RocksDBStore; import org.apache.kafka.streams.state.Serdes; /** http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/LongSumSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/LongSumSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/LongSumSupplier.java deleted file mode 100644 index b66590e..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/LongSumSupplier.java +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.kstream.internals; - -import org.apache.kafka.streams.kstream.Aggregator; -import org.apache.kafka.streams.kstream.AggregatorSupplier; - -public class LongSumSupplier<K> implements AggregatorSupplier<K, Long, Long> { - - private class LongSum implements Aggregator<K, Long, Long> { - - @Override - public Long initialValue() { - return 0L; - } - - @Override - public Long add(K aggKey, Long value, Long aggregate) { - return aggregate + value; - } - - @Override - public Long remove(K aggKey, Long value, Long aggregate) { - return aggregate - value; - } - - @Override - public Long merge(Long aggr1, Long aggr2) { - return aggr1 + aggr2; - } - } - - @Override - public Aggregator<K, Long, Long> get() { - return new LongSum(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TopKSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TopKSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TopKSupplier.java deleted file mode 100644 index 00f4b55..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TopKSupplier.java +++ /dev/null @@ -1,106 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.kstream.internals; - -import org.apache.kafka.streams.kstream.AggregatorSupplier; -import org.apache.kafka.streams.kstream.Aggregator; - -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.PriorityQueue; -import java.util.Set; - -/** - * NOTE: This is just a demo aggregate supplier that can be implemented by users to add their own built-in aggregates. - * It is highly in-efficient and is not supposed to be merged in. - */ -public class TopKSupplier<K, V extends Comparable<V>> implements AggregatorSupplier<K, V, Collection<V>> { - - private final int k; - - public TopKSupplier(int k) { - this.k = k; - } - - private class TopK implements Aggregator<K, V, Collection<V>> { - - private final Map<K, PriorityQueue<V>> sorted = new HashMap<>(); - - @Override - public Collection<V> initialValue() { - return Collections.<V>emptySet(); - } - - @Override - public Collection<V> add(K aggKey, V value, Collection<V> aggregate) { - PriorityQueue<V> queue = sorted.get(aggKey); - if (queue == null) { - queue = new PriorityQueue<>(); - sorted.put(aggKey, queue); - } - - queue.add(value); - - PriorityQueue<V> copy = new PriorityQueue<>(queue); - - Set<V> ret = new HashSet<>(); - for (int i = 1; i <= k; i++) - ret.add(copy.poll()); - - return ret; - } - - @Override - public Collection<V> remove(K aggKey, V value, Collection<V> aggregate) { - PriorityQueue<V> queue = sorted.get(aggKey); - - if (queue == null) - throw new IllegalStateException("This should not happen."); - - queue.remove(value); - - PriorityQueue<V> copy = new PriorityQueue<>(queue); - - Set<V> ret = new HashSet<>(); - for (int i = 1; i <= k; i++) - ret.add(copy.poll()); - - return ret; - } - - @Override - public Collection<V> merge(Collection<V> aggr1, Collection<V> aggr2) { - PriorityQueue<V> copy = new PriorityQueue<>(aggr1); - copy.addAll(aggr2); - - Set<V> ret = new HashSet<>(); - for (int i = 1; i <= k; i++) - ret.add(copy.poll()); - - return ret; - } - } - - @Override - public Aggregator<K, V, Collection<V>> get() { - return new TopK(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index 3cac3f1..547bb15 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -24,7 +24,7 @@ import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; -import org.apache.kafka.streams.state.OffsetCheckpoint; +import org.apache.kafka.streams.state.internals.OffsetCheckpoint; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreSupplier.java deleted file mode 100644 index d1f845c..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreSupplier.java +++ /dev/null @@ -1,155 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.state; - -import org.apache.kafka.common.utils.Time; -import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.processor.StateStore; -import org.apache.kafka.streams.processor.StateStoreSupplier; - -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.TreeMap; - -/** - * An in-memory key-value store based on a TreeMap. - * - * @param <K> The key type - * @param <V> The value type - * - * @see Stores#create(String, org.apache.kafka.streams.StreamingConfig) - */ -public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier { - - private final String name; - private final Serdes serdes; - private final Time time; - - protected InMemoryKeyValueStoreSupplier(String name, Serdes<K, V> serdes, Time time) { - this.name = name; - this.serdes = serdes; - this.time = time; - } - - public String name() { - return name; - } - - public StateStore get() { - return new MeteredKeyValueStore<K, V>(new MemoryStore<K, V>(name), serdes, "in-memory-state", time); - } - - private static class MemoryStore<K, V> implements KeyValueStore<K, V> { - - private final String name; - private final NavigableMap<K, V> map; - - public MemoryStore(String name) { - super(); - this.name = name; - this.map = new TreeMap<>(); - } - - @Override - public String name() { - return this.name; - } - - @Override - public void init(ProcessorContext context) { - // do-nothing since it is in-memory - } - - @Override - public boolean persistent() { - return false; - } - - @Override - public V get(K key) { - return this.map.get(key); - } - - @Override - public void put(K key, V value) { - this.map.put(key, value); - } - - @Override - public void putAll(List<Entry<K, V>> entries) { - for (Entry<K, V> entry : entries) - put(entry.key(), entry.value()); - } - - @Override - public V delete(K key) { - return this.map.remove(key); - } - - @Override - public KeyValueIterator<K, V> range(K from, K to) { - return new MemoryStoreIterator<K, V>(this.map.subMap(from, true, to, false).entrySet().iterator()); - } - - @Override - public KeyValueIterator<K, V> all() { - return new MemoryStoreIterator<K, V>(this.map.entrySet().iterator()); - } - - @Override - public void flush() { - // do-nothing since it is in-memory - } - - @Override - public void close() { - // do-nothing - } - - private static class MemoryStoreIterator<K, V> implements KeyValueIterator<K, V> { - private final Iterator<Map.Entry<K, V>> iter; - - public MemoryStoreIterator(Iterator<Map.Entry<K, V>> iter) { - this.iter = iter; - } - - @Override - public boolean hasNext() { - return iter.hasNext(); - } - - @Override - public Entry<K, V> next() { - Map.Entry<K, V> entry = iter.next(); - return new Entry<>(entry.getKey(), entry.getValue()); - } - - @Override - public void remove() { - iter.remove(); - } - - @Override - public void close() { - } - - } - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreSupplier.java deleted file mode 100644 index a346534..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreSupplier.java +++ /dev/null @@ -1,195 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.state; - -import org.apache.kafka.common.utils.Time; -import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.processor.StateStore; -import org.apache.kafka.streams.processor.StateStoreSupplier; - -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.NavigableSet; -import java.util.TreeSet; - -/** - * An in-memory key-value store that is limited in size and retains a maximum number of most recently used entries. - * - * @param <K> The key type - * @param <V> The value type - * - */ -public class InMemoryLRUCacheStoreSupplier<K, V> implements StateStoreSupplier { - - private final String name; - private final int capacity; - private final Serdes serdes; - private final Time time; - - protected InMemoryLRUCacheStoreSupplier(String name, int capacity, Serdes<K, V> serdes, Time time) { - this.name = name; - this.capacity = capacity; - this.serdes = serdes; - this.time = time; - } - - public String name() { - return name; - } - - public StateStore get() { - MemoryLRUCache<K, V> cache = new MemoryLRUCache<K, V>(name, capacity); - final MeteredKeyValueStore<K, V> store = new MeteredKeyValueStore<>(cache, serdes, "in-memory-lru-state", time); - cache.whenEldestRemoved(new EldestEntryRemovalListener<K, V>() { - @Override - public void apply(K key, V value) { - store.removed(key); - } - }); - return store; - } - - private static interface EldestEntryRemovalListener<K, V> { - public void apply(K key, V value); - } - - protected static final class MemoryLRUCache<K, V> implements KeyValueStore<K, V> { - - private final String name; - private final Map<K, V> map; - private final NavigableSet<K> keys; - private EldestEntryRemovalListener<K, V> listener; - - public MemoryLRUCache(String name, final int maxCacheSize) { - this.name = name; - this.keys = new TreeSet<>(); - // leave room for one extra entry to handle adding an entry before the oldest can be removed - this.map = new LinkedHashMap<K, V>(maxCacheSize + 1, 1.01f, true) { - private static final long serialVersionUID = 1L; - - @Override - protected boolean removeEldestEntry(Map.Entry<K, V> eldest) { - if (size() > maxCacheSize) { - K key = eldest.getKey(); - keys.remove(key); - if (listener != null) listener.apply(key, eldest.getValue()); - return true; - } - return false; - } - }; - } - - protected void whenEldestRemoved(EldestEntryRemovalListener<K, V> listener) { - this.listener = listener; - } - - @Override - public String name() { - return this.name; - } - - @Override - public void init(ProcessorContext context) { - // do-nothing since it is in-memory - } - - @Override - public boolean persistent() { - return false; - } - - @Override - public V get(K key) { - return this.map.get(key); - } - - @Override - public void put(K key, V value) { - this.map.put(key, value); - this.keys.add(key); - } - - @Override - public void putAll(List<Entry<K, V>> entries) { - for (Entry<K, V> entry : entries) - put(entry.key(), entry.value()); - } - - @Override - public V delete(K key) { - V value = this.map.remove(key); - this.keys.remove(key); - return value; - } - - @Override - public KeyValueIterator<K, V> range(K from, K to) { - return new MemoryLRUCache.CacheIterator<K, V>(this.keys.subSet(from, true, to, false).iterator(), this.map); - } - - @Override - public KeyValueIterator<K, V> all() { - return new MemoryLRUCache.CacheIterator<K, V>(this.keys.iterator(), this.map); - } - - @Override - public void flush() { - // do-nothing since it is in-memory - } - - @Override - public void close() { - // do-nothing - } - - private static class CacheIterator<K, V> implements KeyValueIterator<K, V> { - private final Iterator<K> keys; - private final Map<K, V> entries; - private K lastKey; - - public CacheIterator(Iterator<K> keys, Map<K, V> entries) { - this.keys = keys; - this.entries = entries; - } - - @Override - public boolean hasNext() { - return keys.hasNext(); - } - - @Override - public Entry<K, V> next() { - lastKey = keys.next(); - return new Entry<>(lastKey, entries.get(lastKey)); - } - - @Override - public void remove() { - keys.remove(); - entries.remove(lastKey); - } - - @Override - public void close() { - // do nothing - } - } - } -}
