KAFKA-5873; add materialized overloads to StreamsBuilder Add overloads for `table` and `globalTable` that use `Materialized`
Author: Damian Guy <[email protected]> Reviewers: Bill Bejeck <[email protected]>, Matthias J. Sax <[email protected]>, Guozhang Wang <[email protected]> Closes #3837 from dguy/kafka-5873 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f2b74aa1 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f2b74aa1 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f2b74aa1 Branch: refs/heads/trunk Commit: f2b74aa1c36bf2882006c14f7cbd56b493f39d26 Parents: 52d7b67 Author: Damian Guy <[email protected]> Authored: Mon Sep 18 15:53:44 2017 +0100 Committer: Damian Guy <[email protected]> Committed: Mon Sep 18 15:53:44 2017 +0100 ---------------------------------------------------------------------- .../examples/pageview/PageViewTypedDemo.java | 7 +- .../examples/pageview/PageViewUntypedDemo.java | 9 +- .../apache/kafka/streams/StreamsBuilder.java | 700 +++---------------- .../java/org/apache/kafka/streams/Topology.java | 2 +- .../internals/InternalStreamsBuilder.java | 152 ++-- .../streams/kstream/internals/KTableImpl.java | 9 +- .../kstream/internals/MaterializedInternal.java | 7 +- .../internals/InternalTopologyBuilder.java | 3 +- .../apache/kafka/streams/KafkaStreamsTest.java | 3 +- .../streams/integration/EosIntegrationTest.java | 12 +- .../GlobalKTableIntegrationTest.java | 13 +- .../integration/JoinIntegrationTest.java | 4 +- .../KStreamKTableJoinIntegrationTest.java | 3 +- .../KTableKTableJoinIntegrationTest.java | 6 +- .../integration/RestoreIntegrationTest.java | 3 +- .../internals/GlobalKTableJoinsTest.java | 5 +- .../internals/InternalStreamsBuilderTest.java | 87 ++- .../internals/KGroupedTableImplTest.java | 25 +- .../kstream/internals/KStreamImplTest.java | 11 +- .../internals/KStreamKStreamLeftJoinTest.java | 2 +- .../internals/KStreamKTableJoinTest.java | 7 +- .../internals/KStreamKTableLeftJoinTest.java | 5 +- .../kstream/internals/KTableAggregateTest.java | 22 +- 
.../kstream/internals/KTableFilterTest.java | 28 +- .../kstream/internals/KTableForeachTest.java | 12 +- .../kstream/internals/KTableImplTest.java | 29 +- .../kstream/internals/KTableKTableJoinTest.java | 22 +- .../internals/KTableKTableLeftJoinTest.java | 29 +- .../internals/KTableKTableOuterJoinTest.java | 14 +- .../kstream/internals/KTableMapKeysTest.java | 3 +- .../kstream/internals/KTableMapValuesTest.java | 16 +- .../kstream/internals/KTableSourceTest.java | 10 +- .../kafka/streams/perf/SimpleBenchmark.java | 13 +- .../kafka/streams/perf/YahooBenchmark.java | 3 +- .../internals/StreamsMetadataStateTest.java | 8 +- .../kafka/streams/tests/SmokeTestClient.java | 13 +- 36 files changed, 443 insertions(+), 854 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java ---------------------------------------------------------------------- diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java index 72f9be8..068eece 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java @@ -29,6 +29,7 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.Serialized; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.ValueJoiner; @@ -145,8 +146,8 @@ public class PageViewTypedDemo { KStream<String, PageView> views = 
builder.stream("streams-pageview-input", Consumed.with(Serdes.String(), pageViewSerde)); - KTable<String, UserProfile> users = builder.table(Serdes.String(), userProfileSerde, - "streams-userprofile-input", "streams-userprofile-store-name"); + KTable<String, UserProfile> users = builder.table("streams-userprofile-input", + Consumed.with(Serdes.String(), userProfileSerde)); KStream<WindowedPageViewByRegion, RegionCount> regionCount = views .leftJoin(users, new ValueJoiner<PageView, UserProfile, PageViewByRegion>() { @@ -190,7 +191,7 @@ public class PageViewTypedDemo { }); // write to the result topic - regionCount.to(wPageViewByRegionSerde, regionCountSerde, "streams-pageviewstats-typed-output"); + regionCount.to("streams-pageviewstats-typed-output", Produced.with(wPageViewByRegionSerde, regionCountSerde)); KafkaStreams streams = new KafkaStreams(builder.build(), props); streams.start(); http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java ---------------------------------------------------------------------- diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java index e8787af..c20c077 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java @@ -34,6 +34,7 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.Serialized; import org.apache.kafka.streams.kstream.TimeWindows; import 
org.apache.kafka.streams.kstream.ValueJoiner; @@ -73,10 +74,10 @@ public class PageViewUntypedDemo { final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer(); final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer); - KStream<String, JsonNode> views = builder.stream("streams-pageview-input", Consumed.with(Serdes.String(), jsonSerde)); + final Consumed<String, JsonNode> consumed = Consumed.with(Serdes.String(), jsonSerde); + KStream<String, JsonNode> views = builder.stream("streams-pageview-input", consumed); - KTable<String, JsonNode> users = builder.table(Serdes.String(), jsonSerde, - "streams-userprofile-input", "streams-userprofile-store-name"); + KTable<String, JsonNode> users = builder.table("streams-userprofile-input", consumed); KTable<String, String> userRegions = users.mapValues(new ValueMapper<JsonNode, String>() { @Override @@ -121,7 +122,7 @@ public class PageViewUntypedDemo { }); // write to the result topic - regionCount.to(jsonSerde, jsonSerde, "streams-pageviewstats-untyped-output"); + regionCount.to("streams-pageviewstats-untyped-output", Produced.with(jsonSerde, jsonSerde)); KafkaStreams streams = new KafkaStreams(builder.build(), props); streams.start(); http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java index a26822a..a272ec4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java @@ -16,28 +16,30 @@ */ package org.apache.kafka.streams; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; import 
org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.kstream.GlobalKTable; import org.apache.kafka.streams.kstream.KGroupedStream; import org.apache.kafka.streams.kstream.KGroupedTable; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.internals.ConsumedInternal; import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder; +import org.apache.kafka.streams.kstream.internals.MaterializedInternal; import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.processor.StateStore; -import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; import org.apache.kafka.streams.processor.internals.ProcessorNode; import org.apache.kafka.streams.processor.internals.SourceNode; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.QueryableStoreType; +import org.apache.kafka.streams.state.StoreBuilder; import java.util.Collection; import java.util.Collections; +import java.util.Objects; import java.util.regex.Pattern; /** @@ -129,6 +131,8 @@ public class StreamsBuilder { */ public synchronized <K, V> KStream<K, V> stream(final Collection<String> topics, final Consumed<K, V> consumed) { + Objects.requireNonNull(topics, "topics can't be null"); + Objects.requireNonNull(consumed, "consumed can't be null"); return internalStreamsBuilder.stream(topics, new ConsumedInternal<>(consumed)); } @@ -170,6 +174,8 @@ public class StreamsBuilder { */ public synchronized <K, V> KStream<K, V> stream(final Pattern topicPattern, final Consumed<K, V> consumed) { + Objects.requireNonNull(topicPattern, "topicPattern can't be null"); + Objects.requireNonNull(consumed, "consumed can't be null"); return 
internalStreamsBuilder.stream(topicPattern, new ConsumedInternal<>(consumed)); } @@ -182,11 +188,17 @@ * Note that the specified input topic must be partitioned by key. * If this is not the case the returned {@link KTable} will be corrupted. * <p> - * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given - * {@code queryableStoreName}. + * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} using the given + * {@code Materialized} instance. * However, no internal changelog topic is created since the original input topic can be used for recovery (cf. * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}). * <p> + * You should only specify serdes in the {@link Consumed} instance as these will also be used to overwrite the + * serdes in {@link Materialized}, i.e., + * <pre> {@code + * streamBuilder.table(topic, Consumed.with(Serdes.String(), Serdes.String()), Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(storeName)) + * } + * </pre> * To query the local {@link KeyValueStore} it must be obtained via * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: * <pre>{@code * @@ -199,46 +211,20 @@ * query the value of the key on a parallel running instance of your Kafka Streams application.
* * @param topic the topic name; cannot be {@code null} - * @param queryableStoreName the state store name; if {@code null} this is the equivalent of {@link #table(String)} + * @param consumed the instance of {@link Consumed} used to define optional parameters; cannot be {@code null} + * @param materialized the instance of {@link Materialized} used to materialize a state store; cannot be {@code null} * @return a {@link KTable} for the specified topic */ public synchronized <K, V> KTable<K, V> table(final String topic, - final String queryableStoreName) { - return internalStreamsBuilder.table(topic, new ConsumedInternal<K, V>(), queryableStoreName); - } - - /** - * Create a {@link KTable} for the specified topic. - * The default {@code "auto.offset.reset"} strategy and default key and value deserializers as specified in the - * {@link StreamsConfig config} are used. - * Input {@link KeyValue records} with {@code null} key will be dropped. - * <p> - * Note that the specified input topics must be partitioned by key. - * If this is not the case the returned {@link KTable} will be corrupted. - * <p> - * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given - * {@code queryableStoreName}. - * However, no internal changelog topic is created since the original input topic can be used for recovery (cf. - * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}). - * <p> - * To query the local {@link KeyValueStore} it must be obtained via - * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: - * <pre>{@code - * KafkaStreams streams = ... 
- * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore()); - * String key = "some-key"; - * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) - * }</pre> - * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to - * query the value of the key on a parallel running instance of your Kafka Streams application. - * - * @param topic the topic name; cannot be {@code null} - * @param storeSupplier user defined state store supplier; cannot be {@code null} - * @return a {@link KTable} for the specified topic - */ - public synchronized <K, V> KTable<K, V> table(final String topic, - final StateStoreSupplier<KeyValueStore> storeSupplier) { - return internalStreamsBuilder.table(null, null, null, null, topic, storeSupplier); + final Consumed<K, V> consumed, + final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) { + Objects.requireNonNull(topic, "topic can't be null"); + Objects.requireNonNull(consumed, "consumed can't be null"); + Objects.requireNonNull(materialized, "materialized can't be null"); + materialized.withKeySerde(consumed.keySerde).withValueSerde(consumed.valueSerde); + return internalStreamsBuilder.table(topic, + new ConsumedInternal<>(consumed), + new MaterializedInternal<>(materialized)); } /** @@ -259,7 +245,7 @@ public class StreamsBuilder { * @return a {@link KTable} for the specified topic */ public synchronized <K, V> KTable<K, V> table(final String topic) { - return internalStreamsBuilder.table(topic, new ConsumedInternal<K, V>(), null); + return table(topic, new ConsumedInternal<K, V>()); } /** @@ -277,450 +263,47 @@ public class StreamsBuilder { * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}). 
* * @param topic the topic name; cannot be {@code null} - * @param consumed the instance of {@link Consumed} used to define optional parameters + * @param consumed the instance of {@link Consumed} used to define optional parameters; cannot be {@code null} * @return a {@link KTable} for the specified topic */ public synchronized <K, V> KTable<K, V> table(final String topic, final Consumed<K, V> consumed) { - return internalStreamsBuilder.table(topic, new ConsumedInternal<>(consumed), null); - } - - /** - * Create a {@link KTable} for the specified topic. - * The default key and value deserializers as specified in the {@link StreamsConfig config} are used. - * Input {@link KeyValue records} with {@code null} key will be dropped. - * <p> - * Note that the specified input topics must be partitioned by key. - * If this is not the case the returned {@link KTable} will be corrupted. - * <p> - * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given - * {@code queryableStoreName}. - * However, no internal changelog topic is created since the original input topic can be used for recovery (cf. - * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}). - * <p> - * To query the local {@link KeyValueStore} it must be obtained via - * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: - * <pre>{@code - * KafkaStreams streams = ... - * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore()); - * String key = "some-key"; - * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) - * }</pre> - * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to - * query the value of the key on a parallel running instance of your Kafka Streams application. 
- * - * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed - * offsets are available - * @param topic the topic name; cannot be {@code null} - * @param queryableStoreName the state store name; if {@code null} this is the equivalent of - * {@link #table(String, Consumed)} - * @return a {@link KTable} for the specified topic - */ - public synchronized <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset, - final String topic, - final String queryableStoreName) { - return internalStreamsBuilder.table(topic, new ConsumedInternal<>(Consumed.<K, V>with(offsetReset)), queryableStoreName); - } - - /** - * Create a {@link KTable} for the specified topic. - * The default {@link TimestampExtractor} and default key and value deserializers - * as specified in the {@link StreamsConfig config} are used. - * Input {@link KeyValue records} with {@code null} key will be dropped. - * <p> - * Note that the specified input topic must be partitioned by key. - * If this is not the case the returned {@link KTable} will be corrupted. - * <p> - * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given - * {@code queryableStoreName}. - * However, no internal changelog topic is created since the original input topic can be used for recovery (cf. - * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}). - * <p> - * To query the local {@link KeyValueStore} it must be obtained via - * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: - * <pre>{@code - * KafkaStreams streams = ... 
- * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore()); - * String key = "some-key"; - * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) - * }</pre> - * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to - * query the value of the key on a parallel running instance of your Kafka Streams application. - * - * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed - * offsets are available - * @param topic the topic name; cannot be {@code null} - * @param storeSupplier user defined state store supplier; cannot be {@code null} - * @return a {@link KTable} for the specified topic - */ - public synchronized <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset, - final String topic, - final StateStoreSupplier<KeyValueStore> storeSupplier) { - return internalStreamsBuilder.table(offsetReset, null, null, null, topic, storeSupplier); - } - - /** - * Create a {@link KTable} for the specified topic. - * The default {@code "auto.offset.reset"} strategy and default key and value deserializers - * as specified in the {@link StreamsConfig config} are used. - * Input {@link KeyValue} pairs with {@code null} key will be dropped. - * <p> - * Note that the specified input topic must be partitioned by key. - * If this is not the case the returned {@link KTable} will be corrupted. - * <p> - * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given - * {@code storeName}. - * However, no internal changelog topic is created since the original input topic can be used for recovery (cf. - * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}). 
- * <p> - * To query the local {@link KeyValueStore} it must be obtained via - * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: - * <pre>{@code - * KafkaStreams streams = ... - * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()); - * String key = "some-key"; - * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) - * }</pre> - * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to - * query the value of the key on a parallel running instance of your Kafka Streams application. - * - * @param timestampExtractor the stateless timestamp extractor used for this source {@link KTable}, - * if not specified the default extractor defined in the configs will be used - * @param topic the topic name; cannot be {@code null} - * @param queryableStoreName the state store name; if {@code null} an internal store name will be automatically given - * @return a {@link KTable} for the specified topic - */ - public synchronized <K, V> KTable<K, V> table(final TimestampExtractor timestampExtractor, - final String topic, - final String queryableStoreName) { - return internalStreamsBuilder.table(topic, new ConsumedInternal<>(Consumed.<K, V>with(timestampExtractor)), queryableStoreName); - } - - /** - * Create a {@link KTable} for the specified topic. - * The default key and value deserializers as specified in the {@link StreamsConfig config} are used. - * Input {@link KeyValue} pairs with {@code null} key will be dropped. - * <p> - * Note that the specified input topic must be partitioned by key. - * If this is not the case the returned {@link KTable} will be corrupted. - * <p> - * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given - * {@code storeName}. 
- * However, no internal changelog topic is created since the original input topic can be used for recovery (cf. - * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}). - * <p> - * To query the local {@link KeyValueStore} it must be obtained via - * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: - * <pre>{@code - * KafkaStreams streams = ... - * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()); - * String key = "some-key"; - * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) - * }</pre> - * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to - * query the value of the key on a parallel running instance of your Kafka Streams application. - * - * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed - * offsets are available - * @param timestampExtractor the stateless timestamp extractor used for this source {@link KTable}, - * if not specified the default extractor defined in the configs will be used - * @param topic the topic name; cannot be {@code null} - * @param queryableStoreName the state store name; if {@code null} an internal store name will be automatically given - * @return a {@link KTable} for the specified topic - */ - public synchronized <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset, - final TimestampExtractor timestampExtractor, - final String topic, - final String queryableStoreName) { - final Consumed<K, V> consumed = Consumed.<K, V>with(offsetReset).withTimestampExtractor(timestampExtractor); - return internalStreamsBuilder.table(topic, new ConsumedInternal<>(consumed), queryableStoreName); - } - - /** - * Create a {@link KTable} for the specified topic. 
- * The default {@code "auto.offset.reset"} strategy and default {@link TimestampExtractor} - * as specified in the {@link StreamsConfig config} are used. - * Input {@link KeyValue records} with {@code null} key will be dropped. - * <p> - * Note that the specified input topic must be partitioned by key. - * If this is not the case the returned {@link KTable} will be corrupted. - * <p> - * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given - * {@code queryableStoreName}. - * However, no internal changelog topic is created since the original input topic can be used for recovery (cf. - * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}). - * <p> - * To query the local {@link KeyValueStore} it must be obtained via - * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: - * <pre>{@code - * KafkaStreams streams = ... - * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore()); - * String key = "some-key"; - * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) - * }</pre> - * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to - * query the value of the key on a parallel running instance of your Kafka Streams application. 
- * - * @param keySerde key serde used to send key-value pairs, - * if not specified the default key serde defined in the configuration will be used - * @param valueSerde value serde used to send key-value pairs, - * if not specified the default value serde defined in the configuration will be used - * @param topic the topic name; cannot be {@code null} - * @param queryableStoreName the state store name; if {@code null} an internal store name will be automatically given - * @return a {@link KTable} for the specified topic - */ - public synchronized <K, V> KTable<K, V> table(final Serde<K> keySerde, - final Serde<V> valueSerde, - final String topic, - final String queryableStoreName) { + Objects.requireNonNull(topic, "topic can't be null"); + Objects.requireNonNull(consumed, "consumed can't be null"); return internalStreamsBuilder.table(topic, - new ConsumedInternal<>(keySerde, valueSerde, null, null), - queryableStoreName); - } - - /** - * Create a {@link KTable} for the specified topic. - * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used. - * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used. - * Input {@link KeyValue records} with {@code null} key will be dropped. - * <p> - * Note that the specified input topic must be partitioned by key. - * If this is not the case the returned {@link KTable} will be corrupted. - * <p> - * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given - * {@code queryableStoreName}. - * However, no internal changelog topic is created since the original input topic can be used for recovery (cf. - * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}). - * <p> - * To query the local {@link KeyValueStore} it must be obtained via - * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: - * <pre>{@code - * KafkaStreams streams = ... 
- * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore()); - * String key = "some-key"; - * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) - * }</pre> - * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to - * query the value of the key on a parallel running instance of your Kafka Streams application. - * - * @param keySerde key serde used to send key-value pairs, - * if not specified the default key serde defined in the configuration will be used - * @param valueSerde value serde used to send key-value pairs, - * if not specified the default value serde defined in the configuration will be used - * @param topic the topic name; cannot be {@code null} - * @param storeSupplier user defined state store supplier; cannot be {@code null} - * @return a {@link KTable} for the specified topic - */ - public synchronized <K, V> KTable<K, V> table(final Serde<K> keySerde, - final Serde<V> valueSerde, - final String topic, - final StateStoreSupplier<KeyValueStore> storeSupplier) { - return internalStreamsBuilder.table(null, null, keySerde, valueSerde, topic, storeSupplier); + new ConsumedInternal<>(consumed), + new MaterializedInternal<>( + Materialized.<K, V, KeyValueStore<Bytes, byte[]>>as( + internalStreamsBuilder.newStoreName(topic)) + .withKeySerde(consumed.keySerde) + .withValueSerde(consumed.valueSerde), + false)); } /** * Create a {@link KTable} for the specified topic. + * The default {@code "auto.offset.reset"} strategy and default key and value deserializers as specified in the + * {@link StreamsConfig config} are used. * Input {@link KeyValue records} with {@code null} key will be dropped. * <p> * Note that the specified input topics must be partitioned by key. * If this is not the case the returned {@link KTable} will be corrupted. 
* <p> - * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given - * {@code queryableStoreName}. - * However, no internal changelog topic is created since the original input topic can be used for recovery (cf. - * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}). - * <p> - * To query the local {@link KeyValueStore} it must be obtained via - * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: - * <pre>{@code - * KafkaStreams streams = ... - * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore()); - * String key = "some-key"; - * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) - * }</pre> - * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to - * query the value of the key on a parallel running instance of your Kafka Streams application. 
- * - * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid - * committed offsets are available - * @param keySerde key serde used to send key-value pairs, - * if not specified the default key serde defined in the configuration will be used - * @param valueSerde value serde used to send key-value pairs, - * if not specified the default value serde defined in the configuration will be used - * @param topic the topic name; cannot be {@code null} - * @param queryableStoreName the state store name; if {@code null} an internal store name will be automatically given - * @return a {@link KTable} for the specified topic - */ - public synchronized <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset, - final Serde<K> keySerde, - final Serde<V> valueSerde, - final String topic, - final String queryableStoreName) { - final ConsumedInternal<K, V> consumed = new ConsumedInternal<>(keySerde, valueSerde, null, offsetReset); - return internalStreamsBuilder.table(topic, consumed, queryableStoreName); - } - - /** - * Create a {@link KTable} for the specified topic. - * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used. - * Input {@link KeyValue} pairs with {@code null} key will be dropped. - * <p> - * Note that the specified input topic must be partitioned by key. - * If this is not the case the returned {@link KTable} will be corrupted. - * <p> - * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given - * {@code storeName}. - * However, no internal changelog topic is created since the original input topic can be used for recovery (cf. - * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}). 
- * <p> - * To query the local {@link KeyValueStore} it must be obtained via - * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: - * <pre>{@code - * KafkaStreams streams = ... - * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()); - * String key = "some-key"; - * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) - * }</pre> - * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to - * query the value of the key on a parallel running instance of your Kafka Streams application. - * - * @param timestampExtractor the stateless timestamp extractor used for this source {@link KTable}, - * if not specified the default extractor defined in the configs will be used - * @param keySerde key serde used to send key-value pairs, - * if not specified the default key serde defined in the configuration will be used - * @param valueSerde value serde used to send key-value pairs, - * if not specified the default value serde defined in the configuration will be used - * @param topic the topic name; cannot be {@code null} - * @param queryableStoreName the state store name; if {@code null} an internal store name will be automatically given - * @return a {@link KTable} for the specified topic - */ - public synchronized <K, V> KTable<K, V> table(final TimestampExtractor timestampExtractor, - final Serde<K> keySerde, - final Serde<V> valueSerde, - final String topic, - final String queryableStoreName) { - final ConsumedInternal<K, V> consumed = new ConsumedInternal<>(keySerde, valueSerde, timestampExtractor, null); - return internalStreamsBuilder.table(topic, consumed, queryableStoreName); - } - - /** - * Create a {@link KTable} for the specified topic. - * Input {@link KeyValue} pairs with {@code null} key will be dropped. 
- * <p> - * Note that the specified input topic must be partitioned by key. - * If this is not the case the returned {@link KTable} will be corrupted. - * <p> - * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given - * {@code storeName}. - * However, no internal changelog topic is created since the original input topic can be used for recovery (cf. + * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} using the {@link Materialized} instance. + * No internal changelog topic is created since the original input topic can be used for recovery (cf. * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}). - * <p> - * To query the local {@link KeyValueStore} it must be obtained via - * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: - * <pre>{@code - * KafkaStreams streams = ... - * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()); - * String key = "some-key"; - * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) - * }</pre> - * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to - * query the value of the key on a parallel running instance of your Kafka Streams application. 
* - * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid - * committed offsets are available - * @param timestampExtractor the stateless timestamp extractor used for this source {@link KTable}, - * if not specified the default extractor defined in the configs will be used - * @param keySerde key serde used to send key-value pairs, - * if not specified the default key serde defined in the configuration will be used - * @param valueSerde value serde used to send key-value pairs, - * if not specified the default value serde defined in the configuration will be used - * @param topic the topic name; cannot be {@code null} - * @param queryableStoreName the state store name; if {@code null} an internal store name will be automatically given + * @param topic the topic name; cannot be {@code null} + * @param materialized the instance of {@link Materialized} used to materialize a state store; cannot be {@code null} * @return a {@link KTable} for the specified topic */ - public synchronized <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset, - final TimestampExtractor timestampExtractor, - final Serde<K> keySerde, - final Serde<V> valueSerde, - final String topic, - final String queryableStoreName) { + public synchronized <K, V> KTable<K, V> table(final String topic, + final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) { + Objects.requireNonNull(topic, "topic can't be null"); + Objects.requireNonNull(materialized, "materialized can't be null"); return internalStreamsBuilder.table(topic, - new ConsumedInternal<>(keySerde, valueSerde, timestampExtractor, offsetReset), - queryableStoreName); - } - - /** - * Create a {@link KTable} for the specified topic. - * Input {@link KeyValue records} with {@code null} key will be dropped. - * <p> - * Note that the specified input topics must be partitioned by key. - * If this is not the case the returned {@link KTable} will be corrupted. 
- * <p> - * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given - * {@code queryableStoreName}. - * However, no internal changelog topic is created since the original input topic can be used for recovery (cf. - * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}). - * <p> - * To query the local {@link KeyValueStore} it must be obtained via - * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: - * <pre>{@code - * KafkaStreams streams = ... - * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore()); - * String key = "some-key"; - * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) - * }</pre> - * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to - * query the value of the key on a parallel running instance of your Kafka Streams application. 
- * - * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed - * offsets are available - * @param timestampExtractor the stateless timestamp extractor used for this source {@link KTable}, - * if not specified the default extractor defined in the configs will be used - * @param keySerde key serde used to send key-value pairs, - * if not specified the default key serde defined in the configuration will be used - * @param valueSerde value serde used to send key-value pairs, - * if not specified the default value serde defined in the configuration will be used - * @param topic the topic name; cannot be {@code null} - * @param storeSupplier user defined state store supplier; cannot be {@code null} - * @return a {@link KTable} for the specified topic - */ - public synchronized <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset, - final TimestampExtractor timestampExtractor, - final Serde<K> keySerde, - final Serde<V> valueSerde, - final String topic, - final StateStoreSupplier<KeyValueStore> storeSupplier) { - return internalStreamsBuilder.table(offsetReset, timestampExtractor, keySerde, valueSerde, topic, storeSupplier); - } - - /** - * Create a {@link GlobalKTable} for the specified topic. - * The default key and value deserializers as specified in the {@link StreamsConfig config} are used. - * Input {@link KeyValue records} with {@code null} key will be dropped. - * <p> - * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with the given - * {@code queryableStoreName}. - * However, no internal changelog topic is created since the original input topic can be used for recovery (cf. - * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}). 
- * <p> - * To query the local {@link KeyValueStore} it must be obtained via - * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: - * <pre>{@code - * KafkaStreams streams = ... - * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore()); - * String key = "some-key"; - * Long valueForKey = localStore.get(key); - * }</pre> - * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"} - * regardless of the specified value in {@link StreamsConfig}. - * - * @param topic the topic name; cannot be {@code null} - * @param queryableStoreName the state store name; if {@code null} an internal store name will be automatically given - * @return a {@link GlobalKTable} for the specified topic - */ - public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic, - final String queryableStoreName) { - return internalStreamsBuilder.globalTable(topic, new ConsumedInternal<K, V>(), queryableStoreName); + new ConsumedInternal<K, V>(), + new MaterializedInternal<>(materialized)); } /** @@ -741,7 +324,15 @@ public class StreamsBuilder { */ public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic, final Consumed<K, V> consumed) { - return internalStreamsBuilder.globalTable(topic, new ConsumedInternal<>(consumed), null); + Objects.requireNonNull(topic, "topic can't be null"); + Objects.requireNonNull(consumed, "consumed can't be null"); + final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized = + new MaterializedInternal<>(Materialized.<K, V, KeyValueStore<Bytes, byte[]>>as( + internalStreamsBuilder.newStoreName(topic)) + .withKeySerde(consumed.keySerde) + .withValueSerde(consumed.valueSerde), + false); + return internalStreamsBuilder.globalTable(topic, new ConsumedInternal<>(consumed), materialized); } /** @@ -761,61 +352,25 @@ public class StreamsBuilder { * @return a {@link 
GlobalKTable} for the specified topic */ public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic) { - return internalStreamsBuilder.globalTable(topic, new ConsumedInternal<K, V>(), null); + return globalTable(topic, Consumed.<K, V>with(null, null)); } /** * Create a {@link GlobalKTable} for the specified topic. - * The default {@link TimestampExtractor} and default key and value deserializers as specified in - * the {@link StreamsConfig config} are used. - * Input {@link KeyValue} pairs with {@code null} key will be dropped. - * <p> - * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with the given - * {@code queryableStoreName}. - * However, no internal changelog topic is created since the original input topic can be used for recovery (cf. - * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}). - * <p> - * To query the local {@link KeyValueStore} it must be obtained via - * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: - * <pre>{@code - * KafkaStreams streams = ... - * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore()); - * String key = "some-key"; - * Long valueForKey = localStore.get(key); - * }</pre> - * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"} - * regardless of the specified value in {@link StreamsConfig}. 
* - * @param keySerde key serde used to send key-value pairs, - * if not specified the default key serde defined in the configuration will be used - * @param valueSerde value serde used to send key-value pairs, - * if not specified the default value serde defined in the configuration will be used - * @param timestampExtractor the stateless timestamp extractor used for this source {@link KTable}, - * if not specified the default extractor defined in the configs will be used - * @param topic the topic name; cannot be {@code null} - * @param queryableStoreName the state store name; if {@code null} an internal store name will be automatically given - * @return a {@link GlobalKTable} for the specified topic - */ - public synchronized <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde, - final Serde<V> valueSerde, - final TimestampExtractor timestampExtractor, - final String topic, - final String queryableStoreName) { - return internalStreamsBuilder.globalTable(topic, - new ConsumedInternal<>(keySerde, valueSerde, timestampExtractor, null), - queryableStoreName); - } - - /** - * Create a {@link GlobalKTable} for the specified topic. - * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used. * Input {@link KeyValue} pairs with {@code null} key will be dropped. * <p> - * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with the given - * {@code queryableStoreName}. + * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} configured with + * the provided instance of {@link Materialized}. * However, no internal changelog topic is created since the original input topic can be used for recovery (cf. * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}). 
* <p> + * You should only specify serdes in the {@link Consumed} instance as these will also be used to overwrite the + * serdes in {@link Materialized}, i.e., + * <pre> {@code + * streamBuilder.globalTable(topic, Consumed.with(Serdes.String(), Serdes.String()), Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(storeName)) + * } + * </pre> * To query the local {@link KeyValueStore} it must be obtained via * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: * <pre>{@code @@ -827,28 +382,31 @@ public class StreamsBuilder { * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"} * regardless of the specified value in {@link StreamsConfig}. * - * @param keySerde key serde used to send key-value pairs, - * if not specified the default key serde defined in the configuration will be used - * @param valueSerde value serde used to send key-value pairs, - * if not specified the default value serde defined in the configuration will be used * @param topic the topic name; cannot be {@code null} - * @param storeSupplier user defined state store supplier; Cannot be {@code null} + * @param consumed the instance of {@link Consumed} used to define optional parameters; can't be {@code null} + * @param materialized the instance of {@link Materialized} used to materialize a state store; cannot be {@code null} * @return a {@link GlobalKTable} for the specified topic */ - public synchronized <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde, - final Serde<V> valueSerde, - final String topic, - final StateStoreSupplier<KeyValueStore> storeSupplier) { - return internalStreamsBuilder.globalTable(keySerde, valueSerde, topic, storeSupplier); + public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic, + final Consumed<K, V> consumed, + final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) { + Objects.requireNonNull(topic, "topic can't be null"); + 
Objects.requireNonNull(consumed, "consumed can't be null"); + Objects.requireNonNull(materialized, "materialized can't be null"); + // always use the serdes from consumed + materialized.withKeySerde(consumed.keySerde).withValueSerde(consumed.valueSerde); + return internalStreamsBuilder.globalTable(topic, + new ConsumedInternal<>(consumed), + new MaterializedInternal<>(materialized)); } /** * Create a {@link GlobalKTable} for the specified topic. - * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used. + * * Input {@link KeyValue} pairs with {@code null} key will be dropped. * <p> - * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with the given - * {@code queryableStoreName}. + * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} configured with + * the provided instance of {@link Materialized}. * However, no internal changelog topic is created since the original input topic can be used for recovery (cf. * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}). * <p> @@ -863,34 +421,30 @@ public class StreamsBuilder { * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"} * regardless of the specified value in {@link StreamsConfig}. 
* - * @param keySerde key serde used to send key-value pairs, - * if not specified the default key serde defined in the configuration will be used - * @param valueSerde value serde used to send key-value pairs, - * if not specified the default value serde defined in the configuration will be used - * @param topic the topic name; cannot be {@code null} - * @param queryableStoreName the state store name; if {@code null} an internal store name will be automatically given + * @param topic the topic name; cannot be {@code null} + * @param materialized the instance of {@link Materialized} used to materialize a state store; cannot be {@code null} * @return a {@link GlobalKTable} for the specified topic */ - public synchronized <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde, - final Serde<V> valueSerde, - final String topic, - final String queryableStoreName) { + public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic, + final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) { + Objects.requireNonNull(topic, "topic can't be null"); + Objects.requireNonNull(materialized, "materialized can't be null"); return internalStreamsBuilder.globalTable(topic, - new ConsumedInternal<>(Consumed.with(keySerde, valueSerde)), - queryableStoreName); + new ConsumedInternal<K, V>(), + new MaterializedInternal<>(materialized)); } + /** * Adds a state store to the underlying {@link Topology}. * - * @param supplier the supplier used to obtain this state store {@link StateStore} instance - * @param processorNames the names of the processors that should be able to access the provided store + * @param builder the builder used to obtain this state store {@link StateStore} instance * @return itself * @throws TopologyException if state store supplier is already added */ - public synchronized StreamsBuilder addStateStore(final StateStoreSupplier supplier, - final String... 
processorNames) { - internalStreamsBuilder.addStateStore(supplier, processorNames); + public synchronized StreamsBuilder addStateStore(final StoreBuilder builder) { + Objects.requireNonNull(builder, "builder can't be null"); + internalStreamsBuilder.addStateStore(builder); return this; } @@ -907,62 +461,30 @@ public class StreamsBuilder { * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date. * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used. * - * @param storeSupplier user defined state store supplier + * @param storeBuilder user defined {@link StoreBuilder}; can't be {@code null} * @param sourceName name of the {@link SourceNode} that will be automatically added - * @param keyDeserializer the {@link Deserializer} to deserialize keys with - * @param valueDeserializer the {@link Deserializer} to deserialize values with * @param topic the topic to source the data from + * @param consumed the instance of {@link Consumed} used to define optional parameters; can't be {@code null} * @param processorName the name of the {@link ProcessorSupplier} * @param stateUpdateSupplier the instance of {@link ProcessorSupplier} * @return itself * @throws TopologyException if the processor of state is already registered */ - public synchronized StreamsBuilder addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier, - final String sourceName, - final Deserializer keyDeserializer, - final Deserializer valueDeserializer, + @SuppressWarnings("unchecked") + public synchronized StreamsBuilder addGlobalStore(final StoreBuilder storeBuilder, final String topic, - final String processorName, - final ProcessorSupplier stateUpdateSupplier) { - internalStreamsBuilder.addGlobalStore(storeSupplier, sourceName, null, keyDeserializer, - valueDeserializer, topic, processorName, stateUpdateSupplier); - return this; - } - - /** - * Adds a global {@link StateStore} to the topology. 
- * The {@link StateStore} sources its data from all partitions of the provided input topic. - * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance. - * <p> - * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions - * of the input topic. - * <p> - * The provided {@link ProcessorSupplier} will be used to create an {@link ProcessorNode} that will receive all - * records forwarded from the {@link SourceNode}. - * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date. - * - * @param storeSupplier user defined state store supplier - * @param sourceName name of the {@link SourceNode} that will be automatically added - * @param timestampExtractor the stateless timestamp extractor used for this source, - * if not specified the default extractor defined in the configs will be used - * @param keyDeserializer the {@link Deserializer} to deserialize keys with - * @param valueDeserializer the {@link Deserializer} to deserialize values with - * @param topic the topic to source the data from - * @param processorName the name of the {@link ProcessorSupplier} - * @param stateUpdateSupplier the instance of {@link ProcessorSupplier} - * @return itself - * @throws TopologyException if the processor of state is already registered - */ - public synchronized StreamsBuilder addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier, final String sourceName, - final TimestampExtractor timestampExtractor, - final Deserializer keyDeserializer, - final Deserializer valueDeserializer, - final String topic, + final Consumed consumed, final String processorName, final ProcessorSupplier stateUpdateSupplier) { - internalStreamsBuilder.addGlobalStore(storeSupplier, sourceName, timestampExtractor, keyDeserializer, - valueDeserializer, topic, processorName, stateUpdateSupplier); + Objects.requireNonNull(storeBuilder, "storeBuilder can't be null"); + 
Objects.requireNonNull(consumed, "consumed can't be null"); + internalStreamsBuilder.addGlobalStore(storeBuilder, + sourceName, + topic, + new ConsumedInternal<>(consumed), + processorName, + stateUpdateSupplier); return this; } http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/main/java/org/apache/kafka/streams/Topology.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/Topology.java b/streams/src/main/java/org/apache/kafka/streams/Topology.java index 386aacf..85d769f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/Topology.java +++ b/streams/src/main/java/org/apache/kafka/streams/Topology.java @@ -571,7 +571,7 @@ public class Topology { * @return itself * @throws TopologyException if the processor of state is already registered */ - public synchronized Topology addGlobalStore(final KeyValueStoreBuilder storeBuilder, + public synchronized Topology addGlobalStore(final StoreBuilder storeBuilder, final String sourceName, final Deserializer keyDeserializer, final Deserializer valueDeserializer, http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java index 4da9906..3963657 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java @@ -17,17 +17,15 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.Serde; -import 
org.apache.kafka.streams.Topology; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.kstream.GlobalKTable; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.processor.StateStoreSupplier; -import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier; +import org.apache.kafka.streams.state.StoreBuilder; import java.util.Collection; import java.util.Collections; @@ -72,37 +70,55 @@ public class InternalStreamsBuilder { return new KStreamImpl<>(this, name, Collections.singleton(name), false); } - @SuppressWarnings("unchecked") public <K, V> KTable<K, V> table(final String topic, final ConsumedInternal<K, V> consumed, - final String queryableStoreName) { - final String internalStoreName = queryableStoreName != null ? 
queryableStoreName : newStoreName(KTableImpl.SOURCE_NAME); - final StateStoreSupplier storeSupplier = new RocksDBKeyValueStoreSupplier<>(internalStoreName, - consumed.keySerde(), - consumed.valueSerde(), - false, - Collections.<String, String>emptyMap(), - true); - return doTable(consumed, topic, storeSupplier, queryableStoreName != null); - } - - public <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset, - final TimestampExtractor timestampExtractor, - final Serde<K> keySerde, - final Serde<V> valSerde, - final String topic, final StateStoreSupplier<KeyValueStore> storeSupplier) { Objects.requireNonNull(storeSupplier, "storeSupplier can't be null"); - return doTable(new ConsumedInternal<>(keySerde, valSerde, timestampExtractor, offsetReset), topic, storeSupplier, true); + final String source = newName(KStreamImpl.SOURCE_NAME); + final String name = newName(KTableImpl.SOURCE_NAME); + + final KTable<K, V> kTable = createKTable(consumed, + topic, + storeSupplier.name(), + true, + source, + name); + + internalTopologyBuilder.addStateStore(storeSupplier, name); + internalTopologyBuilder.connectSourceStoreAndTopic(storeSupplier.name(), topic); + + return kTable; } - private <K, V> KTable<K, V> doTable(final ConsumedInternal<K, V> consumed, - final String topic, - final StateStoreSupplier<KeyValueStore> storeSupplier, - final boolean isQueryable) { + @SuppressWarnings("unchecked") + public <K, V> KTable<K, V> table(final String topic, + final ConsumedInternal<K, V> consumed, + final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized) { + final StoreBuilder<KeyValueStore<K, V>> storeBuilder = new KeyValueStoreMaterializer<>(materialized).materialize(); + final String source = newName(KStreamImpl.SOURCE_NAME); final String name = newName(KTableImpl.SOURCE_NAME); - final ProcessorSupplier<K, V> processorSupplier = new KTableSource<>(storeSupplier.name()); + final KTable<K, V> kTable = createKTable(consumed, + topic, + 
storeBuilder.name(), + materialized.isQueryable(), + source, + name); + + internalTopologyBuilder.addStateStore(storeBuilder, name); + internalTopologyBuilder.connectSourceStoreAndTopic(storeBuilder.name(), topic); + + return kTable; + } + + + private <K, V> KTable<K, V> createKTable(final ConsumedInternal<K, V> consumed, + final String topic, + final String storeName, + final boolean isQueryable, + final String source, + final String name) { + final ProcessorSupplier<K, V> processorSupplier = new KTableSource<>(storeName); internalTopologyBuilder.addSource(consumed.offsetResetPolicy(), source, @@ -112,48 +128,27 @@ public class InternalStreamsBuilder { topic); internalTopologyBuilder.addProcessor(name, processorSupplier, source); - final KTableImpl<K, ?, V> kTable = new KTableImpl<>(this, name, processorSupplier, - consumed.keySerde(), consumed.valueSerde(), Collections.singleton(source), storeSupplier.name(), isQueryable); - - internalTopologyBuilder.addStateStore(storeSupplier, name); - internalTopologyBuilder.connectSourceStoreAndTopic(storeSupplier.name(), topic); - - return kTable; + return new KTableImpl<>(this, name, processorSupplier, + consumed.keySerde(), consumed.valueSerde(), Collections.singleton(source), storeName, isQueryable); } public <K, V> GlobalKTable<K, V> globalTable(final String topic, final ConsumedInternal<K, V> consumed, - final String queryableStoreName) { - final String internalStoreName = queryableStoreName != null ? 
queryableStoreName : newStoreName(KTableImpl.SOURCE_NAME); - return doGlobalTable(consumed, topic, new RocksDBKeyValueStoreSupplier<>(internalStoreName, - consumed.keySerde(), - consumed.valueSerde(), - false, - Collections.<String, String>emptyMap(), - true)); - } - - public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde, - final Serde<V> valSerde, - final String topic, - final StateStoreSupplier<KeyValueStore> storeSupplier) { - return doGlobalTable(new ConsumedInternal<>(keySerde, valSerde, null, null), topic, storeSupplier); - } - - @SuppressWarnings("unchecked") - private <K, V> GlobalKTable<K, V> doGlobalTable(final ConsumedInternal<K, V> consumed, - final String topic, - final StateStoreSupplier<KeyValueStore> storeSupplier) { - Objects.requireNonNull(storeSupplier, "storeSupplier can't be null"); + final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized) { + Objects.requireNonNull(consumed, "consumed can't be null"); + Objects.requireNonNull(materialized, "materialized can't be null"); + // explicitly disable logging for global stores + materialized.withLoggingDisabled(); + final StoreBuilder storeBuilder = new KeyValueStoreMaterializer<>(materialized).materialize(); final String sourceName = newName(KStreamImpl.SOURCE_NAME); final String processorName = newName(KTableImpl.SOURCE_NAME); - final KTableSource<K, V> tableSource = new KTableSource<>(storeSupplier.name()); + final KTableSource<K, V> tableSource = new KTableSource<>(storeBuilder.name()); final Deserializer<K> keyDeserializer = consumed.keySerde() == null ? null : consumed.keySerde().deserializer(); final Deserializer<V> valueDeserializer = consumed.valueSerde() == null ? 
null : consumed.valueSerde().deserializer(); - internalTopologyBuilder.addGlobalStore(storeSupplier, + internalTopologyBuilder.addGlobalStore(storeBuilder, sourceName, consumed.timestampExtractor(), keyDeserializer, @@ -161,9 +156,10 @@ public class InternalStreamsBuilder { topic, processorName, tableSource); - return new GlobalKTableImpl(new KTableSourceValueGetterSupplier<>(storeSupplier.name())); + return new GlobalKTableImpl<>(new KTableSourceValueGetterSupplier<K, V>(storeBuilder.name())); } + public <K, V> KStream<K, V> merge(final KStream<K, V>... streams) { return KStreamImpl.merge(this, streams); } @@ -172,35 +168,31 @@ public class InternalStreamsBuilder { return prefix + String.format("%010d", index.getAndIncrement()); } - String newStoreName(final String prefix) { + public String newStoreName(final String prefix) { return prefix + String.format(KTableImpl.STATE_STORE_NAME + "%010d", index.getAndIncrement()); } - public synchronized void addStateStore(final StateStoreSupplier supplier, - final String... 
processorNames) { - internalTopologyBuilder.addStateStore(supplier, processorNames); - } - - public synchronized void addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier, - final String sourceName, - final Deserializer keyDeserializer, - final Deserializer valueDeserializer, - final String topic, - final String processorName, - final ProcessorSupplier stateUpdateSupplier) { - internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, null, keyDeserializer, - valueDeserializer, topic, processorName, stateUpdateSupplier); + public synchronized void addStateStore(final StoreBuilder builder) { + internalTopologyBuilder.addStateStore(builder); } - public synchronized void addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier, + public synchronized void addGlobalStore(final StoreBuilder<KeyValueStore> storeBuilder, final String sourceName, - final TimestampExtractor timestampExtractor, - final Deserializer keyDeserializer, - final Deserializer valueDeserializer, final String topic, + final ConsumedInternal consumed, final String processorName, final ProcessorSupplier stateUpdateSupplier) { - internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, timestampExtractor, keyDeserializer, - valueDeserializer, topic, processorName, stateUpdateSupplier); + // explicitly disable logging for global stores + storeBuilder.withLoggingDisabled(); + final Deserializer keyDeserializer = consumed.keySerde() == null ? null : consumed.keySerde().deserializer(); + final Deserializer valueDeserializer = consumed.valueSerde() == null ? 
null : consumed.valueSerde().deserializer(); + internalTopologyBuilder.addGlobalStore(storeBuilder, + sourceName, + consumed.timestampExtractor(), + keyDeserializer, + valueDeserializer, + topic, + processorName, + stateUpdateSupplier); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 067bcfc..a42db0b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.Consumed; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.kstream.ForeachAction; @@ -401,7 +402,10 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, return builder.table(topic, new ConsumedInternal<>(keySerde, valSerde, new FailOnInvalidTimestamp(), null), - internalStoreName); + new MaterializedInternal<>(Materialized.<K, V, KeyValueStore<Bytes, byte[]>>as(internalStoreName) + .withKeySerde(keySerde) + .withValueSerde(valSerde), + queryableStoreName != null)); } @Override @@ -413,7 +417,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, Objects.requireNonNull(storeSupplier, "storeSupplier can't be null"); to(keySerde, valSerde, partitioner, topic); - return builder.table(null, new FailOnInvalidTimestamp(), keySerde, valSerde, topic, storeSupplier); + final ConsumedInternal<K, V> consumed = new 
ConsumedInternal<>(Consumed.with(keySerde, valSerde, new FailOnInvalidTimestamp(), null)); + return builder.table(topic, consumed, storeSupplier); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java index 0ee610f..9f186fd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java @@ -30,8 +30,9 @@ public class MaterializedInternal<K, V, S extends StateStore> extends Materializ public MaterializedInternal(final Materialized<K, V, S> materialized) { this(materialized, true); } - - MaterializedInternal(final Materialized<K, V, S> materialized, final boolean queryable) { + + public MaterializedInternal(final Materialized<K, V, S> materialized, + final boolean queryable) { super(materialized); this.queryable = queryable; } @@ -67,7 +68,7 @@ public class MaterializedInternal<K, V, S extends StateStore> extends Materializ return cachingEnabled; } - public boolean isQueryable() { + boolean isQueryable() { return queryable; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index 193d0e1..d47af88 100644 --- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -29,7 +29,6 @@ import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.SubscriptionUpdates; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; -import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder; import org.apache.kafka.streams.state.internals.WindowStoreBuilder; import org.apache.kafka.streams.state.internals.WindowStoreSupplier; import org.slf4j.Logger; @@ -557,7 +556,7 @@ public class InternalTopologyBuilder { } - public final void addGlobalStore(final KeyValueStoreBuilder storeBuilder, + public final void addGlobalStore(final StoreBuilder<KeyValueStore> storeBuilder, final String sourceName, final TimestampExtractor timestampExtractor, final Deserializer keyDeserializer, http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index 73c5484..f1ae6da 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -505,7 +505,8 @@ public class KafkaStreamsTest { CLUSTER.createTopic(topic); final StreamsBuilder builder = new StreamsBuilder(); - builder.table(Serdes.String(), Serdes.String(), topic, topic); + final Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String()); + builder.table(topic, consumed); final KafkaStreams streams = new KafkaStreams(builder.build(), props); final CountDownLatch latch = 
new CountDownLatch(1); http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java index e50f4d0..6c7b2b4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java @@ -32,11 +32,11 @@ import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Transformer; import org.apache.kafka.streams.kstream.TransformerSupplier; import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; +import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.StreamsTestUtils; @@ -583,13 +583,11 @@ public class EosIntegrationTest { String[] storeNames = null; if (withState) { storeNames = new String[] {storeName}; - final StateStoreSupplier storeSupplier = Stores.create(storeName) - .withLongKeys() - .withLongValues() - .persistent() - .build(); + final StoreBuilder<KeyValueStore<Long, Long>> storeBuilder + = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), Serdes.Long(), Serdes.Long()) + .withCachingEnabled(); - builder.addStateStore(storeSupplier); + builder.addStateStore(storeBuilder); } final KStream<Long, Long> input = builder.stream(MULTI_PARTITION_INPUT_TOPIC);
