Repository: kafka
Updated Branches:
  refs/heads/0.11.0 2df4d493e -> acc020d83
MINOR: Make streams quick start more interactive

1. Make the WordCountDemo application not stop automatically, but only via "Ctrl-C".
2. Update the quickstart html file to let users type input messages one-by-one, and observe the added output in an interactive manner.
3. Some minor fixes on the parent documentation page pointing to the streams sub-pages; added a new recommended Scala version number.

Author: Guozhang Wang <[email protected]>

Reviewers: Michael G. Noll <[email protected]>, Damian Guy <[email protected]>

Closes #3515 from guozhangwang/KMinor-interactive-quickstart

(cherry picked from commit 91c207c2c6b09d88cc3366d69a31d0bf0ab0bffb)
Signed-off-by: Guozhang Wang <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/acc020d8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/acc020d8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/acc020d8

Branch: refs/heads/0.11.0
Commit: acc020d833dc6f6a43466881f59347b1a0c94fec
Parents: 2df4d49
Author: Guozhang Wang <[email protected]>
Authored: Tue Jul 25 11:34:16 2017 -0700
Committer: Guozhang Wang <[email protected]>
Committed: Tue Jul 25 11:34:25 2017 -0700

----------------------------------------------------------------------
 docs/js/templateData.js                        |   1 +
 docs/streams/quickstart.html                   | 192 ++++++++++++++-----
 docs/toc.html                                  |   9 +
 .../kafka/streams/examples/pipe/PipeDemo.java  |  25 ++-
 .../examples/wordcount/WordCountDemo.java      |  27 ++-
 .../wordcount/WordCountProcessorDemo.java      |  31 ++-
 6 files changed, 214 insertions(+), 71 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/acc020d8/docs/js/templateData.js
----------------------------------------------------------------------
diff --git a/docs/js/templateData.js b/docs/js/templateData.js
index 3eca71e..50997bd 100644
--- a/docs/js/templateData.js
+++ b/docs/js/templateData.js
@@ -20,4 +20,5 @@ var context={
"version": "0110",
"dotVersion": "0.11.0",
"fullDotVersion": "0.11.0.0"
+"scalaVersion": "2.11"
};

http://git-wip-us.apache.org/repos/asf/kafka/blob/acc020d8/docs/streams/quickstart.html
----------------------------------------------------------------------
diff --git a/docs/streams/quickstart.html b/docs/streams/quickstart.html
index 1c45e16..031a375 100644
--- a/docs/streams/quickstart.html
+++ b/docs/streams/quickstart.html
@@ -40,10 +40,10 @@ of the <code><a href="https://github.com/apache/kafka/blob/{{dotVersion}}/stream

final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();

-// Construct a `KStream` from the input topic ""streams-file-input", where message values
+// Construct a `KStream` from the input topic "streams-wordcount-input", where message values
// represent lines of text (for the sake of this example, we ignore whatever may be stored
// in the message keys).
-KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, "streams-file-input");
+KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, "streams-wordcount-input");

KTable<String, Long> wordCounts = textLines
// Split each text line, by whitespace, into words.
@@ -71,16 +71,18 @@ because it cannot know when it has processed "all" the input data.

<p>
As the first step, we will start Kafka (unless you already have it started) and then we will prepare input data to a Kafka topic, which will subsequently be processed by a Kafka Streams application.
+</p> - <h4><a id="quickstart_streams_download" href="#quickstart_streams_download">Step 1: Download the code</a></h4> +<h4><a id="quickstart_streams_download" href="#quickstart_streams_download">Step 1: Download the code</a></h4> -<a href="https://www.apache.org/dyn/closer.cgi?path=/kafka/{{fullDotVersion}}/kafka_2.11-{{fullDotVersion}}.tgz" title="Kafka downloads">Download</a> the {{fullDotVersion}} release and un-tar it. +<a href="https://www.apache.org/dyn/closer.cgi?path=/kafka/{{fullDotVersion}}/kafka_{{scalaVersion}}-{{fullDotVersion}}.tgz" title="Kafka downloads">Download</a> the {{fullDotVersion}} release and un-tar it. +Note that there are multiple downloadable Scala versions and we choose to use the recommended version ({{scalaVersion}}) here: <pre class="brush: bash;"> -> tar -xzf kafka_2.11-{{fullDotVersion}}.tgz -> cd kafka_2.11-{{fullDotVersion}} +> tar -xzf kafka_{{scalaVersion}}-{{fullDotVersion}}.tgz +> cd kafka_{{scalaVersion}}-{{fullDotVersion}} </pre> -</p> + <h4><a id="quickstart_streams_startserver" href="#quickstart_streams_startserver">Step 2: Start the Kafka server</a></h4> <p> @@ -102,19 +104,9 @@ Kafka uses <a href="https://zookeeper.apache.org/">ZooKeeper</a> so you need to </pre> -<h4><a id="quickstart_streams_prepare" href="#quickstart_streams_prepare">Step 3: Prepare data</a></h4> +<h4><a id="quickstart_streams_prepare" href="#quickstart_streams_prepare">Step 3: Prepare input topic and start Kafka producer</a></h4> <!-- -<pre> -> <b>./bin/kafka-topics --create \</b> - <b>--zookeeper localhost:2181 \</b> - <b>--replication-factor 1 \</b> - <b>--partitions 1 \</b> - <b>--topic streams-file-input</b> - -</pre> - ---> <pre class="brush: bash;"> > echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt @@ -126,41 +118,59 @@ Or on Windows: > echo|set /p=join kafka summit>> file-input.txt </pre> -<p> -Next, we send this input data to the input topic named <b>streams-file-input</b> using the console producer, -which reads the data from STDIN line-by-line, and publishes each line as a separate Kafka message with null key and value encoded a string to the topic (in practice, -stream data will likely be flowing continuously into Kafka where the application will be up and running): -</p> +--> + +Next, we create the input topic named <b>streams-wordcount-input</b> and the output topic named <b>streams-wordcount-output</b>: <pre class="brush: bash;"> > bin/kafka-topics.sh --create \ --zookeeper localhost:2181 \ --replication-factor 1 \ --partitions 1 \ - --topic streams-file-input + --topic streams-wordcount-input +Created topic "streams-wordcount-input". + +> bin/kafka-topics.sh --create \ + --zookeeper localhost:2181 \ + --replication-factor 1 \ + --partitions 1 \ + --topic streams-wordcount-output +Created topic "streams-wordcount-output". 
</pre>

+The created topics can be described with the same <b>kafka-topics</b> tool:

<pre class="brush: bash;">
-> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input < file-input.txt
+> bin/kafka-topics.sh --zookeeper localhost:2181 --describe
+
+Topic:streams-wordcount-input    PartitionCount:1    ReplicationFactor:1    Configs:
+    Topic: streams-wordcount-input    Partition: 0    Leader: 0    Replicas: 0    Isr: 0
+Topic:streams-wordcount-output    PartitionCount:1    ReplicationFactor:1    Configs:
+    Topic: streams-wordcount-output    Partition: 0    Leader: 0    Replicas: 0    Isr: 0
</pre>

-<h4><a id="quickstart_streams_process" href="#quickstart_streams_process">Step 4: Process data</a></h4>
+<h4><a id="quickstart_streams_start" href="#quickstart_streams_start">Step 4: Start the WordCount Application</a></h4>
+
+The following command starts the WordCount demo application:

<pre class="brush: bash;">
> bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
</pre>

<p>
-The demo application will read from the input topic <b>streams-file-input</b>, perform the computations of the WordCount algorithm on each of the read messages,
+The demo application will read from the input topic <b>streams-wordcount-input</b>, perform the computations of the WordCount algorithm on each of the read messages,
and continuously write its current results to the output topic <b>streams-wordcount-output</b>.
Hence there won't be any STDOUT output except log entries as the results are written back into Kafka.
-The demo will run for a few seconds and then, unlike typical stream processing applications, terminate automatically.
-</p>
-<p>
-We can now inspect the output of the WordCount demo application by reading from its output topic:
</p>

+Now we can start the console producer in a separate terminal to write some input data to this topic:
+
+<pre class="brush: bash;">
+> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-wordcount-input
+</pre>
+
+and inspect the output of the WordCount demo application by reading from its output topic with the console consumer in a separate terminal:
+
<pre class="brush: bash;">
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
@@ -172,27 +182,115 @@ We can now inspect the output of the WordCount demo application by reading from
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
</pre>

+
+<h4><a id="quickstart_streams_process" href="#quickstart_streams_process">Step 5: Process some data</a></h4>
+
+Now let's write a message with the console producer into the input topic <b>streams-wordcount-input</b> by entering a single line of text and then hitting <b>RETURN</b>.
+This will send a new message to the input topic, where the message key is null and the message value is the string-encoded text line that you just entered
+(in practice, input data for applications will typically be streaming continuously into Kafka, rather than being manually entered as we do in this quickstart):
+
+<pre class="brush: bash;">
+> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-wordcount-input
+all streams lead to kafka
+</pre>
+
<p>
-with the following output data being printed to the console:
+This message will be processed by the WordCount application and the following output data will be written to the <b>streams-wordcount-output</b> topic and printed by the console consumer:
</p>

<pre class="brush: bash;">
-all 1
-lead 1
-to 1
-hello 1
-streams 2
-join 1
-kafka 3
-summit 1
+> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
+    --topic streams-wordcount-output \
+    --from-beginning \
+    --formatter kafka.tools.DefaultMessageFormatter \
+    --property print.key=true \
+    --property print.value=true \
+    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
+    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
+
+all 1
+streams 1
+lead 1
+to 1
+kafka 1
</pre>

<p>
-Here, the first column is the Kafka message key in <code>java.lang.String</code> format, and the second column is the message value in <code>java.lang.Long</code> format.
-Note that the output is actually a continuous stream of updates, where each data record (i.e. each line in the original output above) is
-an updated count of a single word, aka record key such as "kafka". For multiple records with the same key, each later record is an update of the previous one.
+Here, the first column is the Kafka message key in <code>java.lang.String</code> format and represents a word that is being counted, and the second column is the message value in <code>java.lang.Long</code> format, representing the word's latest count.
</p>

+Now let's continue writing one more message with the console producer into the input topic <b>streams-wordcount-input</b>.
+Enter the text line "hello kafka streams" and hit <b>RETURN</b>.
+Your terminal should look as follows:
+
+<pre class="brush: bash;">
+> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-wordcount-input
+all streams lead to kafka
+hello kafka streams
+</pre>
+
+In your other terminal in which the console consumer is running, you will observe that the WordCount application wrote new output data:
+
+<pre class="brush: bash;">
+> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
+    --topic streams-wordcount-output \
+    --from-beginning \
+    --formatter kafka.tools.DefaultMessageFormatter \
+    --property print.key=true \
+    --property print.value=true \
+    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
+    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
+
+all 1
+streams 1
+lead 1
+to 1
+kafka 1
+hello 1
+kafka 2
+streams 2
+</pre>
+
+Here, the last printed lines <b>kafka 2</b> and <b>streams 2</b> indicate updates to the keys <b>kafka</b> and <b>streams</b> whose counts have been incremented from <b>1</b> to <b>2</b>.
+Whenever you write further input messages to the input topic, you will observe new messages being added to the <b>streams-wordcount-output</b> topic,
+representing the most recent word counts as computed by the WordCount application.
+Let's enter one final input text line "join kafka summit" and hit <b>RETURN</b> in the console producer to the input topic <b>streams-wordcount-input</b> before we wrap up this quickstart:
+
+<pre class="brush: bash;">
+> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-wordcount-input
+all streams lead to kafka
+hello kafka streams
+join kafka summit
+</pre>
+
+The <b>streams-wordcount-output</b> topic will subsequently show the corresponding updated word counts (see last three lines):
+
+<pre class="brush: bash;">
+> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
+    --topic streams-wordcount-output \
+    --from-beginning \
+    --formatter kafka.tools.DefaultMessageFormatter \
+    --property print.key=true \
+    --property print.value=true \
+    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
+    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
+
+all 1
+streams 1
+lead 1
+to 1
+kafka 1
+hello 1
+kafka 2
+streams 2
+join 1
+kafka 3
+summit 1
+</pre>
+
+As one can see, the output of the WordCount application is actually a continuous stream of updates, where each output record (i.e. each line in the output above) is
+an updated count of a single word, aka record key, such as "kafka". For multiple records with the same key, each later record is an update of the previous one.
+
<p>
The two diagrams below illustrate what is essentially happening behind the scenes. The first column shows the evolution of the current state of the <code>KTable<String, Long></code> that is counting word occurrences for <code>count</code>.
The second column shows the change records that result from state updates to the KTable and that are being sent to the output Kafka topic <b>streams-wordcount-output</b>.
@@ -217,13 +315,9 @@ And so on (we skip the illustration of how the third line is being processed). T
Looking beyond the scope of this concrete example, what Kafka Streams is doing here is to leverage the duality between a table and a changelog stream (here: table = the KTable, changelog stream = the downstream KStream): you can publish every change of the table to a stream, and if you consume the entire changelog stream from beginning to end, you can reconstruct the contents of the table.
</p>
-<p>
-Now you can write more input messages to the <b>streams-file-input</b> topic and observe additional messages added
-to <b>streams-wordcount-output</b> topic, reflecting updated word counts (e.g., using the console producer and the
-console consumer, as described above).
-</p>
+<h4><a id="quickstart_streams_stop" href="#quickstart_streams_stop">Step 6: Tear down the application</a></h4>

-<p>You can stop the console consumer via <b>Ctrl-C</b>.</p>
+<p>You can now stop the console consumer, the console producer, the WordCount application, the Kafka broker and the ZooKeeper server, in that order, via <b>Ctrl-C</b>.</p>

<div class="pagination">
        <a href="/{{version}}/documentation/streams" class="pagination__btn pagination__btn__prev">Previous</a>

http://git-wip-us.apache.org/repos/asf/kafka/blob/acc020d8/docs/toc.html
----------------------------------------------------------------------
diff --git a/docs/toc.html b/docs/toc.html
index 7525b0f..2ec0129 100644
--- a/docs/toc.html
+++ b/docs/toc.html
@@ -141,6 +141,15 @@
                    <li><a href="#connect_development">8.3 Connector Development Guide</a></li>
                </ul>
            </li>
+        <li><a href="/{{version}}/documentation/streams">9.
Kafka Streams</a> + <ul> + <li><a href="/{{version}}/documentation/streams/quickstart">9.1 Play with a Streams Application</a></li> + <li><a href="/{{version}}/documentation/streams/developer-guide">9.2 Developer Guide</a></li> + <li><a href="/{{version}}/documentation/streams/core-concepts">9.3 Core Concepts</a></li> + <li><a href="/{{version}}/documentation/streams/architecture">9.4 Architecture</a></li> + <li><a href="/{{version}}/documentation/streams/upgrade-guide">9.5 Upgrade Guide and API Changes</a></li> + </ul> + </li> </ul> </script> http://git-wip-us.apache.org/repos/asf/kafka/blob/acc020d8/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java ---------------------------------------------------------------------- diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java index 86182a3..1d672b2 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java @@ -18,11 +18,13 @@ package org.apache.kafka.streams.examples.pipe; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Exit; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsConfig; import java.util.Properties; +import java.util.concurrent.CountDownLatch; /** * Demonstrates, using the high-level KStream DSL, how to read data from a source (input) topic and how to @@ -51,13 +53,24 @@ public class PipeDemo { builder.stream("streams-file-input").to("streams-pipe-output"); - KafkaStreams streams = new KafkaStreams(builder, props); - streams.start(); + final KafkaStreams streams = new KafkaStreams(builder, props); + final CountDownLatch latch = new CountDownLatch(1); - // usually the stream application would be running forever, - // in this example we just let it run for some time and stop since the input data is finite. 
- Thread.sleep(5000L); + // attach shutdown handler to catch control-c + Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") { + @Override + public void run() { + streams.close(); + latch.countDown(); + } + }); - streams.close(); + try { + streams.start(); + latch.await(); + } catch (Throwable e) { + Exit.exit(1); + } + Exit.exit(0); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/acc020d8/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java ---------------------------------------------------------------------- diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java index 03f8762..616fc48 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.examples.wordcount; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Exit; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; @@ -30,6 +31,7 @@ import org.apache.kafka.streams.kstream.ValueMapper; import java.util.Arrays; import java.util.Locale; import java.util.Properties; +import java.util.concurrent.CountDownLatch; /** * Demonstrates, using the high-level KStream DSL, how to implement the WordCount program @@ -60,7 +62,7 @@ public class WordCountDemo { KStreamBuilder builder = new KStreamBuilder(); - KStream<String, String> source = builder.stream("streams-file-input"); + KStream<String, String> source = builder.stream("streams-wordcount-input"); KTable<String, Long> counts = source .flatMapValues(new ValueMapper<String, Iterable<String>>() { @@ -80,13 +82,24 @@ public class WordCountDemo { // need to override value serde to Long type counts.to(Serdes.String(), Serdes.Long(), "streams-wordcount-output"); - KafkaStreams streams = new KafkaStreams(builder, props); - streams.start(); + final KafkaStreams streams = new KafkaStreams(builder, props); + final CountDownLatch latch = new CountDownLatch(1); - // usually the stream application would be running forever, - // in this example we just let it run for some time and stop since the input data is finite. 
- Thread.sleep(5000L); + // attach shutdown handler to catch control-c + Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") { + @Override + public void run() { + streams.close(); + latch.countDown(); + } + }); - streams.close(); + try { + streams.start(); + latch.await(); + } catch (Throwable e) { + Exit.exit(1); + } + Exit.exit(0); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/acc020d8/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java ---------------------------------------------------------------------- diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java index 4a990a6..a03a503 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.examples.wordcount; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Exit; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.KafkaStreams; @@ -31,6 +32,7 @@ import org.apache.kafka.streams.state.Stores; import java.util.Locale; import java.util.Properties; +import java.util.concurrent.CountDownLatch; /** * Demonstrates, using the low-level Processor APIs, how to implement the WordCount program @@ -113,20 +115,31 @@ public class WordCountProcessorDemo { TopologyBuilder builder = new TopologyBuilder(); - builder.addSource("Source", "streams-file-input"); + builder.addSource("Source", "streams-wordcount-input"); builder.addProcessor("Process", new MyProcessorSupplier(), "Source"); builder.addStateStore(Stores.create("Counts").withStringKeys().withIntegerValues().inMemory().build(), "Process"); builder.addSink("Sink", "streams-wordcount-processor-output", "Process"); - KafkaStreams streams = new KafkaStreams(builder, props); - streams.start(); - - // usually the stream application would be running forever, - // in this example we just let it run for some time and stop since the input data is finite. - Thread.sleep(5000L); - - streams.close(); + final KafkaStreams streams = new KafkaStreams(builder, props); + final CountDownLatch latch = new CountDownLatch(1); + + // attach shutdown handler to catch control-c + Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") { + @Override + public void run() { + streams.close(); + latch.countDown(); + } + }); + + try { + streams.start(); + latch.await(); + } catch (Throwable e) { + Exit.exit(1); + } + Exit.exit(0); } }
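
The same shutdown-hook-plus-latch pattern that this commit adds to the three example programs can be reused in other Streams applications. Below is a minimal, self-contained sketch of that pattern against the 0.11.0 API; the class name, application id and topic names are hypothetical, and System.exit stands in for the internal Exit utility the bundled demos call:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class MyStreamsApp {                                                     // hypothetical class name

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");       // hypothetical application id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        // trivial topology: copy records from one topic to another (hypothetical topic names)
        KStreamBuilder builder = new KStreamBuilder();
        builder.stream("my-input-topic").to("my-output-topic");

        final KafkaStreams streams = new KafkaStreams(builder, props);
        final CountDownLatch latch = new CountDownLatch(1);

        // attach a shutdown hook so that Ctrl-C closes the Streams instance
        // and releases the latch, letting main() return cleanly
        Runtime.getRuntime().addShutdownHook(new Thread("my-streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();          // block until the shutdown hook fires
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}

With this in place the application runs until Ctrl-C is pressed (or the JVM is otherwise terminated), which is what lets the updated quickstart keep the WordCount demo running while input is typed into the console producer.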