Repository: kafka Updated Branches: refs/heads/trunk ac7695c32 -> 93b71e7de
MINOR: update streams quickstart for KIP-182 Author: Damian Guy <damian....@gmail.com> Reviewers: Michael G. Noll <mich...@confluent.io>, Bill Bejeck <b...@confluent.io>, Matthias J. Sax <matth...@confluent.io>, Guozhang Wang <wangg...@gmail.com> Closes #3984 from dguy/quickstart-update Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/93b71e7d Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/93b71e7d Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/93b71e7d Branch: refs/heads/trunk Commit: 93b71e7deeb9d5c705d654443147f90e45cb11c8 Parents: ac7695c Author: Damian Guy <damian....@gmail.com> Authored: Wed Oct 4 12:19:40 2017 -0700 Committer: Guozhang Wang <wangg...@gmail.com> Committed: Wed Oct 4 12:19:40 2017 -0700 ---------------------------------------------------------------------- docs/streams/quickstart.html | 7 ++++--- docs/streams/tutorial.html | 20 ++++++++++++-------- .../src/main/java/WordCount.java | 18 ++++++++++++------ 3 files changed, 28 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/93b71e7d/docs/streams/quickstart.html ---------------------------------------------------------------------- diff --git a/docs/streams/quickstart.html b/docs/streams/quickstart.html index ea59194..72fd952 100644 --- a/docs/streams/quickstart.html +++ b/docs/streams/quickstart.html @@ -44,7 +44,8 @@ final Serde<Long> longSerde = Serdes.Long(); // Construct a `KStream` from the input topic "streams-plaintext-input", where message values // represent lines of text (for the sake of this example, we ignore whatever may be stored // in the message keys). 
-KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, "streams-plaintext-input"); +KStream<String, String> textLines = builder.stream("streams-plaintext-input", + Consumed.with(stringSerde, stringSerde)); KTable<String, Long> wordCounts = textLines // Split each text line, by whitespace, into words. @@ -54,10 +55,10 @@ KTable<String, Long> wordCounts = textLines .groupBy((key, value) -> value) // Count the occurrences of each word (message key). - .count("Counts") + .count(); // Store the running counts as a changelog stream to the output topic. -wordCounts.to(stringSerde, longSerde, "streams-wordcount-output"); +wordCounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long())); </pre> <p> http://git-wip-us.apache.org/repos/asf/kafka/blob/93b71e7d/docs/streams/tutorial.html ---------------------------------------------------------------------- diff --git a/docs/streams/tutorial.html b/docs/streams/tutorial.html index 6476d07..f800681 100644 --- a/docs/streams/tutorial.html +++ b/docs/streams/tutorial.html @@ -485,12 +485,14 @@ return value; } }) - .count("Counts"); + // Materialize the result into a KeyValueStore named "counts-store". + // The Materialized store is always of type <Bytes, byte[]> as this is the format of the innermost store. + .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>> as("counts-store")); </pre> <p> - Note that the <code>count</code> operator has a <code>String</code> typed parameter <code>Counts</code>, - which stores the running counts that keep being updated as more records are piped and processed from the source Kafka topic. + Note that the <code>count</code> operator has a <code>Materialized</code> parameter that specifies that the + running count should be stored in a state store named <code>counts-store</code>. 
This <code>Counts</code> store can be queried in real-time, with details described in the <a href="/{{version}}/documentation/streams/developer-guide#streams_interactive_queries">Developer Manual</a>. </p> @@ -502,7 +504,7 @@ </p> <pre class="brush: java;"> - counts.to(Serdes.String(), Serdes.Long(), "streams-wordcount-output"); + counts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long())); </pre> <p> @@ -516,8 +518,9 @@ KStream<String, String> source = builder.stream("streams-plaintext-input"); source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"))) .groupBy((key, value) -> value) - .count("Counts") - .to(Serdes.String(), Serdes.Long(), "streams-wordcount-output"); + .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")) + .toStream() + .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long())); </pre> <p> @@ -589,8 +592,9 @@ KStream<String, String> source = builder.stream("streams-plaintext-input"); source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"))) .groupBy((key, value) -> value) - .count("Counts") - .to(Serdes.String(), Serdes.Long(), "streams-wordcount-output"); + .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")) + .toStream() + .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long())); final Topology topology = builder.build(); final KafkaStreams streams = new KafkaStreams(topology, props); http://git-wip-us.apache.org/repos/asf/kafka/blob/93b71e7d/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/WordCount.java ---------------------------------------------------------------------- diff --git a/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/WordCount.java b/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/WordCount.java index 6dafa8c..020eb03 100644 
--- a/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/WordCount.java +++ b/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/WordCount.java @@ -17,12 +17,16 @@ package ${package}; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.ValueMapper; +import org.apache.kafka.streams.state.KeyValueStore; import java.util.Arrays; import java.util.Locale; @@ -59,17 +63,19 @@ public class WordCount { return value; } }) - .count("Counts") - .to(Serdes.String(), Serdes.Long(), "streams-wordcount-output"); + .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")) + .toStream() + .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long())); - /* ------- use the code below for Java 8 and uncomment the above ---- + /* ------- use the code below for Java 8 and comment the above ---- - builder.stream("streams-plaintext-input") + builder.<String, String>stream("streams-plaintext-input") .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"))) .groupBy((key, value) -> value) - .count("Counts") - .to(Serdes.String(), Serdes.Long(), "streams-wordcount-output"); + .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")) + .toStream() + .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long())); ----------------------------------------------------------------- */