Repository: kafka Updated Branches: refs/heads/trunk 08063f50a -> 8bd2a68b5
KAFKA-5655; materialized count, aggregate, reduce to KGroupedTable Add overloads of `count`, `aggregate`, `reduce` using `Materialized` to `KGroupedTable` deprecate other overloads Author: Damian Guy <[email protected]> Reviewers: Matthias J. Sax <[email protected]>, Bill Bejeck <[email protected]>, Guozhang Wang <[email protected]> Closes #3829 from dguy/kafka-5655 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8bd2a68b Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8bd2a68b Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8bd2a68b Branch: refs/heads/trunk Commit: 8bd2a68b5020f0bf8f79cbe59676d649eebf170f Parents: 08063f5 Author: Damian Guy <[email protected]> Authored: Tue Sep 12 17:20:43 2017 +0100 Committer: Damian Guy <[email protected]> Committed: Tue Sep 12 17:20:43 2017 +0100 ---------------------------------------------------------------------- .../kafka/streams/kstream/KGroupedTable.java | 204 +++++++++++++++++++ .../kafka/streams/kstream/Materialized.java | 12 +- .../kstream/internals/KGroupedTableImpl.java | 134 +++++++++--- .../kafka/streams/kstream/MaterializedTest.java | 54 +++++ .../internals/KGroupedTableImplTest.java | 137 ++++++++++++- .../kstream/internals/KTableAggregateTest.java | 1 + 6 files changed, 509 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/8bd2a68b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java index bf0df55..f854320 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java @@ 
-18,6 +18,7 @@ package org.apache.kafka.streams.kstream; import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.StateStoreSupplier; @@ -80,7 +81,9 @@ public interface KGroupedTable<K, V> { * alphanumerics, '.', '_' and '-'. If {@code null} this is the equivalent of {@link KGroupedTable#count()}. * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that * represent the latest (rolling) count (i.e., number of records) for each key + * @deprecated use {@link #count(Materialized)} */ + @Deprecated KTable<K, Long> count(final String queryableStoreName); /** @@ -98,6 +101,47 @@ public interface KGroupedTable<K, V> { * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}. * <p> + * To query the local {@link KeyValueStore} it must be obtained via + * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: + * <pre>{@code + * KafkaStreams streams = ... // counting words + * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore()); + * String key = "some-word"; + * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) + * }</pre> + * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to + * query the value of the key on a parallel running instance of your Kafka Streams application. + * <p> + * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. 
+ * The changelog topic will be named "${applicationId}-${queryableStoreName}-changelog", where "applicationId" is + * user-specified in {@link StreamsConfig} via parameter + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the + * provide {@code queryableStoreName}, and "-changelog" is a fixed suffix. + * The store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, + * '.', '_' and '-'. + * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}. + * + * @param materialized the instance of {@link Materialized} used to materialize the state store. Cannot be {@code null} + * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that + * represent the latest (rolling) count (i.e., number of records) for each key + */ + KTable<K, Long> count(final Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized); + + /** + * Count number of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper) mapped} to + * the same key into a new instance of {@link KTable}. + * Records with {@code null} key are ignored. + * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view) + * that can be queried using the provided {@code queryableStoreName}. + * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream. + * <p> + * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to + * the same key. 
+ * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of + * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for + * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}. + * <p> * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is * user-specified in {@link StreamsConfig} via parameter @@ -148,7 +192,9 @@ public interface KGroupedTable<K, V> { * @param storeSupplier user defined state store supplier. Cannot be {@code null}. * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that * represent the latest (rolling) count (i.e., number of records) for each key + * @deprecated use {@link #count(Materialized)} */ + @Deprecated KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier); /** @@ -218,7 +264,9 @@ public interface KGroupedTable<K, V> { * '.', '_' and '-'. If {@code null} this is the equivalent of {@link KGroupedTable#reduce(Reducer, Reducer)} ()}. * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the * latest (rolling) aggregate for each key + * @deprecated use {@link #reduce(Reducer, Reducer, Materialized)} */ + @Deprecated KTable<K, V> reduce(final Reducer<V> adder, final Reducer<V> subtractor, final String queryableStoreName); @@ -228,6 +276,76 @@ public interface KGroupedTable<K, V> { * mapped} to the same key into a new instance of {@link KTable}. * Records with {@code null} key are ignored. * Combining implies that the type of the aggregate result is the same as the type of the input value + * (c.f. {@link #aggregate(Initializer, Aggregator, Aggregator, Materialized)}). 
+ * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view) + * that can be queried using the store name as provided with {@link Materialized}. + * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream. + * <p> + * Each update to the original {@link KTable} results in a two step update of the result {@link KTable}. + * The specified {@link Reducer adder} is applied for each update record and computes a new aggregate using the + * current aggregate (first argument) and the record's value (second argument) by adding the new record to the + * aggregate. + * The specified {@link Reducer subtractor} is applied for each "replaced" record of the original {@link KTable} + * and computes a new aggregate using the current aggregate (first argument) and the record's value (second + * argument) by "removing" the "replaced" record from the aggregate. + * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's + * value as-is. + * Thus, {@code reduce(Reducer, Reducer, Materialized)} can be used to compute aggregate functions like sum. + * For sum, the adder and subtractor would work as follows: + * <pre>{@code + * public class SumAdder implements Reducer<Integer> { + * public Integer apply(Integer currentAgg, Integer newValue) { + * return currentAgg + newValue; + * } + * } + * + * public class SumSubtractor implements Reducer<Integer> { + * public Integer apply(Integer currentAgg, Integer oldValue) { + * return currentAgg - oldValue; + * } + * } + * }</pre> + * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to + * the same key.
+ * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of + * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for + * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. + * <p> + * To query the local {@link KeyValueStore} it must be obtained via + * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: + * <pre>{@code + * KafkaStreams streams = ... // counting words + * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore()); + * String key = "some-word"; + * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) + * }</pre> + * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to + * query the value of the key on a parallel running instance of your Kafka Streams application. + * <p> + * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. + * The changelog topic will be named "${applicationId}-${queryableStoreName}-changelog", where "applicationId" is + * user-specified in {@link StreamsConfig} via parameter + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the + * store name provided via {@link Materialized}, and "-changelog" is a fixed suffix. + * The store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, + * '.', '_' and '-'. + * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+ * + * @param adder a {@link Reducer} that adds a new value to the aggregate result + * @param subtractor a {@link Reducer} that removed an old value from the aggregate result + * @param materialized the instance of {@link Materialized} used to materialize the state store. Cannot be {@code null} + * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the + * latest (rolling) aggregate for each key + */ + KTable<K, V> reduce(final Reducer<V> adder, + final Reducer<V> subtractor, + final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized); + /** + * Combine the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper) + * mapped} to the same key into a new instance of {@link KTable}. + * Records with {@code null} key are ignored. + * Combining implies that the type of the aggregate result is the same as the type of the input value * (c.f. {@link #aggregate(Initializer, Aggregator, Aggregator, Serde, String)}). * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view) * that can be queried using the provided {@code queryableStoreName}. @@ -344,7 +462,9 @@ public interface KGroupedTable<K, V> { * @param storeSupplier user defined state store supplier. Cannot be {@code null}. 
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the * latest (rolling) aggregate for each key + * @deprecated use {@link #reduce(Reducer, Reducer, Materialized)} */ + @Deprecated KTable<K, V> reduce(final Reducer<V> adder, final Reducer<V> subtractor, final StateStoreSupplier<KeyValueStore> storeSupplier); @@ -427,7 +547,9 @@ public interface KGroupedTable<K, V> { * @param <VR> the value type of the aggregated {@link KTable} * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the * latest (rolling) aggregate for each key + * @deprecated use {@link #aggregate(Initializer, Aggregator, Aggregator, Materialized)} */ + @Deprecated <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final Aggregator<? super K, ? super V, VR> adder, final Aggregator<? super K, ? super V, VR> subtractor, @@ -439,6 +561,86 @@ public interface KGroupedTable<K, V> { * Records with {@code null} key are ignored. * Aggregating is a generalization of {@link #reduce(Reducer, Reducer, String) combining via reduce(...)} as it, * for example, allows the result to have a different type than the input values. + * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view) + * that can be queried using the store name as provided with {@link Materialized}. + * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream. + * <p> + * The specified {@link Initializer} is applied once directly before the first input record is processed to + * provide an initial intermediate aggregation result that is used to process the first record. + * Each update to the original {@link KTable} results in a two step update of the result {@link KTable}.
+ * The specified {@link Aggregator adder} is applied for each update record and computes a new aggregate using the + * current aggregate (or for the very first record using the intermediate aggregation result provided via the + * {@link Initializer}) and the record's value by adding the new record to the aggregate. + * The specified {@link Aggregator subtractor} is applied for each "replaced" record of the original {@link KTable} + * and computes a new aggregate using the current aggregate and the record's value by "removing" the "replaced" + * record from the aggregate. + * Thus, {@code aggregate(Initializer, Aggregator, Aggregator, Materialized)} can be used to compute aggregate functions + * like sum. + * For sum, the initializer, adder, and subtractor would work as follows: + * <pre>{@code + * // in this example, LongSerde.class must be set as default value serde in StreamsConfig + * public class SumInitializer implements Initializer<Long> { + * public Long apply() { + * return 0L; + * } + * } + * + * public class SumAdder implements Aggregator<String, Integer, Long> { + * public Long apply(String key, Integer newValue, Long aggregate) { + * return aggregate + newValue; + * } + * } + * + * public class SumSubtractor implements Aggregator<String, Integer, Long> { + * public Long apply(String key, Integer oldValue, Long aggregate) { + * return aggregate - oldValue; + * } + * } + * }</pre> + * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to + * the same key. + * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of + * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for + * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+ * <p> + * To query the local {@link KeyValueStore} it must be obtained via + * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: + * <pre>{@code + * KafkaStreams streams = ... // counting words + * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore()); + * String key = "some-word"; + * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) + * }</pre> + * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to + * query the value of the key on a parallel running instance of your Kafka Streams application. + * <p> + * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. + * The changelog topic will be named "${applicationId}-${queryableStoreName}-changelog", where "applicationId" is + * user-specified in {@link StreamsConfig} via parameter + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the + * store name provided via {@link Materialized}, and "-changelog" is a fixed suffix. + * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}. + * + * @param initializer an {@link Initializer} that provides an initial aggregate result value + * @param adder an {@link Aggregator} that adds a new record to the aggregate result + * @param subtractor an {@link Aggregator} that removes an old record from the aggregate result + * @param materialized the instance of {@link Materialized} used to materialize the state store.
Cannot be {@code null} + * @param <VR> the value type of the aggregated {@link KTable} + * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the + * latest (rolling) aggregate for each key + */ + <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, + final Aggregator<? super K, ? super V, VR> adder, + final Aggregator<? super K, ? super V, VR> subtractor, + final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized); + + /** + * Aggregate the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper) + * mapped} to the same key into a new instance of {@link KTable} using default serializers and deserializers. + * Records with {@code null} key are ignored. + * Aggregating is a generalization of {@link #reduce(Reducer, Reducer, String) combining via reduce(...)} as it, + * for example, allows the result to have a different type than the input values. * If the result value type does not match the {@link StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value * serde} you should use {@link KGroupedTable#aggregate(Initializer, Aggregator, Aggregator, Serde, String) * aggregate(Initializer, Aggregator, Aggregator, Serde, String)}. @@ -582,7 +784,9 @@ public interface KGroupedTable<K, V> { * @param <VR> the value type of the aggregated {@link KTable} * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the * latest (rolling) aggregate for each key + * @deprecated use {@link #aggregate(Initializer, Aggregator, Aggregator, Materialized)} */ + @Deprecated <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final Aggregator<? super K, ? super V, VR> adder, final Aggregator<? super K, ? 
super V, VR> subtractor, http://git-wip-us.apache.org/repos/asf/kafka/blob/8bd2a68b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java index fb2e7a6..1f142c2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.kstream; +import org.apache.kafka.common.internals.Topic; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.processor.StateStore; @@ -29,6 +30,7 @@ import org.apache.kafka.streams.state.WindowStore; import java.util.HashMap; import java.util.Map; +import java.util.Objects; /** * Used to describe how a {@link StateStore} should be materialized. @@ -70,13 +72,15 @@ public class Materialized<K, V, S extends StateStore> { /** * Materialize a {@link StateStore} with the given name. * - * @param storeName name of the store to materialize + * @param storeName the name of the underlying {@link KTable} state store; valid characters are ASCII + * alphanumerics, '.', '_' and '-'. 
* @param <K> key type of the store * @param <V> value type of the store * @param <S> type of the {@link StateStore} * @return a new {@link Materialized} instance with the given storeName */ public static <K, V, S extends StateStore> Materialized<K, V, S> as(final String storeName) { + Topic.validate(storeName); return new Materialized<>(storeName); } @@ -89,6 +93,7 @@ public class Materialized<K, V, S extends StateStore> { * @return a new {@link Materialized} instance with the given supplier */ public static <K, V> Materialized<K, V, WindowStore<Bytes, byte[]>> as(final WindowBytesStoreSupplier supplier) { + Objects.requireNonNull(supplier, "supplier can't be null"); return new Materialized<>(supplier); } @@ -98,9 +103,11 @@ public class Materialized<K, V, S extends StateStore> { * @param supplier the {@link SessionBytesStoreSupplier} used to materialize the store * @param <K> key type of the store * @param <V> value type of the store - * @return a new {@link Materialized} instance with the given supplier + * @return a new {@link Materialized} instance with the given sup + * plier */ public static <K, V> Materialized<K, V, SessionStore<Bytes, byte[]>> as(final SessionBytesStoreSupplier supplier) { + Objects.requireNonNull(supplier, "supplier can't be null"); return new Materialized<>(supplier); } @@ -113,6 +120,7 @@ public class Materialized<K, V, S extends StateStore> { * @return a new {@link Materialized} instance with the given supplier */ public static <K, V> Materialized<K, V, KeyValueStore<Bytes, byte[]>> as(final KeyValueBytesStoreSupplier supplier) { + Objects.requireNonNull(supplier, "supplier can't be null"); return new Materialized<>(supplier); } http://git-wip-us.apache.org/repos/asf/kafka/blob/8bd2a68b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java ---------------------------------------------------------------------- diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java index aefaad8..e69d4f9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java @@ -20,6 +20,8 @@ import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Reducer; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Initializer; @@ -48,6 +50,26 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup protected final Serde<? extends K> keySerde; protected final Serde<? 
extends V> valSerde; private boolean isQueryable = true; + private final Initializer<Long> countInitializer = new Initializer<Long>() { + @Override + public Long apply() { + return 0L; + } + }; + + private final Aggregator<K, V, Long> countAdder = new Aggregator<K, V, Long>() { + @Override + public Long apply(K aggKey, V value, Long aggregate) { + return aggregate + 1L; + } + }; + + private Aggregator<K, V, Long> countSubtractor = new Aggregator<K, V, Long>() { + @Override + public Long apply(K aggKey, V value, Long aggregate) { + return aggregate - 1L; + } + }; public KGroupedTableImpl(final InternalStreamsBuilder builder, final String name, @@ -116,19 +138,33 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup private <T> KTable<K, T> doAggregate(final ProcessorSupplier<K, Change<V>> aggregateSupplier, final String functionName, final StateStoreSupplier<KeyValueStore> storeSupplier) { - String sinkName = builder.newName(KStreamImpl.SINK_NAME); - String sourceName = builder.newName(KStreamImpl.SOURCE_NAME); - String funcName = builder.newName(functionName); + final String sinkName = builder.newName(KStreamImpl.SINK_NAME); + final String sourceName = builder.newName(KStreamImpl.SOURCE_NAME); + final String funcName = builder.newName(functionName); + + buildAggregate(aggregateSupplier, + storeSupplier.name() + KStreamImpl.REPARTITION_TOPIC_SUFFIX, + funcName, + sourceName, + sinkName); + builder.internalTopologyBuilder.addStateStore(storeSupplier, funcName); - String topic = storeSupplier.name() + KStreamImpl.REPARTITION_TOPIC_SUFFIX; + // return the KTable representation with the intermediate topic as the sources + return new KTableImpl<>(builder, funcName, aggregateSupplier, Collections.singleton(sourceName), storeSupplier.name(), isQueryable); + } - Serializer<? extends K> keySerializer = keySerde == null ? null : keySerde.serializer(); - Deserializer<? extends K> keyDeserializer = keySerde == null ? 
null : keySerde.deserializer(); - Serializer<? extends V> valueSerializer = valSerde == null ? null : valSerde.serializer(); - Deserializer<? extends V> valueDeserializer = valSerde == null ? null : valSerde.deserializer(); + private void buildAggregate(final ProcessorSupplier<K, Change<V>> aggregateSupplier, + final String topic, + final String funcName, + final String sourceName, + final String sinkName) { + final Serializer<? extends K> keySerializer = keySerde == null ? null : keySerde.serializer(); + final Deserializer<? extends K> keyDeserializer = keySerde == null ? null : keySerde.deserializer(); + final Serializer<? extends V> valueSerializer = valSerde == null ? null : valSerde.serializer(); + final Deserializer<? extends V> valueDeserializer = valSerde == null ? null : valSerde.deserializer(); - ChangedSerializer<? extends V> changedValueSerializer = new ChangedSerializer<>(valueSerializer); - ChangedDeserializer<? extends V> changedValueDeserializer = new ChangedDeserializer<>(valueDeserializer); + final ChangedSerializer<? extends V> changedValueSerializer = new ChangedSerializer<>(valueSerializer); + final ChangedDeserializer<? 
extends V> changedValueDeserializer = new ChangedDeserializer<>(valueDeserializer); // send the aggregate key-value pairs to the intermediate topic for partitioning builder.internalTopologyBuilder.addInternalTopic(topic); @@ -139,10 +175,23 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup // aggregate the values with the aggregator and local store builder.internalTopologyBuilder.addProcessor(funcName, aggregateSupplier, sourceName); - builder.internalTopologyBuilder.addStateStore(storeSupplier, funcName); + } + + private <T> KTable<K, T> doAggregate(final ProcessorSupplier<K, Change<V>> aggregateSupplier, + final String functionName, + final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materialized) { + final String sinkName = builder.newName(KStreamImpl.SINK_NAME); + final String sourceName = builder.newName(KStreamImpl.SOURCE_NAME); + final String funcName = builder.newName(functionName); + + buildAggregate(aggregateSupplier, + materialized.storeName() + KStreamImpl.REPARTITION_TOPIC_SUFFIX, + funcName, + sourceName, sinkName); + builder.internalTopologyBuilder.addStateStore(new KeyValueStoreMaterializer<>(materialized).materialize(), funcName); // return the KTable representation with the intermediate topic as the sources - return new KTableImpl<>(builder, funcName, aggregateSupplier, Collections.singleton(sourceName), storeSupplier.name(), isQueryable); + return new KTableImpl<>(builder, funcName, aggregateSupplier, Collections.singleton(sourceName), materialized.storeName(), isQueryable); } @Override @@ -155,6 +204,21 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup @Override public KTable<K, V> reduce(final Reducer<V> adder, + final Reducer<V> subtractor, + final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) { + Objects.requireNonNull(adder, "adder can't be null"); + Objects.requireNonNull(subtractor, "subtractor can't be null"); + 
Objects.requireNonNull(materialized, "materialized can't be null"); + final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal + = new MaterializedInternal<>(materialized); + final ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableReduce<>(materializedInternal.storeName(), + adder, + subtractor); + return doAggregate(aggregateSupplier, REDUCE_NAME, materializedInternal); + } + + @Override + public KTable<K, V> reduce(final Reducer<V> adder, final Reducer<V> subtractor) { return reduce(adder, subtractor, (String) null); } @@ -177,6 +241,32 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup } @Override + public KTable<K, Long> count(final Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized) { + return aggregate(countInitializer, + countAdder, + countSubtractor, + materialized); + } + + @Override + public <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, + final Aggregator<? super K, ? super V, VR> adder, + final Aggregator<? super K, ? 
super V, VR> subtractor, + final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) { + Objects.requireNonNull(initializer, "initializer can't be null"); + Objects.requireNonNull(adder, "adder can't be null"); + Objects.requireNonNull(subtractor, "subtractor can't be null"); + Objects.requireNonNull(materialized, "materialized can't be null"); + final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = + new MaterializedInternal<>(materialized); + final ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableAggregate<>(materializedInternal.storeName(), + initializer, + adder, + subtractor); + return doAggregate(aggregateSupplier, AGGREGATE_NAME, materializedInternal); + } + + @Override public KTable<K, Long> count() { return count((String) null); } @@ -184,23 +274,9 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup @Override public KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier) { return this.aggregate( - new Initializer<Long>() { - @Override - public Long apply() { - return 0L; - } - }, - new Aggregator<K, V, Long>() { - @Override - public Long apply(K aggKey, V value, Long aggregate) { - return aggregate + 1L; - } - }, new Aggregator<K, V, Long>() { - @Override - public Long apply(K aggKey, V value, Long aggregate) { - return aggregate - 1L; - } - }, + countInitializer, + countAdder, + countSubtractor, storeSupplier); } http://git-wip-us.apache.org/repos/asf/kafka/blob/8bd2a68b/streams/src/test/java/org/apache/kafka/streams/kstream/MaterializedTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/MaterializedTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/MaterializedTest.java new file mode 100644 index 0000000..de3e503 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/MaterializedTest.java @@ -0,0 +1,54 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream; + +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.SessionBytesStoreSupplier; +import org.apache.kafka.streams.state.WindowBytesStoreSupplier; +import org.junit.Test; + +public class MaterializedTest { + + @Test + public void shouldAllowValidTopicNamesAsStoreName() { + Materialized.as("valid-name"); + Materialized.as("valid.name"); + Materialized.as("valid_name"); + } + + @Test(expected = InvalidTopicException.class) + public void shouldNotAllowInvalidTopicNames() { + Materialized.as("not:valid"); + } + + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerIfWindowBytesStoreSupplierIsNull() { + Materialized.as((WindowBytesStoreSupplier) null); + } + + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerIfKeyValueBytesStoreSupplierIsNull() { + Materialized.as((KeyValueBytesStoreSupplier) null); + } + + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerIfSessionBytesStoreSupplierIsNull() { + 
Materialized.as((SessionBytesStoreSupplier) null); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/8bd2a68b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java index 105dd2e..ff9726e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java @@ -18,12 +18,15 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.kstream.ForeachAction; import org.apache.kafka.streams.kstream.KGroupedTable; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Serialized; import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.test.KStreamTestDriver; @@ -39,10 +42,13 @@ import org.junit.Test; import java.util.HashMap; import java.util.Map; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +@SuppressWarnings("deprecation") public class KGroupedTableImplTest { private final StreamsBuilder builder = new StreamsBuilder(); @@ -50,6 +56,7 @@ public class 
KGroupedTableImplTest { private KGroupedTable<String, String> groupedTable; @Rule public final KStreamTestDriver driver = new KStreamTestDriver(); + private final String topic = "input"; @Before public void before() { @@ -142,7 +149,6 @@ public class KGroupedTableImplTest { @Test public void shouldReduce() { - final String topic = "input"; final KeyValueMapper<String, Number, KeyValue<String, Integer>> intProjection = new KeyValueMapper<String, Number, KeyValue<String, Integer>>() { @Override @@ -161,7 +167,6 @@ public class KGroupedTableImplTest { @Test public void shouldReduceWithInternalStoreName() { - final String topic = "input"; final KeyValueMapper<String, Number, KeyValue<String, Integer>> intProjection = new KeyValueMapper<String, Number, KeyValue<String, Integer>>() { @Override @@ -177,4 +182,132 @@ public class KGroupedTableImplTest { doShouldReduce(reduced, topic); assertNull(reduced.queryableStoreName()); } + + @SuppressWarnings("unchecked") + @Test + public void shouldReduceAndMaterializeResults() { + final KeyValueMapper<String, Number, KeyValue<String, Integer>> intProjection = + new KeyValueMapper<String, Number, KeyValue<String, Integer>>() { + @Override + public KeyValue<String, Integer> apply(String key, Number value) { + return KeyValue.pair(key, value.intValue()); + } + }; + + final KTable<String, Integer> reduced = builder.table(Serdes.String(), Serdes.Double(), topic, "store") + .groupBy(intProjection) + .reduce(MockReducer.INTEGER_ADDER, + MockReducer.INTEGER_SUBTRACTOR, + Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("reduce") + .withKeySerde(Serdes.String()) + .withValueSerde(Serdes.Integer())); + + doShouldReduce(reduced, topic); + final KeyValueStore<String, Integer> reduce = (KeyValueStore<String, Integer>) driver.allStateStores().get("reduce"); + assertThat(reduce.get("A"), equalTo(5)); + assertThat(reduce.get("B"), equalTo(6)); + } + + @SuppressWarnings("unchecked") + @Test + public void 
shouldCountAndMaterializeResults() { + final KTable<String, String> table = builder.table(Serdes.String(), Serdes.String(), topic, "store"); + table.groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(), + Serialized.with(Serdes.String(), + Serdes.String())) + .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count") + .withKeySerde(Serdes.String()) + .withValueSerde(Serdes.Long())); + + processData(topic); + final KeyValueStore<String, Long> counts = (KeyValueStore<String, Long>) driver.allStateStores().get("count"); + assertThat(counts.get("1"), equalTo(3L)); + assertThat(counts.get("2"), equalTo(2L)); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldAggregateAndMaterializeResults() { + final KTable<String, String> table = builder.table(Serdes.String(), Serdes.String(), topic, "store"); + table.groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(), + Serialized.with(Serdes.String(), + Serdes.String())) + .aggregate(MockInitializer.STRING_INIT, + MockAggregator.TOSTRING_ADDER, + MockAggregator.TOSTRING_REMOVER, + Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("aggregate") + .withValueSerde(Serdes.String()) + .withKeySerde(Serdes.String())); + + processData(topic); + final KeyValueStore<String, String> aggregate = (KeyValueStore<String, String>) driver.allStateStores().get("aggregate"); + assertThat(aggregate.get("1"), equalTo("0+1+1+1")); + assertThat(aggregate.get("2"), equalTo("0+2+2")); + } + + @SuppressWarnings("unchecked") + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointOnCountWhenMaterializedIsNull() { + groupedTable.count((Materialized) null); + } + + @SuppressWarnings("unchecked") + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerOnReduceWhenMaterializedIsNull() { + groupedTable.reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, (Materialized) null); + } + + @Test(expected = 
NullPointerException.class) + public void shouldThrowNullPointerOnReduceWhenAdderIsNull() { + groupedTable.reduce(null, MockReducer.STRING_REMOVER, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store")); + } + + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerOnReduceWhenSubtractorIsNull() { + groupedTable.reduce(MockReducer.STRING_ADDER, null, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store")); + } + + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerOnAggregateWhenInitializerIsNull() { + groupedTable.aggregate(null, + MockAggregator.TOSTRING_ADDER, + MockAggregator.TOSTRING_REMOVER, + Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store")); + } + + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerOnAggregateWhenAdderIsNull() { + groupedTable.aggregate(MockInitializer.STRING_INIT, + null, + MockAggregator.TOSTRING_REMOVER, + Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store")); + } + + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerOnAggregateWhenSubtractorIsNull() { + groupedTable.aggregate(MockInitializer.STRING_INIT, + MockAggregator.TOSTRING_ADDER, + null, + Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store")); + } + + @SuppressWarnings("unchecked") + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerOnAggregateWhenMaterializedIsNull() { + groupedTable.aggregate(MockInitializer.STRING_INIT, + MockAggregator.TOSTRING_ADDER, + MockAggregator.TOSTRING_REMOVER, + (Materialized) null); + } + + private void processData(final String topic) { + driver.setUp(builder, TestUtils.tempDirectory(), Serdes.String(), Serdes.Integer()); + driver.setTime(0L); + driver.process(topic, "A", "1"); + driver.process(topic, "B", "1"); + driver.process(topic, "C", "1"); + driver.process(topic, "D", "2"); + driver.process(topic, "E", 
"2"); + driver.flushState(); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/8bd2a68b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java index 9347cc8..accbb9c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java @@ -405,4 +405,5 @@ public class KTableAggregateTest { driver.process("tableOne", "1", "5"); assertEquals(Long.valueOf(4L), reduceResults.get("2")); } + }
