MINOR: add suppress warnings annotations in Streams API - fixes examples with regard to new API - fixes `Topology#addGlobalStore` parameters
Author: Matthias J. Sax <matth...@confluent.io> Reviewers: Ismael Juma <ism...@juma.me.uk>, Guozhang Wang <wangg...@gmail.com>, Damian Guy <damian....@gmail.com> Closes #4003 from mjsax/minor-deprecated Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/713a67fd Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/713a67fd Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/713a67fd Branch: refs/heads/trunk Commit: 713a67fddaec3fa9cd7cce53dd6fef5ab6e0cdab Parents: 51c652c Author: Matthias J. Sax <matth...@confluent.io> Authored: Wed Oct 4 14:42:07 2017 -0700 Committer: Guozhang Wang <wangg...@gmail.com> Committed: Wed Oct 4 14:42:07 2017 -0700 ---------------------------------------------------------------------- .../examples/pageview/PageViewTypedDemo.java | 77 +++++++++---------- .../examples/pageview/PageViewUntypedDemo.java | 66 ++++++++-------- .../examples/temperature/TemperatureDemo.java | 54 ++++++------- .../examples/wordcount/WordCountDemo.java | 29 +++---- .../java/org/apache/kafka/streams/Topology.java | 6 +- .../kafka/streams/kstream/KGroupedStream.java | 80 +++++++++----------- .../kafka/streams/kstream/KGroupedTable.java | 7 +- .../kafka/streams/kstream/KStreamBuilder.java | 30 ++++---- .../apache/kafka/streams/kstream/KTable.java | 23 +++--- .../kstream/internals/AbstractStream.java | 10 +-- .../internals/InternalStreamsBuilder.java | 5 +- .../kstream/internals/KGroupedStreamImpl.java | 59 ++++++++++----- .../kstream/internals/KGroupedTableImpl.java | 36 +++++---- .../streams/kstream/internals/KStreamImpl.java | 36 ++++++++- .../kstream/internals/KStreamKStreamJoin.java | 1 + .../streams/kstream/internals/KTableImpl.java | 77 ++++++++++++++----- .../kstream/internals/WindowedSerializer.java | 5 +- .../streams/processor/TopologyBuilder.java | 1 + .../processor/internals/AbstractTask.java | 2 +- .../internals/InternalTopologyBuilder.java | 11 +-- .../streams/processor/internals/QuickUnion.java | 1 + .../processor/internals/StreamsMetricsImpl.java | 2 +- .../org/apache/kafka/streams/state/Stores.java | 12 +-- .../state/internals/AbstractStoreSupplier.java | 5 +- .../InMemoryKeyValueStoreSupplier.java | 1 + .../InMemoryLRUCacheStoreSupplier.java | 1 + .../internals/MeteredKeyValueBytesStore.java | 1 + .../state/internals/MeteredSessionStore.java | 1 + .../state/internals/MeteredWindowStore.java | 2 +- .../internals/RocksDBKeyValueStoreSupplier.java | 2 +- .../internals/RocksDBSessionStoreSupplier.java | 1 + .../internals/RocksDBWindowStoreSupplier.java | 2 +- .../RocksDbSessionBytesStoreSupplier.java | 17 +++-- .../RocksDbWindowBytesStoreSupplier.java | 7 +- .../state/internals/WindowStoreSupplier.java | 6 +- .../kafka/test/MockStateStoreSupplier.java | 2 +- 36 files changed, 394 insertions(+), 284 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java ---------------------------------------------------------------------- diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java index 068eece..101cd23 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java @@ -38,6 +38,7 @@ import org.apache.kafka.streams.kstream.Windowed; import java.util.HashMap; import java.util.Map; import java.util.Properties; +import java.util.concurrent.TimeUnit; /** * Demonstrates how to perform a join between a KStream and a KTable, i.e. an example of a stateful computation, @@ -150,45 +151,45 @@ public class PageViewTypedDemo { Consumed.with(Serdes.String(), userProfileSerde)); KStream<WindowedPageViewByRegion, RegionCount> regionCount = views - .leftJoin(users, new ValueJoiner<PageView, UserProfile, PageViewByRegion>() { - @Override - public PageViewByRegion apply(PageView view, UserProfile profile) { - PageViewByRegion viewByRegion = new PageViewByRegion(); - viewByRegion.user = view.user; - viewByRegion.page = view.page; - - if (profile != null) { - viewByRegion.region = profile.region; - } else { - viewByRegion.region = "UNKNOWN"; - } - return viewByRegion; + .leftJoin(users, new ValueJoiner<PageView, UserProfile, PageViewByRegion>() { + @Override + public PageViewByRegion apply(PageView view, UserProfile profile) { + PageViewByRegion viewByRegion = new PageViewByRegion(); + viewByRegion.user = view.user; + viewByRegion.page = view.page; + + if (profile != null) { + viewByRegion.region = profile.region; + } else { + viewByRegion.region = "UNKNOWN"; } - }) - .map(new KeyValueMapper<String, PageViewByRegion, KeyValue<String, PageViewByRegion>>() { - @Override - public KeyValue<String, PageViewByRegion> apply(String user, PageViewByRegion viewRegion) { - return new KeyValue<>(viewRegion.region, viewRegion); - } - }) - .groupByKey(Serialized.with(Serdes.String(), pageViewByRegionSerde)) - .count(TimeWindows.of(7 * 24 * 60 * 60 * 1000L).advanceBy(1000), "RollingSevenDaysOfPageViewsByRegion") - // TODO: we can merge ths toStream().map(...) with a single toStream(...) - .toStream() - .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<WindowedPageViewByRegion, RegionCount>>() { - @Override - public KeyValue<WindowedPageViewByRegion, RegionCount> apply(Windowed<String> key, Long value) { - WindowedPageViewByRegion wViewByRegion = new WindowedPageViewByRegion(); - wViewByRegion.windowStart = key.window().start(); - wViewByRegion.region = key.key(); - - RegionCount rCount = new RegionCount(); - rCount.region = key.key(); - rCount.count = value; - - return new KeyValue<>(wViewByRegion, rCount); - } - }); + return viewByRegion; + } + }) + .map(new KeyValueMapper<String, PageViewByRegion, KeyValue<String, PageViewByRegion>>() { + @Override + public KeyValue<String, PageViewByRegion> apply(String user, PageViewByRegion viewRegion) { + return new KeyValue<>(viewRegion.region, viewRegion); + } + }) + .groupByKey(Serialized.with(Serdes.String(), pageViewByRegionSerde)) + .windowedBy(TimeWindows.of(TimeUnit.DAYS.toMillis(7)).advanceBy(TimeUnit.SECONDS.toMillis(1))) + .count() + .toStream() + .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<WindowedPageViewByRegion, RegionCount>>() { + @Override + public KeyValue<WindowedPageViewByRegion, RegionCount> apply(Windowed<String> key, Long value) { + WindowedPageViewByRegion wViewByRegion = new WindowedPageViewByRegion(); + wViewByRegion.windowStart = key.window().start(); + wViewByRegion.region = key.key(); + + RegionCount rCount = new RegionCount(); + rCount.region = key.key(); + rCount.count = value; + + return new KeyValue<>(wViewByRegion, rCount); + } + }); // write to the result topic regionCount.to("streams-pageviewstats-typed-output", Produced.with(wPageViewByRegionSerde, regionCountSerde)); http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java ---------------------------------------------------------------------- diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java index c20c077..ae72042 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java @@ -87,39 +87,39 @@ public class PageViewUntypedDemo { }); KStream<JsonNode, JsonNode> regionCount = views - .leftJoin(userRegions, new ValueJoiner<JsonNode, String, JsonNode>() { - @Override - public JsonNode apply(JsonNode view, String region) { - ObjectNode jNode = JsonNodeFactory.instance.objectNode(); - - return jNode.put("user", view.get("user").textValue()) - .put("page", view.get("page").textValue()) - .put("region", region == null ? "UNKNOWN" : region); - } - }) - .map(new KeyValueMapper<String, JsonNode, KeyValue<String, JsonNode>>() { - @Override - public KeyValue<String, JsonNode> apply(String user, JsonNode viewRegion) { - return new KeyValue<>(viewRegion.get("region").textValue(), viewRegion); - } - }) - .groupByKey(Serialized.with(Serdes.String(), jsonSerde)) - .count(TimeWindows.of(7 * 24 * 60 * 60 * 1000L).advanceBy(1000), "RollingSevenDaysOfPageViewsByRegion") - // TODO: we can merge ths toStream().map(...) with a single toStream(...) - .toStream() - .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<JsonNode, JsonNode>>() { - @Override - public KeyValue<JsonNode, JsonNode> apply(Windowed<String> key, Long value) { - ObjectNode keyNode = JsonNodeFactory.instance.objectNode(); - keyNode.put("window-start", key.window().start()) - .put("region", key.key()); - - ObjectNode valueNode = JsonNodeFactory.instance.objectNode(); - valueNode.put("count", value); - - return new KeyValue<>((JsonNode) keyNode, (JsonNode) valueNode); - } - }); + .leftJoin(userRegions, new ValueJoiner<JsonNode, String, JsonNode>() { + @Override + public JsonNode apply(JsonNode view, String region) { + ObjectNode jNode = JsonNodeFactory.instance.objectNode(); + + return jNode.put("user", view.get("user").textValue()) + .put("page", view.get("page").textValue()) + .put("region", region == null ? "UNKNOWN" : region); + } + }) + .map(new KeyValueMapper<String, JsonNode, KeyValue<String, JsonNode>>() { + @Override + public KeyValue<String, JsonNode> apply(String user, JsonNode viewRegion) { + return new KeyValue<>(viewRegion.get("region").textValue(), viewRegion); + } + }) + .groupByKey(Serialized.with(Serdes.String(), jsonSerde)) + .windowedBy(TimeWindows.of(7 * 24 * 60 * 60 * 1000L).advanceBy(1000)) + .count() + .toStream() + .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<JsonNode, JsonNode>>() { + @Override + public KeyValue<JsonNode, JsonNode> apply(Windowed<String> key, Long value) { + ObjectNode keyNode = JsonNodeFactory.instance.objectNode(); + keyNode.put("window-start", key.window().start()) + .put("region", key.key()); + + ObjectNode valueNode = JsonNodeFactory.instance.objectNode(); + valueNode.put("count", value); + + return new KeyValue<>((JsonNode) keyNode, (JsonNode) valueNode); + } + }); // write to the result topic regionCount.to("streams-pageviewstats-untyped-output", Produced.with(jsonSerde, jsonSerde)); http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java ---------------------------------------------------------------------- diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java index 2039ca5..ea81dd6 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java @@ -25,6 +25,7 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.kstream.Predicate; +import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.Reducer; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.Windowed; @@ -87,38 +88,39 @@ public class TemperatureDemo { KStream<String, String> source = builder.stream("iot-temperature"); KStream<Windowed<String>, String> max = source - // temperature values are sent without a key (null), so in order - // to group and reduce them, a key is needed ("temp" has been chosen) - .selectKey(new KeyValueMapper<String, String, String>() { - @Override - public String apply(String key, String value) { - return "temp"; - } - }) - .groupByKey() - .reduce(new Reducer<String>() { - @Override - public String apply(String value1, String value2) { - if (Integer.parseInt(value1) > Integer.parseInt(value2)) - return value1; - else - return value2; - } - }, TimeWindows.of(TimeUnit.SECONDS.toMillis(TEMPERATURE_WINDOW_SIZE))) - .toStream() - .filter(new Predicate<Windowed<String>, String>() { - @Override - public boolean test(Windowed<String> key, String value) { - return Integer.parseInt(value) > TEMPERATURE_THRESHOLD; - } - }); + // temperature values are sent without a key (null), so in order + // to group and reduce them, a key is needed ("temp" has been chosen) + .selectKey(new KeyValueMapper<String, String, String>() { + @Override + public String apply(String key, String value) { + return "temp"; + } + }) + .groupByKey() + .windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(TEMPERATURE_WINDOW_SIZE))) + .reduce(new Reducer<String>() { + @Override + public String apply(String value1, String value2) { + if (Integer.parseInt(value1) > Integer.parseInt(value2)) + return value1; + else + return value2; + } + }) + .toStream() + .filter(new Predicate<Windowed<String>, String>() { + @Override + public boolean test(Windowed<String> key, String value) { + return Integer.parseInt(value) > TEMPERATURE_THRESHOLD; + } + }); WindowedSerializer<String> windowedSerializer = new WindowedSerializer<>(Serdes.String().serializer()); WindowedDeserializer<String> windowedDeserializer = new WindowedDeserializer<>(Serdes.String().deserializer(), TEMPERATURE_WINDOW_SIZE); Serde<Windowed<String>> windowedSerde = Serdes.serdeFrom(windowedSerializer, windowedDeserializer); // need to override key serde to Windowed<String> type - max.to(windowedSerde, Serdes.String(), "iot-temperature-max"); + max.to("iot-temperature-max", Produced.with(windowedSerde, Serdes.String())); final KafkaStreams streams = new KafkaStreams(builder.build(), props); final CountDownLatch latch = new CountDownLatch(1); http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java ---------------------------------------------------------------------- diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java index 5689d50..7535315 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java @@ -24,6 +24,7 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.ValueMapper; import java.util.Arrays; @@ -63,22 +64,22 @@ public class WordCountDemo { KStream<String, String> source = builder.stream("streams-plaintext-input"); KTable<String, Long> counts = source - .flatMapValues(new ValueMapper<String, Iterable<String>>() { - @Override - public Iterable<String> apply(String value) { - return Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")); - } - }) - .groupBy(new KeyValueMapper<String, String, String>() { - @Override - public String apply(String key, String value) { - return value; - } - }) - .count("Counts"); + .flatMapValues(new ValueMapper<String, Iterable<String>>() { + @Override + public Iterable<String> apply(String value) { + return Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")); + } + }) + .groupBy(new KeyValueMapper<String, String, String>() { + @Override + public String apply(String key, String value) { + return value; + } + }) + .count(); // need to override value serde to Long type - counts.to(Serdes.String(), Serdes.Long(), "streams-wordcount-output"); + counts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long())); final KafkaStreams streams = new KafkaStreams(builder.build(), props); final CountDownLatch latch = new CountDownLatch(1); http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/Topology.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/Topology.java b/streams/src/main/java/org/apache/kafka/streams/Topology.java index 1409b97..3b1ac6d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/Topology.java +++ b/streams/src/main/java/org/apache/kafka/streams/Topology.java @@ -32,7 +32,6 @@ import org.apache.kafka.streams.processor.internals.ProcessorTopology; import org.apache.kafka.streams.processor.internals.SinkNode; import org.apache.kafka.streams.processor.internals.SourceNode; import org.apache.kafka.streams.state.StoreBuilder; -import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder; import java.util.regex.Pattern; @@ -573,6 +572,7 @@ public class Topology { * @return itself * @throws TopologyException if the processor of state is already registered */ + @SuppressWarnings("unchecked") public synchronized Topology addGlobalStore(final StoreBuilder storeBuilder, final String sourceName, final Deserializer keyDeserializer, @@ -609,7 +609,8 @@ public class Topology { * @return itself * @throws TopologyException if the processor of state is already registered */ - public synchronized Topology addGlobalStore(final KeyValueStoreBuilder storeBuilder, + @SuppressWarnings("unchecked") + public synchronized Topology addGlobalStore(final StoreBuilder storeBuilder, final String sourceName, final TimestampExtractor timestampExtractor, final Deserializer keyDeserializer, @@ -641,6 +642,7 @@ public class Topology { * * @return a description of the topology. */ + public synchronized TopologyDescription describe() { return internalTopologyBuilder.describe(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java index 17f2db4..b3945f7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java @@ -23,7 +23,6 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.QueryableStoreType; import org.apache.kafka.streams.state.SessionStore; @@ -132,7 +131,7 @@ public interface KGroupedStream<K, V> { * <p> * To query the local {@link KeyValueStore} it must be obtained via * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}. - * Use {@link StateStoreSupplier#name()} to get the store name: + * Use {@link org.apache.kafka.streams.processor.StateStoreSupplier#name()} to get the store name: * <pre>{@code * KafkaStreams streams = ... // counting words * String queryableStoreName = storeSupplier.name(); @@ -149,7 +148,7 @@ public interface KGroupedStream<K, V> { * @deprecated use {@link #count(Materialized) count(Materialized.as(KeyValueByteStoreSupplier))} */ @Deprecated - KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier); + KTable<K, Long> count(final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier); /** * Count the number of records in this stream by the grouped key. @@ -290,7 +289,7 @@ public interface KGroupedStream<K, V> { * <p> * To query the local windowed {@link KeyValueStore} it must be obtained via * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}. - * Use {@link StateStoreSupplier#name()} to get the store name: + * Use {@link org.apache.kafka.streams.processor.StateStoreSupplier#name()} to get the store name: * <pre>{@code * KafkaStreams streams = ... // counting words * String queryableStoreName = storeSupplier.name(); @@ -312,7 +311,7 @@ public interface KGroupedStream<K, V> { */ @Deprecated <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows, - final StateStoreSupplier<WindowStore> storeSupplier); + final org.apache.kafka.streams.processor.StateStoreSupplier<WindowStore> storeSupplier); /** @@ -333,10 +332,8 @@ public interface KGroupedStream<K, V> { * <p> * To query the local windowed {@link KeyValueStore} it must be obtained via * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}. - * Use {@link StateStoreSupplier#name()} to get the store name: * <pre>{@code * KafkaStreams streams = ... // compute sum - * Sting queryableStoreName = storeSupplier.name(); * ReadOnlySessionStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>ReadOnlySessionStore<String, Long>); * String key = "some-key"; * KeyValueIterator<Windowed<String>, Long> sumForKeyForWindows = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances) @@ -398,7 +395,7 @@ public interface KGroupedStream<K, V> { * <p> * To query the local windowed {@link KeyValueStore} it must be obtained via * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}. - * Use {@link StateStoreSupplier#name()} to get the store name: + * Use {@link org.apache.kafka.streams.processor.StateStoreSupplier#name()} to get the store name: * <pre>{@code * KafkaStreams streams = ... // compute sum * Sting queryableStoreName = storeSupplier.name(); @@ -418,7 +415,7 @@ public interface KGroupedStream<K, V> { */ @Deprecated KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows, - final StateStoreSupplier<SessionStore> storeSupplier); + final org.apache.kafka.streams.processor.StateStoreSupplier<SessionStore> storeSupplier); /** * Combine the values of records in this stream by the grouped key. @@ -522,7 +519,7 @@ public interface KGroupedStream<K, V> { * Combine the value of records in this stream by the grouped key. * Records with {@code null} key or value are ignored. * Combining implies that the type of the aggregate result is the same as the type of the input value - * (c.f. {@link #aggregate(Initializer, Aggregator, StateStoreSupplier)}). + * (c.f. {@link #aggregate(Initializer, Aggregator, org.apache.kafka.streams.processor.StateStoreSupplier)}). * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view) * provided by the given {@code storeSupplier}. * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream. @@ -540,8 +537,8 @@ public interface KGroupedStream<K, V> { * <p> * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's * value as-is. - * Thus, {@code reduce(Reducer, StateStoreSupplier)} can be used to compute aggregate functions like sum, min, or - * max. + * Thus, {@code reduce(Reducer, org.apache.kafka.streams.processor.StateStoreSupplier)} can be used to compute + * aggregate functions like sum, min, or max. * <p> * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to * the same key. @@ -552,7 +549,7 @@ public interface KGroupedStream<K, V> { * <p> * To query the local {@link KeyValueStore} it must be obtained via * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}. - * Use {@link StateStoreSupplier#name()} to get the store name: + * Use {@link org.apache.kafka.streams.processor.StateStoreSupplier#name()} to get the store name: * <pre>{@code * KafkaStreams streams = ... // compute sum * String queryableStoreName = storeSupplier.name(); @@ -571,7 +568,7 @@ public interface KGroupedStream<K, V> { */ @Deprecated KTable<K, V> reduce(final Reducer<V> reducer, - final StateStoreSupplier<KeyValueStore> storeSupplier); + final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier); /** * Combine the value of records in this stream by the grouped key. @@ -595,7 +592,7 @@ public interface KGroupedStream<K, V> { * <p> * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's * value as-is. - * Thus, {@code reduce(Reducer, StateStoreSupplier)} can be used to compute aggregate functions like sum, min, or + * Thus, {@code reduce(Reducer, org.apache.kafka.streams.processor.StateStoreSupplier)} can be used to compute aggregate functions like sum, min, or * max. * <p> * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to @@ -767,8 +764,8 @@ public interface KGroupedStream<K, V> { * <p> * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's * value as-is. - * Thus, {@code reduce(Reducer, Windows, StateStoreSupplier)} can be used to compute aggregate functions like sum, - * min, or max. + * Thus, {@code reduce(Reducer, Windows, org.apache.kafka.streams.processor.StateStoreSupplier)} can be used to + * compute aggregate functions like sum, min, or max. * <p> * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to * the same window and key. @@ -779,7 +776,7 @@ public interface KGroupedStream<K, V> { * <p> * To query the local windowed {@link KeyValueStore} it must be obtained via * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}. - * Use {@link StateStoreSupplier#name()} to get the store name: + * Use {@link org.apache.kafka.streams.processor.StateStoreSupplier#name()} to get the store name: * <pre>{@code * KafkaStreams streams = ... // compute sum * Sting queryableStoreName = storeSupplier.name(); @@ -803,7 +800,7 @@ public interface KGroupedStream<K, V> { @Deprecated <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer, final Windows<W> windows, - final StateStoreSupplier<WindowStore> storeSupplier); + final org.apache.kafka.streams.processor.StateStoreSupplier<WindowStore> storeSupplier); /** * Combine values of this stream by the grouped key into {@link SessionWindows}. @@ -841,10 +838,8 @@ public interface KGroupedStream<K, V> { * <p> * To query the local windowed {@link KeyValueStore} it must be obtained via * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}. - * Use {@link StateStoreSupplier#name()} to get the store name: * <pre>{@code * KafkaStreams streams = ... // compute sum - * Sting queryableStoreName = storeSupplier.name(); * ReadOnlySessionStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>ReadOnlySessionStore<String, Long>); * String key = "some-key"; * KeyValueIterator<Windowed<String>, Long> sumForKeyForWindows = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances) @@ -933,8 +928,8 @@ public interface KGroupedStream<K, V> { * <p> * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's * value as-is. - * Thus, {@code reduce(Reducer, SessionWindows, StateStoreSupplier)} can be used to compute aggregate functions like - * sum, min, or max. + * Thus, {@code reduce(Reducer, SessionWindows, org.apache.kafka.streams.processor.StateStoreSupplier)} can be used + * to compute aggregate functions like sum, min, or max. * <p> * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to * the same window and key. @@ -945,7 +940,7 @@ public interface KGroupedStream<K, V> { * <p> * To query the local windowed {@link KeyValueStore} it must be obtained via * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}. - * Use {@link StateStoreSupplier#name()} to get the store name: + * Use {@link org.apache.kafka.streams.processor.StateStoreSupplier#name()} to get the store name: * <pre>{@code * KafkaStreams streams = ... // compute sum * Sting queryableStoreName = storeSupplier.name(); @@ -975,7 +970,7 @@ public interface KGroupedStream<K, V> { @Deprecated KTable<Windowed<K>, V> reduce(final Reducer<V> reducer, final SessionWindows sessionWindows, - final StateStoreSupplier<SessionStore> storeSupplier); + final org.apache.kafka.streams.processor.StateStoreSupplier<SessionStore> storeSupplier); /** @@ -1188,8 +1183,8 @@ public interface KGroupedStream<K, V> { /** * Aggregate the values of records in this stream by the grouped key. * Records with {@code null} key or value are ignored. - * Aggregating is a generalization of {@link #reduce(Reducer, StateStoreSupplier) combining via reduce(...)} as it, - * for example, allows the result to have a different type than the input values. + * Aggregating is a generalization of {@link #reduce(Reducer, org.apache.kafka.streams.processor.StateStoreSupplier) + * combining via reduce(...)} as it, for example, allows the result to have a different type than the input values. * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view) * provided by the given {@code storeSupplier}. * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream. @@ -1199,8 +1194,8 @@ public interface KGroupedStream<K, V> { * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current * aggregate (or for the very first record using the intermediate aggregation result provided via the * {@link Initializer}) and the record's value. - * Thus, {@code aggregate(Initializer, Aggregator, StateStoreSupplier)} can be used to compute aggregate functions - * like count (c.f. {@link #count()}). + * Thus, {@code aggregate(Initializer, Aggregator, org.apache.kafka.streams.processor.StateStoreSupplier)} can be + * used to compute aggregate functions like count (c.f. {@link #count()}). * <p> * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to * the same key. @@ -1211,7 +1206,7 @@ public interface KGroupedStream<K, V> { * <p> * To query the local {@link KeyValueStore} it must be obtained via * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}. - * Use {@link StateStoreSupplier#name()} to get the store name: + * Use {@link org.apache.kafka.streams.processor.StateStoreSupplier#name()} to get the store name: * <pre>{@code * KafkaStreams streams = ... // some aggregation on value type double * Sting queryableStoreName = storeSupplier.name(); @@ -1233,7 +1228,7 @@ public interface KGroupedStream<K, V> { @Deprecated <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator, - final StateStoreSupplier<KeyValueStore> storeSupplier); + final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier); /** * Aggregate the values of records in this stream by the grouped key and defined windows. @@ -1362,8 +1357,9 @@ public interface KGroupedStream<K, V> { /** * Aggregate the values of records in this stream by the grouped key and defined windows. * Records with {@code null} key or value are ignored. - * Aggregating is a generalization of {@link #reduce(Reducer, Windows, StateStoreSupplier) combining via - * reduce(...)} as it, for example, allows the result to have a different type than the input values. + * Aggregating is a generalization of + * {@link #reduce(Reducer, Windows, org.apache.kafka.streams.processor.StateStoreSupplier) combining via reduce(...)} + * as it, for example, allows the result to have a different type than the input values. * The specified {@code windows} define either hopping time windows that can be overlapping or tumbling (c.f. * {@link TimeWindows}) or they define landmark windows (c.f. {@link UnlimitedWindows}). * The result is written into a local windowed {@link KeyValueStore} (which is basically an ever-updating @@ -1377,8 +1373,8 @@ public interface KGroupedStream<K, V> { * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current * aggregate (or for the very first record using the intermediate aggregation result provided via the * {@link Initializer}) and the record's value. - * Thus, {@code aggregate(Initializer, Aggregator, Windows, StateStoreSupplier)} can be used to compute aggregate - * functions like count (c.f. {@link #count(Windows)}). + * Thus, {@code aggregate(Initializer, Aggregator, Windows, org.apache.kafka.streams.processor.StateStoreSupplier)} + * can be used to compute aggregate functions like count (c.f. {@link #count(Windows)}). * <p> * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to * the same window and key. @@ -1389,7 +1385,7 @@ public interface KGroupedStream<K, V> { * <p> * To query the local windowed {@link KeyValueStore} it must be obtained via * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}. - * Use {@link StateStoreSupplier#name()} to get the store name: + * Use {@link org.apache.kafka.streams.processor.StateStoreSupplier#name()} to get the store name: * <pre>{@code * KafkaStreams streams = ... // some windowed aggregation on value type Long * Sting queryableStoreName = storeSupplier.name(); @@ -1416,7 +1412,7 @@ public interface KGroupedStream<K, V> { <W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator, final Windows<W> windows, - final StateStoreSupplier<WindowStore> storeSupplier); + final org.apache.kafka.streams.processor.StateStoreSupplier<WindowStore> storeSupplier); /** * Aggregate the values of records in this stream by the grouped key and defined {@link SessionWindows}. @@ -1446,10 +1442,8 @@ public interface KGroupedStream<K, V> { * <p> * To query the local {@link SessionStore} it must be obtained via * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}. - * Use {@link StateStoreSupplier#name()} to get the store name: * <pre>{@code * KafkaStreams streams = ... // some windowed aggregation on value type double - * Sting queryableStoreName = storeSupplier.name(); * ReadOnlySessionStore<String, Long> sessionStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>sessionStore()); * String key = "some-key"; * KeyValueIterator<Windowed<String>, Long> aggForKeyForSession = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances) @@ -1540,8 +1534,8 @@ public interface KGroupedStream<K, V> { * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current * aggregate (or for the very first record using the intermediate aggregation result provided via the * {@link Initializer}) and the record's value. - * Thus, {@code #aggregate(Initializer, Aggregator, Merger, SessionWindows, Serde, StateStoreSupplier)} can be used - * to compute aggregate functions like count (c.f. {@link #count(SessionWindows)}). + * Thus, {@code #aggregate(Initializer, Aggregator, Merger, SessionWindows, Serde, org.apache.kafka.streams.processor.StateStoreSupplier)} + * can be used to compute aggregate functions like count (c.f. {@link #count(SessionWindows)}). * <p> * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to * the same window and key. @@ -1552,7 +1546,7 @@ public interface KGroupedStream<K, V> { * <p> * To query the local {@link SessionStore} it must be obtained via * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}. - * Use {@link StateStoreSupplier#name()} to get the store name: + * Use {@link org.apache.kafka.streams.processor.StateStoreSupplier#name()} to get the store name: * <pre>{@code * KafkaStreams streams = ... // some windowed aggregation on value type double * Sting queryableStoreName = storeSupplier.name(); @@ -1583,7 +1577,7 @@ public interface KGroupedStream<K, V> { final Merger<? super K, T> sessionMerger, final SessionWindows sessionWindows, final Serde<T> aggValueSerde, - final StateStoreSupplier<SessionStore> storeSupplier); + final org.apache.kafka.streams.processor.StateStoreSupplier<SessionStore> storeSupplier); /** * Create a new {@link TimeWindowedKStream} instance that can be used to perform windowed aggregations. http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java index f814eaf..d0a38cc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java @@ -21,7 +21,6 @@ import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.QueryableStoreType; @@ -195,7 +194,7 @@ public interface KGroupedTable<K, V> { * @deprecated use {@link #count(Materialized) count(Materialized.as(KeyValueByteStoreSupplier)} */ @Deprecated - KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier); + KTable<K, Long> count(final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier); /** * Combine the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper) @@ -467,7 +466,7 @@ public interface KGroupedTable<K, V> { @Deprecated KTable<K, V> reduce(final Reducer<V> adder, final Reducer<V> subtractor, - final StateStoreSupplier<KeyValueStore> storeSupplier); + final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier); /** * Aggregate the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper) @@ -945,6 +944,6 @@ public interface KGroupedTable<K, V> { <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final Aggregator<? super K, ? super V, VR> adder, final Aggregator<? super K, ? super V, VR> subtractor, - final StateStoreSupplier<KeyValueStore> storeSupplier); + final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier); } http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java index d4642da..6b51c86 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java @@ -29,7 +29,6 @@ import org.apache.kafka.streams.kstream.internals.KTableImpl; import org.apache.kafka.streams.kstream.internals.KTableSource; import org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier; import org.apache.kafka.streams.processor.ProcessorSupplier; -import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.QueryableStoreType; @@ -431,7 +430,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB * @return a {@link KTable} for the specified topic */ public <K, V> KTable<K, V> table(final String topic, - final StateStoreSupplier<KeyValueStore> storeSupplier) { + final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) { return table(null, null, null, null, topic, storeSupplier); } @@ -525,7 +524,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB */ public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset, final String topic, - final StateStoreSupplier<KeyValueStore> storeSupplier) { + final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) { return table(offsetReset, null, null, null, topic, storeSupplier); } @@ -702,7 +701,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB public <K, V> KTable<K, V> table(final Serde<K> keySerde, final Serde<V> valSerde, final String topic, - final StateStoreSupplier<KeyValueStore> storeSupplier) { + final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) { return table(null, null, keySerde, valSerde, topic, storeSupplier); } @@ -737,7 +736,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB final Serde<V> valSerde, final TimestampExtractor timestampExtractor, final String topic, - final StateStoreSupplier<KeyValueStore> storeSupplier, + final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier, final boolean isQueryable) { try { final String source = newName(KStreamImpl.SOURCE_NAME); @@ -882,6 +881,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB * {@link #table(org.apache.kafka.streams.processor.TopologyBuilder.AutoOffsetReset, Serde, Serde, String) table(AutoOffsetReset, Serde, Serde, String)} * @return a {@link KTable} for the specified topic */ + @SuppressWarnings("unchecked") public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset, final TimestampExtractor timestampExtractor, final Serde<K> keySerde, @@ -889,12 +889,13 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB final String topic, final String queryableStoreName) { final String internalStoreName = queryableStoreName != null ? queryableStoreName : newStoreName(KTableImpl.SOURCE_NAME); - final StateStoreSupplier storeSupplier = new RocksDBKeyValueStoreSupplier<>(internalStoreName, - keySerde, - valSerde, - false, - Collections.<String, String>emptyMap(), - true); + final org.apache.kafka.streams.processor.StateStoreSupplier storeSupplier = new RocksDBKeyValueStoreSupplier<>( + internalStoreName, + keySerde, + valSerde, + false, + Collections.<String, String>emptyMap(), + true); return doTable(offsetReset, keySerde, valSerde, timestampExtractor, topic, storeSupplier, queryableStoreName != null); } @@ -965,7 +966,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB final Serde<K> keySerde, final Serde<V> valSerde, final String topic, - final StateStoreSupplier<KeyValueStore> storeSupplier) { + final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) { Objects.requireNonNull(storeSupplier, "storeSupplier can't be null"); return doTable(offsetReset, keySerde, valSerde, timestampExtractor, topic, storeSupplier, true); } @@ -1096,7 +1097,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde, final Serde<V> valSerde, final String topic, - final StateStoreSupplier<KeyValueStore> storeSupplier) { + final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) { return doGlobalTable(keySerde, valSerde, null, topic, storeSupplier); } @@ -1172,7 +1173,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB final Serde<V> valSerde, final TimestampExtractor timestampExtractor, final String topic, - final StateStoreSupplier<KeyValueStore> storeSupplier) { + final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) { try { Objects.requireNonNull(storeSupplier, "storeSupplier can't be null"); final String sourceName = newName(KStreamImpl.SOURCE_NAME); @@ -1224,6 +1225,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB * @param streams the {@link KStream}s to be merged * @return a {@link KStream} containing all records of the given streams */ + @SuppressWarnings("unchecked") public <K, V> KStream<K, V> merge(final KStream<K, V>... streams) { Objects.requireNonNull(streams, "streams can't be null"); if (streams.length <= 1) { http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java index 1abc5e7..33e56aa 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java @@ -27,7 +27,6 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.internals.WindowedSerializer; import org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner; import org.apache.kafka.streams.processor.StateStore; -import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.KeyValueStore; @@ -206,7 +205,8 @@ public interface KTable<K, V> { * @deprecated use {@link #filter(Predicate, Materialized) filter(predicate, Materialized.as(KeyValueByteStoreSupplier))} */ @Deprecated - KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier); + KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, + final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier); /** * Create a new {@code KTable} that consists all records of this {@code KTable} which do <em>not</em> satisfy the @@ -300,7 +300,8 @@ public interface KTable<K, V> { * @deprecated use {@link #filterNot(Predicate, Materialized) filterNot(predicate, Materialized.as(KeyValueByteStoreSupplier))} */ @Deprecated - KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier); + KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, + final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier); /** * Create a new {@code KTable} that consists all records of this {@code KTable} which do <em>not</em> satisfy the @@ -512,7 +513,7 @@ public interface KTable<K, V> { @Deprecated <VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper, final Serde<VR> valueSerde, - final StateStoreSupplier<KeyValueStore> storeSupplier); + final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier); /** @@ -811,7 +812,7 @@ public interface KTable<K, V> { */ @Deprecated KTable<K, V> through(final String topic, - final StateStoreSupplier<KeyValueStore> storeSupplier); + final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier); /** * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic using default @@ -913,7 +914,7 @@ public interface KTable<K, V> { @Deprecated KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner, final String topic, - final StateStoreSupplier<KeyValueStore> storeSupplier); + final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier); /** * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic. @@ -978,7 +979,7 @@ public interface KTable<K, V> { KTable<K, V> through(final Serde<K> keySerde, final Serde<V> valSerde, final String topic, - final StateStoreSupplier<KeyValueStore> storeSupplier); + final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier); /** * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic. @@ -1080,7 +1081,7 @@ public interface KTable<K, V> { final Serde<V> valSerde, final StreamPartitioner<? super K, ? super V> partitioner, final String topic, - final StateStoreSupplier<KeyValueStore> storeSupplier); + final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier); /** * Materialize this changelog stream to a topic and creates a new {@code KTable} from the topic using a customizable @@ -1590,7 +1591,7 @@ public interface KTable<K, V> { @Deprecated <VO, VR> KTable<K, VR> join(final KTable<K, VO> other, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, - final StateStoreSupplier<KeyValueStore> storeSupplier); + final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier); /** @@ -1934,7 +1935,7 @@ public interface KTable<K, V> { @Deprecated <VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, - final StateStoreSupplier<KeyValueStore> storeSupplier); + final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier); /** @@ -2275,7 +2276,7 @@ public interface KTable<K, V> { @Deprecated <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, - final StateStoreSupplier<KeyValueStore> storeSupplier); + final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier); /** * Get the name of the local state store used that can be used to query this {@code KTable}. http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java index b5de562..26e404e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java @@ -21,7 +21,6 @@ import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.Window; import org.apache.kafka.streams.kstream.Windows; -import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.WindowStore; @@ -80,8 +79,8 @@ public abstract class AbstractStream<K> { }; } - @SuppressWarnings("unchecked") - static <T, K> StateStoreSupplier<KeyValueStore> keyValueStore(final Serde<K> keySerde, + @SuppressWarnings({"unchecked", "deprecation"}) + static <T, K> org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> keyValueStore(final Serde<K> keySerde, final Serde<T> aggValueSerde, final String storeName) { Objects.requireNonNull(storeName, "storeName can't be null"); @@ -89,8 +88,8 @@ public abstract class AbstractStream<K> { return storeFactory(keySerde, aggValueSerde, storeName).build(); } - @SuppressWarnings("unchecked") - static <W extends Window, T, K> StateStoreSupplier<WindowStore> windowedStore(final Serde<K> keySerde, + @SuppressWarnings({"unchecked", "deprecation"}) + static <W extends Window, T, K> org.apache.kafka.streams.processor.StateStoreSupplier<WindowStore> windowedStore(final Serde<K> keySerde, final Serde<T> aggValSerde, final Windows<W> windows, final String storeName) { @@ -101,6 +100,7 @@ public abstract class AbstractStream<K> { .build(); } + @SuppressWarnings("deprecation") static <T, K> Stores.PersistentKeyValueFactory<K, T> storeFactory(final Serde<K> keySerde, final Serde<T> aggValueSerde, final String storeName) { http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java index 357a70c..0df1524 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java @@ -22,7 +22,6 @@ import org.apache.kafka.streams.kstream.GlobalKTable; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.processor.ProcessorSupplier; -import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; @@ -70,9 +69,10 @@ public class InternalStreamsBuilder { return new KStreamImpl<>(this, name, Collections.singleton(name), false); } + @SuppressWarnings("deprecation") public <K, V> KTable<K, V> table(final String topic, final ConsumedInternal<K, V> consumed, - final StateStoreSupplier<KeyValueStore> storeSupplier) { + final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) { Objects.requireNonNull(storeSupplier, "storeSupplier can't be null"); final String source = newName(KStreamImpl.SOURCE_NAME); final String name = newName(KTableImpl.SOURCE_NAME); @@ -132,6 +132,7 @@ public class InternalStreamsBuilder { consumed.keySerde(), consumed.valueSerde(), Collections.singleton(source), storeName, isQueryable); } + @SuppressWarnings("unchecked") public <K, V> GlobalKTable<K, V> globalTable(final String topic, final ConsumedInternal<K, V> consumed, final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized) { http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java index 64dfd19..dafaa62 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java @@ -28,11 +28,10 @@ import org.apache.kafka.streams.kstream.Merger; import org.apache.kafka.streams.kstream.Reducer; import org.apache.kafka.streams.kstream.SessionWindowedKStream; import org.apache.kafka.streams.kstream.SessionWindows; +import org.apache.kafka.streams.kstream.TimeWindowedKStream; import org.apache.kafka.streams.kstream.Window; import org.apache.kafka.streams.kstream.Windowed; -import org.apache.kafka.streams.kstream.TimeWindowedKStream; import org.apache.kafka.streams.kstream.Windows; -import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.SessionStore; import org.apache.kafka.streams.state.StoreBuilder; @@ -78,6 +77,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre } // no need for else {} since isQueryable is true by default } + @SuppressWarnings("deprecation") @Override public KTable<K, V> reduce(final Reducer<V> reducer, final String queryableStoreName) { @@ -91,9 +91,10 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre return reduce(reducer, (String) null); } + @SuppressWarnings("deprecation") @Override public KTable<K, V> reduce(final Reducer<V> reducer, - final StateStoreSupplier<KeyValueStore> storeSupplier) { + final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) { Objects.requireNonNull(reducer, "reducer can't be null"); Objects.requireNonNull(storeSupplier, "storeSupplier can't be null"); return doAggregate( @@ -115,7 +116,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre materializedInternal); } - + @SuppressWarnings("deprecation") @Override public <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer, final Windows<W> windows, @@ -124,17 +125,18 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre return reduce(reducer, windows, windowedStore(keySerde, valSerde, windows, getOrCreateName(queryableStoreName, REDUCE_NAME))); } + @SuppressWarnings("deprecation") @Override public <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer, final Windows<W> windows) { return windowedBy(windows).reduce(reducer); } - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "deprecation"}) @Override public <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer, final Windows<W> windows, - final StateStoreSupplier<WindowStore> storeSupplier) { + final org.apache.kafka.streams.processor.StateStoreSupplier<WindowStore> storeSupplier) { Objects.requireNonNull(reducer, "reducer can't be null"); Objects.requireNonNull(windows, "windows can't be null"); Objects.requireNonNull(storeSupplier, "storeSupplier can't be null"); @@ -145,6 +147,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre ); } + @SuppressWarnings("deprecation") @Override public <T> KTable<K, T> aggregate(final Initializer<T> initializer, final Aggregator<? super K, ? super V, T> aggregator, @@ -176,7 +179,8 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre } @Override - public <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator) { + public <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, + final Aggregator<? super K, ? super V, VR> aggregator) { Objects.requireNonNull(initializer, "initializer can't be null"); Objects.requireNonNull(aggregator, "aggregator can't be null"); final String storeName = builder.newStoreName(AGGREGATE_NAME); @@ -189,6 +193,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre } + @SuppressWarnings("deprecation") @Override public <T> KTable<K, T> aggregate(final Initializer<T> initializer, final Aggregator<? super K, ? super V, T> aggregator, @@ -196,10 +201,11 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre return aggregate(initializer, aggregator, aggValueSerde, null); } + @SuppressWarnings("deprecation") @Override public <T> KTable<K, T> aggregate(final Initializer<T> initializer, final Aggregator<? super K, ? super V, T> aggregator, - final StateStoreSupplier<KeyValueStore> storeSupplier) { + final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) { Objects.requireNonNull(initializer, "initializer can't be null"); Objects.requireNonNull(aggregator, "aggregator can't be null"); Objects.requireNonNull(storeSupplier, "storeSupplier can't be null"); @@ -209,6 +215,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre storeSupplier); } + @SuppressWarnings("deprecation") @Override public <W extends Window, T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer, final Aggregator<? super K, ? super V, T> aggregator, @@ -219,6 +226,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre return aggregate(initializer, aggregator, windows, windowedStore(keySerde, aggValueSerde, windows, getOrCreateName(queryableStoreName, AGGREGATE_NAME))); } + @SuppressWarnings("deprecation") @Override public <W extends Window, T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer, final Aggregator<? super K, ? super V, T> aggregator, @@ -230,12 +238,12 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre .withValueSerde(aggValueSerde)); } - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "deprecation"}) @Override public <W extends Window, T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer, final Aggregator<? super K, ? super V, T> aggregator, final Windows<W> windows, - final StateStoreSupplier<WindowStore> storeSupplier) { + final org.apache.kafka.streams.processor.StateStoreSupplier<WindowStore> storeSupplier) { Objects.requireNonNull(initializer, "initializer can't be null"); Objects.requireNonNull(aggregator, "aggregator can't be null"); Objects.requireNonNull(windows, "windows can't be null"); @@ -247,6 +255,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre ); } + @SuppressWarnings("deprecation") @Override public KTable<K, Long> count(final String queryableStoreName) { determineIsQueryable(queryableStoreName); @@ -258,8 +267,9 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre return count((String) null); } + @SuppressWarnings("deprecation") @Override - public KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier) { + public KTable<K, Long> count(final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) { return aggregate(aggregateBuilder.countInitializer, aggregateBuilder.countAggregator, storeSupplier); } @@ -274,6 +284,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre return aggregate(aggregateBuilder.countInitializer, aggregateBuilder.countAggregator, materialized); } + @SuppressWarnings("deprecation") @Override public <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows, final String queryableStoreName) { @@ -281,14 +292,16 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre return count(windows, windowedStore(keySerde, Serdes.Long(), windows, getOrCreateName(queryableStoreName, AGGREGATE_NAME))); } + @SuppressWarnings("deprecation") @Override public <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows) { return windowedBy(windows).count(); } + @SuppressWarnings("deprecation") @Override public <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows, - final StateStoreSupplier<WindowStore> storeSupplier) { + final org.apache.kafka.streams.processor.StateStoreSupplier<WindowStore> storeSupplier) { return aggregate( aggregateBuilder.countInitializer, aggregateBuilder.countAggregator, @@ -296,7 +309,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre storeSupplier); } - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "deprecation"}) @Override public <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer, final Aggregator<? super K, ? super V, T> aggregator, @@ -316,6 +329,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre } + @SuppressWarnings("deprecation") @Override public <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer, final Aggregator<? super K, ? super V, T> aggregator, @@ -330,14 +344,14 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre .withValueSerde(aggValueSerde)); } - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "deprecation"}) @Override public <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer, final Aggregator<? super K, ? super V, T> aggregator, final Merger<? super K, T> sessionMerger, final SessionWindows sessionWindows, final Serde<T> aggValueSerde, - final StateStoreSupplier<SessionStore> storeSupplier) { + final org.apache.kafka.streams.processor.StateStoreSupplier<SessionStore> storeSupplier) { Objects.requireNonNull(initializer, "initializer can't be null"); Objects.requireNonNull(aggregator, "aggregator can't be null"); Objects.requireNonNull(sessionWindows, "sessionWindows can't be null"); @@ -373,7 +387,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre aggregateBuilder); } - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "deprecation"}) public KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows, final String queryableStoreName) { Materialized<K, Long, SessionStore<Bytes, byte[]>> materialized = Materialized.<K, Long, SessionStore<Bytes, byte[]>>as(getOrCreateName(queryableStoreName, AGGREGATE_NAME)) .withKeySerde(keySerde) @@ -381,13 +395,15 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre return windowedBy(sessionWindows).count(materialized); } + @SuppressWarnings("deprecation") public KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows) { return windowedBy(sessionWindows).count(); } + @SuppressWarnings("deprecation") @Override public KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows, - final StateStoreSupplier<SessionStore> storeSupplier) { + final org.apache.kafka.streams.processor.StateStoreSupplier<SessionStore> storeSupplier) { Objects.requireNonNull(sessionWindows, "sessionWindows can't be null"); Objects.requireNonNull(storeSupplier, "storeSupplier can't be null"); final Merger<K, Long> sessionMerger = new Merger<K, Long>() { @@ -406,7 +422,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre } - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "deprecation"}) @Override public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer, final SessionWindows sessionWindows, @@ -418,6 +434,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre .sessionWindowed(sessionWindows.maintainMs()).build()); } + @SuppressWarnings("deprecation") @Override public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer, final SessionWindows sessionWindows) { @@ -425,10 +442,11 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre return windowedBy(sessionWindows).reduce(reducer); } + @SuppressWarnings("deprecation") @Override public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer, final SessionWindows sessionWindows, - final StateStoreSupplier<SessionStore> storeSupplier) { + final org.apache.kafka.streams.processor.StateStoreSupplier<SessionStore> storeSupplier) { Objects.requireNonNull(reducer, "reducer can't be null"); Objects.requireNonNull(sessionWindows, "sessionWindows can't be null"); Objects.requireNonNull(storeSupplier, "storeSupplier can't be null"); @@ -471,10 +489,11 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre } + @SuppressWarnings("deprecation") private <T> KTable<K, T> doAggregate( final KStreamAggProcessorSupplier<K, ?, V, T> aggregateSupplier, final String functionName, - final StateStoreSupplier storeSupplier) { + final org.apache.kafka.streams.processor.StateStoreSupplier storeSupplier) { final String aggFunctionName = builder.newName(functionName); http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java index e69d4f9..507944a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java @@ -21,15 +21,14 @@ import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.streams.kstream.Materialized; -import org.apache.kafka.streams.kstream.Reducer; -import org.apache.kafka.streams.kstream.KTable; -import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.KGroupedTable; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Reducer; import org.apache.kafka.streams.processor.FailOnInvalidTimestamp; import org.apache.kafka.streams.processor.ProcessorSupplier; -import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.state.KeyValueStore; import java.util.Collections; @@ -71,11 +70,11 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup } }; - public KGroupedTableImpl(final InternalStreamsBuilder builder, - final String name, - final String sourceName, - final Serde<? extends K> keySerde, - final Serde<? extends V> valSerde) { + KGroupedTableImpl(final InternalStreamsBuilder builder, + final String name, + final String sourceName, + final Serde<? extends K> keySerde, + final Serde<? extends V> valSerde) { super(builder, name, Collections.singleton(sourceName)); this.keySerde = keySerde; this.valSerde = valSerde; @@ -88,6 +87,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup } // no need for else {} since isQueryable is true by default } + @SuppressWarnings("deprecation") @Override public <T> KTable<K, T> aggregate(final Initializer<T> initializer, final Aggregator<? super K, ? super V, T> adder, @@ -98,6 +98,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup return aggregate(initializer, adder, subtractor, keyValueStore(keySerde, aggValueSerde, getOrCreateName(queryableStoreName, AGGREGATE_NAME))); } + @SuppressWarnings("deprecation") @Override public <T> KTable<K, T> aggregate(final Initializer<T> initializer, final Aggregator<? super K, ? super V, T> adder, @@ -106,6 +107,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup return aggregate(initializer, adder, subtractor, aggValueSerde, null); } + @SuppressWarnings("deprecation") @Override public <T> KTable<K, T> aggregate(final Initializer<T> initializer, final Aggregator<? super K, ? super V, T> adder, @@ -122,11 +124,12 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup return aggregate(initializer, adder, subtractor, (String) null); } + @SuppressWarnings("deprecation") @Override public <T> KTable<K, T> aggregate(final Initializer<T> initializer, final Aggregator<? super K, ? super V, T> adder, final Aggregator<? super K, ? super V, T> subtractor, - final StateStoreSupplier<KeyValueStore> storeSupplier) { + final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) { Objects.requireNonNull(initializer, "initializer can't be null"); Objects.requireNonNull(adder, "adder can't be null"); Objects.requireNonNull(subtractor, "subtractor can't be null"); @@ -135,9 +138,10 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup return doAggregate(aggregateSupplier, AGGREGATE_NAME, storeSupplier); } + @SuppressWarnings("deprecation") private <T> KTable<K, T> doAggregate(final ProcessorSupplier<K, Change<V>> aggregateSupplier, final String functionName, - final StateStoreSupplier<KeyValueStore> storeSupplier) { + final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) { final String sinkName = builder.newName(KStreamImpl.SINK_NAME); final String sourceName = builder.newName(KStreamImpl.SOURCE_NAME); final String funcName = builder.newName(functionName); @@ -194,6 +198,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup return new KTableImpl<>(builder, funcName, aggregateSupplier, Collections.singleton(sourceName), materialized.storeName(), isQueryable); } + @SuppressWarnings("deprecation") @Override public KTable<K, V> reduce(final Reducer<V> adder, final Reducer<V> subtractor, @@ -223,10 +228,11 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup return reduce(adder, subtractor, (String) null); } + @SuppressWarnings("deprecation") @Override public KTable<K, V> reduce(final Reducer<V> adder, final Reducer<V> subtractor, - final StateStoreSupplier<KeyValueStore> storeSupplier) { + final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) { Objects.requireNonNull(adder, "adder can't be null"); Objects.requireNonNull(subtractor, "subtractor can't be null"); Objects.requireNonNull(storeSupplier, "storeSupplier can't be null"); @@ -234,6 +240,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup return doAggregate(aggregateSupplier, REDUCE_NAME, storeSupplier); } + @SuppressWarnings("deprecation") @Override public KTable<K, Long> count(final String queryableStoreName) { determineIsQueryable(queryableStoreName); @@ -271,8 +278,9 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup return count((String) null); } + @SuppressWarnings("deprecation") @Override - public KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier) { + public KTable<K, Long> count(final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) { return this.aggregate( countInitializer, countAdder,