Repository: kafka Updated Branches: refs/heads/trunk cdbf806e2 -> cc84686a4
MINOR: additional kip-182 doc updates Author: Damian Guy <damian....@gmail.com> Reviewers: Michael G. Noll <mich...@confluent.io>, Bill Bejeck <b...@confluent.io>, Matthias J. Sax <matth...@confluent.io>, Ismael Juma <ism...@juma.me.uk> Closes #3971 from dguy/kip-182-docs Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cc84686a Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cc84686a Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cc84686a Branch: refs/heads/trunk Commit: cc84686a4aa24e541f7ca5ee9dcb0dea0ddbd79a Parents: cdbf806 Author: Damian Guy <damian....@gmail.com> Authored: Mon Oct 2 13:20:49 2017 -0700 Committer: Damian Guy <damian....@gmail.com> Committed: Mon Oct 2 13:20:49 2017 -0700 ---------------------------------------------------------------------- docs/streams/developer-guide.html | 244 +++++++++++++++++---------------- 1 file changed, 128 insertions(+), 116 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/cc84686a/docs/streams/developer-guide.html ---------------------------------------------------------------------- diff --git a/docs/streams/developer-guide.html b/docs/streams/developer-guide.html index a064a5d..842325b 100644 --- a/docs/streams/developer-guide.html +++ b/docs/streams/developer-guide.html @@ -1383,65 +1383,72 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r // Java 8+ examples, using lambda expressions // Aggregating with time-based windowing (here: with 5-minute tumbling windows) - KTable<Windowed<String>, Long> timeWindowedAggregatedStream = groupedStream.aggregate( - () -> 0L, /* initializer */ - (aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */ - TimeWindows.of(TimeUnit.MINUTES.toMillis(5)), /* time-based window */ - Serdes.Long(), /* serde for aggregate value */ - "time-windowed-aggregated-stream-store" /* 
state store name */); + KTable<Windowed<String>, Long> timeWindowedAggregatedStream = groupedStream + .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5))) /* time-based window */ + .aggregate( + () -> 0L, /* initializer */ + (aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */ + Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("time-windowed-aggregated-stream-store") /* state store name */ + .withValueSerde(Serdes.Long())); /* serde for aggregate value */ + // Aggregating with session-based windowing (here: with an inactivity gap of 5 minutes) - KTable<Windowed<String>, Long> sessionizedAggregatedStream = groupedStream.aggregate( - () -> 0L, /* initializer */ - (aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */ - (aggKey, leftAggValue, rightAggValue) -> leftAggValue + rightAggValue, /* session merger */ - SessionWindows.with(TimeUnit.MINUTES.toMillis(5)), /* session window */ - Serdes.Long(), /* serde for aggregate value */ - "sessionized-aggregated-stream-store" /* state store name */); + KTable<Windowed<String>, Long> sessionizedAggregatedStream = groupedStream + .windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(5))) /* session window */ + .aggregate( + () -> 0L, /* initializer */ + (aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */ + (aggKey, leftAggValue, rightAggValue) -> leftAggValue + rightAggValue, /* session merger */ + Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("sessionized-aggregated-stream-store") /* state store name */ + .withValueSerde(Serdes.Long())); /* serde for aggregate value */ // Java 7 examples // Aggregating with time-based windowing (here: with 5-minute tumbling windows) - KTable<Windowed<String>, Long> timeWindowedAggregatedStream = groupedStream.aggregate( - new Initializer<Long>() { /* initializer */ - @Override - public Long apply() { - return 0L; - } - }, - new Aggregator<String, Long, Long>() { /* adder */ - @Override - public Long apply(String aggKey, Long 
newValue, Long aggValue) { - return aggValue + newValue; - } - }, - TimeWindows.of(TimeUnit.MINUTES.toMillis(5)), /* time-based window */ - Serdes.Long(), /* serde for aggregate value */ - "time-windowed-aggregated-stream-store" /* state store name */); + KTable<Windowed<String>, Long> timeWindowedAggregatedStream = groupedStream + .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5))) /* time-based window */ + .aggregate( + new Initializer<Long>() { /* initializer */ + @Override + public Long apply() { + return 0L; + } + }, + new Aggregator<String, Long, Long>() { /* adder */ + @Override + public Long apply(String aggKey, Long newValue, Long aggValue) { + return aggValue + newValue; + } + }, + Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("time-windowed-aggregated-stream-store") /* state store name */ + .withValueSerde(Serdes.Long()) /* serde for aggregate value */ + ); // Aggregating with session-based windowing (here: with an inactivity gap of 5 minutes) - KTable<Windowed<String>, Long> sessionizedAggregatedStream = groupedStream.aggregate( - new Initializer<Long>() { /* initializer */ - @Override - public Long apply() { - return 0L; - } - }, - new Aggregator<String, Long, Long>() { /* adder */ - @Override - public Long apply(String aggKey, Long newValue, Long aggValue) { - return aggValue + newValue; - } - }, - new Merger<String, Long>() { /* session merger */ - @Override - public Long apply(String aggKey, Long leftAggValue, Long rightAggValue) { - return rightAggValue + leftAggValue; - } - }, - SessionWindows.with(TimeUnit.MINUTES.toMillis(5)), /* session window */ - Serdes.Long(), /* serde for aggregate value */ - "sessionized-aggregated-stream-store" /* state store name */); + KTable<Windowed<String>, Long> sessionizedAggregatedStream = groupedStream + .windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(5))) /* session window */ + .aggregate( + new Initializer<Long>() { /* initializer */ + @Override + public Long apply() { + return 0L; + 
} + }, + new Aggregator<String, Long, Long>() { /* adder */ + @Override + public Long apply(String aggKey, Long newValue, Long aggValue) { + return aggValue + newValue; + } + }, + new Merger<String, Long>() { /* session merger */ + @Override + public Long apply(String aggKey, Long leftAggValue, Long rightAggValue) { + return rightAggValue + leftAggValue; + } + }, + Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("sessionized-aggregated-stream-store") /* state store name */ + .withValueSerde(Serdes.Long()) /* serde for aggregate value */ + ); </pre> <p> @@ -1478,12 +1485,10 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r KGroupedTable<String, Long> groupedTable = ...; // Counting a KGroupedStream - KTable<String, Long> aggregatedStream = groupedStream.count( - "counted-stream-store" /* state store name */); + KTable<String, Long> aggregatedStream = groupedStream.count(); // Counting a KGroupedTable - KTable<String, Long> aggregatedTable = groupedTable.count( - "counted-table-store" /* state store name */); + KTable<String, Long> aggregatedTable = groupedTable.count(); </pre> <p> Detailed behavior for <code>KGroupedStream</code>: @@ -1518,14 +1523,14 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r KGroupedStream<String, Long> groupedStream = ...; // Counting a KGroupedStream with time-based windowing (here: with 5-minute tumbling windows) - KTable<Windowed<String>, Long> aggregatedStream = groupedStream.count( - TimeWindows.of(TimeUnit.MINUTES.toMillis(5)), /* time-based window */ - "time-windowed-counted-stream-store" /* state store name */); + KTable<Windowed<String>, Long> aggregatedStream = groupedStream + .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5))) /* time-based window */ + .count(); // Counting a KGroupedStream with session-based windowing (here: with 5-minute inactivity gaps) - KTable<Windowed<String>, Long> aggregatedStream = groupedStream.count( - 
SessionWindows.with(TimeUnit.MINUTES.toMillis(5)), /* session window */ - "sessionized-counted-stream-store" /* state store name */); + KTable<Windowed<String>, Long> aggregatedStream = groupedStream + .windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(5))) /* session window */ + .count(); </pre> <p> Detailed behavior: @@ -1561,14 +1566,14 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r // Reducing a KGroupedStream KTable<String, Long> aggregatedStream = groupedStream.reduce( - (aggValue, newValue) -> aggValue + newValue, /* adder */ - "reduced-stream-store" /* state store name */); + (aggValue, newValue) -> aggValue + newValue /* adder */ + ); // Reducing a KGroupedTable KTable<String, Long> aggregatedTable = groupedTable.reduce( (aggValue, newValue) -> aggValue + newValue, /* adder */ - (aggValue, oldValue) -> aggValue - oldValue, /* subtractor */ - "reduced-table-store" /* state store name */); + (aggValue, oldValue) -> aggValue - oldValue /* subtractor */ + ); // Java 7 examples @@ -1580,8 +1585,8 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r public Long apply(Long aggValue, Long newValue) { return aggValue + newValue; } - }, - "reduced-stream-store" /* state store name */); + } + ); // Reducing a KGroupedTable KTable<String, Long> aggregatedTable = groupedTable.reduce( @@ -1596,8 +1601,8 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r public Long apply(Long aggValue, Long oldValue) { return aggValue - oldValue; } - }, - "reduced-table-store" /* state store name */); + } + ); </pre> <p> Detailed behavior for <code>KGroupedStream</code>: @@ -1659,41 +1664,39 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r // Java 8+ examples, using lambda expressions // Aggregating with time-based windowing (here: with 5-minute tumbling windows) - KTable<Windowed<String>, Long> timeWindowedAggregatedStream = 
groupedStream.reduce( - (aggValue, newValue) -> aggValue + newValue, /* adder */ - TimeWindows.of(TimeUnit.MINUTES.toMillis(5)), /* time-based window */ - "time-windowed-reduced-stream-store" /* state store name */); + KTable<Windowed<String>, Long> timeWindowedAggregatedStream = groupedStream + .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5))) /* time-based window */ + .reduce((aggValue, newValue) -> aggValue + newValue /* adder */); // Aggregating with session-based windowing (here: with an inactivity gap of 5 minutes) - KTable<Windowed<String>, Long> sessionzedAggregatedStream = groupedStream.reduce( - (aggValue, newValue) -> aggValue + newValue, /* adder */ - SessionWindows.with(TimeUnit.MINUTES.toMillis(5)), /* session window */ - "sessionized-reduced-stream-store" /* state store name */); + KTable<Windowed<String>, Long> sessionzedAggregatedStream = groupedStream + .windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(5))) /* session window */ + .reduce((aggValue, newValue) -> aggValue + newValue); /* adder */ // Java 7 examples // Aggregating with time-based windowing (here: with 5-minute tumbling windows) - KTable<Windowed<String>, Long> timeWindowedAggregatedStream = groupedStream.reduce( - new Reducer<Long>() { /* adder */ - @Override - public Long apply(Long aggValue, Long newValue) { - return aggValue + newValue; - } - }, - TimeWindows.of(TimeUnit.MINUTES.toMillis(5)), /* time-based window */ - "time-windowed-reduced-stream-store" /* state store name */); + KTable<Windowed<String>, Long> timeWindowedAggregatedStream = groupedStream + .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5))) /* time-based window */ + .reduce( + new Reducer<Long>() { /* adder */ + @Override + public Long apply(Long aggValue, Long newValue) { + return aggValue + newValue; + } + }); // Aggregating with session-based windowing (here: with an inactivity gap of 5 minutes) - KTable<Windowed<String>, Long> timeWindowedAggregatedStream = groupedStream.reduce( - new 
Reducer<Long>() { /* adder */ - @Override - public Long apply(Long aggValue, Long newValue) { - return aggValue + newValue; - } - }, - SessionWindows.with(TimeUnit.MINUTES.toMillis(5)), /* session window */ - "sessionized-reduced-stream-store" /* state store name */); + KTable<Windowed<String>, Long> timeWindowedAggregatedStream = groupedStream + .windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(5))) /* session window */ + .reduce( + new Reducer<Long>() { /* adder */ + @Override + public Long apply(Long aggValue, Long newValue) { + return aggValue + newValue; + } + }); </pre> <p> @@ -1723,16 +1726,22 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r </p> <pre class="brush: java;"> // Key: word, value: count + Properties streamsProperties = ...; + + // specify the default serdes so we don't need to elsewhere. + streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + streamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); + StreamsConfig config = new StreamsConfig(streamsProperties); + KStream<String, Integer> wordCounts = ...; KGroupedStream<String, Integer> groupedStream = wordCounts - .groupByKey(Serialized.with(Serdes.String(), Serdes.Integer())); + .groupByKey(); KTable<String, Integer> aggregated = groupedStream.aggregate( () -> 0, /* initializer */ - (aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */ - Serdes.Integer(), /* serde for aggregate value */ - "aggregated-stream-store" /* state store name */); + (aggKey, newValue, aggValue) -> aggValue + newValue /* adder */ + ); </pre> <p> @@ -1836,8 +1845,10 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r () -> 0, /* initializer */ (aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */ (aggKey, oldValue, aggValue) -> aggValue - oldValue, /* subtractor */ - Serdes.Integer(), /* serde for aggregate value */ - 
"aggregated-table-store" /* state store name */); + Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("aggregated-table-store") + .withKeySerde(Serdes.String() /* serde for aggregate key */) + .withValueSerde(Serdes.Integer() /* serde for aggregate value */) + ); </pre> <p> <b>Impact of <a href=#streams_developer-guide_memory-management_record-cache>record caches</a></b>: @@ -2253,7 +2264,10 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r .groupBy((key, word) -> word, Serialized.with(stringSerde, stringSerde)); // Create a window state store named "CountsWindowStore" that contains the word counts for every minute - groupedByWord.count(TimeWindows.of(60000), "CountsWindowStore"); + groupedByWord.windowedBy(TimeWindows.of(60000)) + .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("CountsWindowStore") + .withKeySerde(Serdes.String()) // count() sets value serde to Serdes.Long() automatically + ); </pre> <p> @@ -2396,14 +2410,14 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r Topology topology = ...; ProcessorSupplier processorSuppler = ...; - // Create CustomStoreSupplier for store name the-custom-store - MyCustomStoreSuppler customStoreSupplier = new MyCustomStoreSupplier("the-custom-store"); + // Create CustomStoreBuilder for store name the-custom-store + MyCustomStoreBuilder customStoreBuilder = new MyCustomStoreBuilder("the-custom-store"); // Add the source topic topology.addSource("input", "inputTopic"); // Add a custom processor that reads from the source topic topology.addProcessor("the-processor", processorSupplier, "input"); // Connect your custom state store to the custom processor above - topology.addStateStore(customStoreSupplier, "the-processor"); + topology.addStateStore(customStoreBuilder, "the-processor"); KafkaStreams streams = new KafkaStreams(topology, config); streams.start(); @@ -2478,7 +2492,7 @@ Note that in the <code>WordCountProcessor</code> 
implementation, users need to r // This call to `count()` creates a state store named "word-count". // The state store is discoverable and can be queried interactively. - groupedByWord.count("word-count"); + groupedByWord.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("word-count")); // Start an instance of the topology KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration); @@ -2835,22 +2849,20 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r <p> For changelog topics you can also override the default configs on a per store basis. - This can be done by using any method overload that has a <code>StateStoreSupplier</code> as a parameter: + This can be done by using any method overload that has a <code>Materialized</code> as a parameter: </p> <pre class="brush: java;"> // a map to add topic config - Map<String, String> topicConfig = new HashMap<>(); + Map<String, String> topicConfig = new HashMap<>(); topicConfig.put(TopicConfig.SEGMENT_MS_CONFIG, "10000"); - StateStoreSupplier supplier = Stores.create("store") - .withKeys(Serdes.String()) - .withValues(Serdes.String()) - .persistent() - .enableLogging(topicConfig) // pass in the config overrides - .build(); - - groupedStream.count(supplier) + final Materialized<String, Long, KeyValueStore<Bytes, byte[]>> materialized = Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("store") + .withKeySerde(Serdes.String()) + .withValueSerde(Serdes.Long()) + .withLoggingEnabled(topicConfig); // pass in the config overrides + + groupedStream.count(materialized); </pre> <h4><a id="streams_execute" href="#streams_execute">Executing Your Kafka Streams Application</a></h4>