Repository: kafka
Updated Branches:
  refs/heads/0.11.0 2df4d493e -> acc020d83


MINOR: Make streams quick start more interactive

1. Make the WordCountDemo application stop via "ctrl-C" rather than 
automatically.
2. Update the quickstart html file to let users type input messages one-by-one, 
and observe added output in an interactive manner.
3. Make some minor fixes to the parent documentation page pointing to streams 
sub-pages, and add a new recommended Scala version number.

Author: Guozhang Wang <[email protected]>

Reviewers: Michael G. Noll <[email protected]>, Damian Guy 
<[email protected]>

Closes #3515 from guozhangwang/KMinor-interactive-quickstart

(cherry picked from commit 91c207c2c6b09d88cc3366d69a31d0bf0ab0bffb)
Signed-off-by: Guozhang Wang <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/acc020d8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/acc020d8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/acc020d8

Branch: refs/heads/0.11.0
Commit: acc020d833dc6f6a43466881f59347b1a0c94fec
Parents: 2df4d49
Author: Guozhang Wang <[email protected]>
Authored: Tue Jul 25 11:34:16 2017 -0700
Committer: Guozhang Wang <[email protected]>
Committed: Tue Jul 25 11:34:25 2017 -0700

----------------------------------------------------------------------
 docs/js/templateData.js                         |   1 +
 docs/streams/quickstart.html                    | 192 ++++++++++++++-----
 docs/toc.html                                   |   9 +
 .../kafka/streams/examples/pipe/PipeDemo.java   |  25 ++-
 .../examples/wordcount/WordCountDemo.java       |  27 ++-
 .../wordcount/WordCountProcessorDemo.java       |  31 ++-
 6 files changed, 214 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/acc020d8/docs/js/templateData.js
----------------------------------------------------------------------
diff --git a/docs/js/templateData.js b/docs/js/templateData.js
index 3eca71e..50997bd 100644
--- a/docs/js/templateData.js
+++ b/docs/js/templateData.js
@@ -20,4 +20,5 @@ var context={
     "version": "0110",
     "dotVersion": "0.11.0",
-    "fullDotVersion": "0.11.0.0"
+    "fullDotVersion": "0.11.0.0",
+    "scalaVersion": "2.11"
 };

http://git-wip-us.apache.org/repos/asf/kafka/blob/acc020d8/docs/streams/quickstart.html
----------------------------------------------------------------------
diff --git a/docs/streams/quickstart.html b/docs/streams/quickstart.html
index 1c45e16..031a375 100644
--- a/docs/streams/quickstart.html
+++ b/docs/streams/quickstart.html
@@ -40,10 +40,10 @@ of the <code><a 
href="https://github.com/apache/kafka/blob/{{dotVersion}}/stream
 final Serde&lt;String&gt; stringSerde = Serdes.String();
 final Serde&lt;Long&gt; longSerde = Serdes.Long();
 
-// Construct a `KStream` from the input topic ""streams-file-input", where 
message values
+// Construct a `KStream` from the input topic "streams-wordcount-input", where 
message values
 // represent lines of text (for the sake of this example, we ignore whatever 
may be stored
 // in the message keys).
-KStream&lt;String, String&gt; textLines = builder.stream(stringSerde, 
stringSerde, "streams-file-input");
+KStream&lt;String, String&gt; textLines = builder.stream(stringSerde, 
stringSerde, "streams-wordcount-input");
 
 KTable&lt;String, Long&gt; wordCounts = textLines
     // Split each text line, by whitespace, into words.
@@ -71,16 +71,18 @@ because it cannot know when it has processed "all" the 
input data.
 <p>
   As the first step, we will start Kafka (unless you already have it started) 
and then we will
   prepare input data to a Kafka topic, which will subsequently be processed by 
a Kafka Streams application.
+</p>
 
-  <h4><a id="quickstart_streams_download" 
href="#quickstart_streams_download">Step 1: Download the code</a></h4>
+<h4><a id="quickstart_streams_download" 
href="#quickstart_streams_download">Step 1: Download the code</a></h4>
 
-<a 
href="https://www.apache.org/dyn/closer.cgi?path=/kafka/{{fullDotVersion}}/kafka_2.11-{{fullDotVersion}}.tgz"
 title="Kafka downloads">Download</a> the {{fullDotVersion}} release and un-tar 
it.
+<a 
href="https://www.apache.org/dyn/closer.cgi?path=/kafka/{{fullDotVersion}}/kafka_{{scalaVersion}}-{{fullDotVersion}}.tgz"
 title="Kafka downloads">Download</a> the {{fullDotVersion}} release and un-tar 
it.
+Note that there are multiple downloadable Scala versions and we choose to use 
the recommended version ({{scalaVersion}}) here:
 
 <pre class="brush: bash;">
-&gt; tar -xzf kafka_2.11-{{fullDotVersion}}.tgz
-&gt; cd kafka_2.11-{{fullDotVersion}}
+&gt; tar -xzf kafka_{{scalaVersion}}-{{fullDotVersion}}.tgz
+&gt; cd kafka_{{scalaVersion}}-{{fullDotVersion}}
 </pre>
-</p>
+
 <h4><a id="quickstart_streams_startserver" 
href="#quickstart_streams_startserver">Step 2: Start the Kafka server</a></h4>
 
 <p>
@@ -102,19 +104,9 @@ Kafka uses <a 
href="https://zookeeper.apache.org/">ZooKeeper</a> so you need to
 </pre>
 
 
-<h4><a id="quickstart_streams_prepare" href="#quickstart_streams_prepare">Step 
3: Prepare data</a></h4>
+<h4><a id="quickstart_streams_prepare" href="#quickstart_streams_prepare">Step 
3: Prepare input topic and start Kafka producer</a></h4>
 
 <!--
-<pre>
-&gt; <b>./bin/kafka-topics --create \</b>
-            <b>--zookeeper localhost:2181 \</b>
-            <b>--replication-factor 1 \</b>
-            <b>--partitions 1 \</b>
-            <b>--topic streams-file-input</b>
-
-</pre>
-
--->
 
 <pre class="brush: bash;">
 &gt; echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka 
summit" > file-input.txt
@@ -126,41 +118,59 @@ Or on Windows:
 &gt; echo|set /p=join kafka summit>> file-input.txt
 </pre>
 
-<p>
-Next, we send this input data to the input topic named 
<b>streams-file-input</b> using the console producer,
-which reads the data from STDIN line-by-line, and publishes each line as a 
separate Kafka message with null key and value encoded a string to the topic 
(in practice,
-stream data will likely be flowing continuously into Kafka where the 
application will be up and running):
-</p>
+-->
+
+Next, we create the input topic named <b>streams-wordcount-input</b> and the 
output topic named <b>streams-wordcount-output</b>:
 
 <pre class="brush: bash;">
 &gt; bin/kafka-topics.sh --create \
     --zookeeper localhost:2181 \
     --replication-factor 1 \
     --partitions 1 \
-    --topic streams-file-input
+    --topic streams-wordcount-input
+Created topic "streams-wordcount-input".
+
+&gt; bin/kafka-topics.sh --create \
+    --zookeeper localhost:2181 \
+    --replication-factor 1 \
+    --partitions 1 \
+    --topic streams-wordcount-output
+Created topic "streams-wordcount-output".
 </pre>
 
+The created topic can be described with the same <b>kafka-topics</b> tool:
 
 <pre class="brush: bash;">
-&gt; bin/kafka-console-producer.sh --broker-list localhost:9092 --topic 
streams-file-input < file-input.txt
+&gt; bin/kafka-topics.sh --zookeeper localhost:2181 --describe
+
+Topic:streams-wordcount-input  PartitionCount:1        ReplicationFactor:1     
Configs:
+    Topic: streams-wordcount-input     Partition: 0    Leader: 0       
Replicas: 0     Isr: 0
+Topic:streams-wordcount-output PartitionCount:1        ReplicationFactor:1     
Configs:
+       Topic: streams-wordcount-output Partition: 0    Leader: 0       
Replicas: 0     Isr: 0
 </pre>
 
-<h4><a id="quickstart_streams_process" href="#quickstart_streams_process">Step 
4: Process data</a></h4>
+<h4><a id="quickstart_streams_start" href="#quickstart_streams_start">Step 4: 
Start the WordCount application</a></h4>
+
+The following command starts the WordCount demo application:
 
 <pre class="brush: bash;">
 &gt; bin/kafka-run-class.sh 
org.apache.kafka.streams.examples.wordcount.WordCountDemo
 </pre>
 
 <p>
-The demo application will read from the input topic <b>streams-file-input</b>, 
perform the computations of the WordCount algorithm on each of the read 
messages,
+The demo application will read from the input topic 
<b>streams-wordcount-input</b>, perform the computations of the WordCount 
algorithm on each of the read messages,
 and continuously write its current results to the output topic 
<b>streams-wordcount-output</b>.
 Hence there won't be any STDOUT output except log entries as the results are 
written back into Kafka.
-The demo will run for a few seconds and then, unlike typical stream processing 
applications, terminate automatically.
-</p>
-<p>
-We can now inspect the output of the WordCount demo application by reading 
from its output topic:
 </p>
 
+Now we can start the console producer in a separate terminal to write some 
input data to this topic:
+
+<pre class="brush: bash;">
+&gt; bin/kafka-console-producer.sh --broker-list localhost:9092 --topic 
streams-wordcount-input
+</pre>
+
+and inspect the output of the WordCount demo application by reading from its 
output topic with the console consumer in a separate terminal:
+
 <pre class="brush: bash;">
 &gt; bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
     --topic streams-wordcount-output \
@@ -172,27 +182,115 @@ We can now inspect the output of the WordCount demo 
application by reading from
     --property 
value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
 </pre>
 
+
+<h4><a id="quickstart_streams_process" href="#quickstart_streams_process">Step 
5: Process some data</a></h4>
+
+Now let's write a message with the console producer into the input topic 
<b>streams-wordcount-input</b> by entering a single line of text and then hitting 
&lt;RETURN&gt;.
+This will send a new message to the input topic, where the message key is null 
and the message value is the string-encoded text line that you just entered
+(in practice, input data for applications will typically be streaming 
continuously into Kafka, rather than being manually entered as we do in this 
quickstart):
+
+<pre class="brush: bash;">
+&gt; bin/kafka-console-producer.sh --broker-list localhost:9092 --topic 
streams-wordcount-input
+all streams lead to kafka
+</pre>
+
 <p>
-with the following output data being printed to the console:
+This message will be processed by the WordCount application and the following 
output data will be written to the <b>streams-wordcount-output</b> topic and 
printed by the console consumer:
 </p>
 
 <pre class="brush: bash;">
-all     1
-lead    1
-to      1
-hello   1
-streams 2
-join    1
-kafka   3
-summit  1
+&gt; bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
+    --topic streams-wordcount-output \
+    --from-beginning \
+    --formatter kafka.tools.DefaultMessageFormatter \
+    --property print.key=true \
+    --property print.value=true \
+    --property 
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
+    --property 
value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
+
+all        1
+streams        1
+lead   1
+to         1
+kafka  1
 </pre>
 
 <p>
-Here, the first column is the Kafka message key in 
<code>java.lang.String</code> format, and the second column is the message 
value in <code>java.lang.Long</code> format.
-Note that the output is actually a continuous stream of updates, where each 
data record (i.e. each line in the original output above) is
-an updated count of a single word, aka record key such as "kafka". For 
multiple records with the same key, each later record is an update of the 
previous one.
+Here, the first column is the Kafka message key in 
<code>java.lang.String</code> format and represents a word that is being 
counted, and the second column is the message value in 
<code>java.lang.Long</code> format, representing the word's latest count.
 </p>
 
+Now let's continue writing one more message with the console producer into the 
input topic <b>streams-wordcount-input</b>.
+Enter the text line "hello kafka streams" and hit &lt;RETURN&gt;.
+Your terminal should look as follows:
+
+<pre class="brush: bash;">
+&gt; bin/kafka-console-producer.sh --broker-list localhost:9092 --topic 
streams-wordcount-input
+all streams lead to kafka
+hello kafka streams
+</pre>
+
+In your other terminal in which the console consumer is running, you will 
observe that the WordCount application wrote new output data:
+
+<pre class="brush: bash;">
+&gt; bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
+    --topic streams-wordcount-output \
+    --from-beginning \
+    --formatter kafka.tools.DefaultMessageFormatter \
+    --property print.key=true \
+    --property print.value=true \
+    --property 
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
+    --property 
value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
+
+all        1
+streams        1
+lead   1
+to         1
+kafka  1
+hello  1
+kafka  2
+streams        2
+</pre>
+
+Here the last printed lines <b>kafka 2</b> and <b>streams 2</b> indicate 
updates to the keys <b>kafka</b> and <b>streams</b> whose counts have been 
incremented from <b>1</b> to <b>2</b>.
+Whenever you write further input messages to the input topic, you will observe 
new messages being added to the <b>streams-wordcount-output</b> topic,
+representing the most recent word counts as computed by the WordCount 
application.
+Let's enter one final input text line "join kafka summit" and hit 
&lt;RETURN&gt; in the console producer to the input topic 
<b>streams-wordcount-input</b> before we wrap up this quickstart:
+
+<pre class="brush: bash;">
+&gt; bin/kafka-console-producer.sh --broker-list localhost:9092 --topic 
streams-wordcount-input
+all streams lead to kafka
+hello kafka streams
+join kafka summit
+</pre>
+
+The <b>streams-wordcount-output</b> topic will subsequently show the 
corresponding updated word counts (see last three lines):
+
+<pre class="brush: bash;">
+&gt; bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
+    --topic streams-wordcount-output \
+    --from-beginning \
+    --formatter kafka.tools.DefaultMessageFormatter \
+    --property print.key=true \
+    --property print.value=true \
+    --property 
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
+    --property 
value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
+
+all        1
+streams        1
+lead   1
+to         1
+kafka  1
+hello  1
+kafka  2
+streams        2
+join   1
+kafka  3
+summit 1
+</pre>
+
+As one can see, the output of the WordCount application is actually a continuous 
stream of updates, where each output record (i.e. each line in the 
output above) is
+an updated count of a single word, i.e. a record key such as "kafka". For 
multiple records with the same key, each later record is an update of the 
previous one.
+
 <p>
 The two diagrams below illustrate what is essentially happening behind the 
scenes.
 The first column shows the evolution of the current state of the 
<code>KTable&lt;String, Long&gt;</code> that is counting word occurrences for 
<code>count</code>.
@@ -217,13 +315,9 @@ And so on (we skip the illustration of how the third line 
is being processed). T
 Looking beyond the scope of this concrete example, what Kafka Streams is doing 
here is to leverage the duality between a table and a changelog stream (here: 
table = the KTable, changelog stream = the downstream KStream): you can publish 
every change of the table to a stream, and if you consume the entire changelog 
stream from beginning to end, you can reconstruct the contents of the table.
 </p>
 
-<p>
-Now you can write more input messages to the <b>streams-file-input</b> topic 
and observe additional messages added
-to <b>streams-wordcount-output</b> topic, reflecting updated word counts 
(e.g., using the console producer and the
-console consumer, as described above).
-</p>
+<h4><a id="quickstart_streams_stop" href="#quickstart_streams_stop">Step 6: 
Tear down the application</a></h4>
 
-<p>You can stop the console consumer via <b>Ctrl-C</b>.</p>
+<p>You can now stop the console consumer, the console producer, the WordCount 
application, the Kafka broker and the ZooKeeper server, in that order, via 
<b>Ctrl-C</b>.</p>
 
  <div class="pagination">
         <a href="/{{version}}/documentation/streams" class="pagination__btn 
pagination__btn__prev">Previous</a>

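The quickstart text in the diff above describes the output topic as a continuous
stream of updates, and notes that replaying that stream from the beginning
reconstructs the table of counts. A minimal sketch of that idea in plain Java
follows. It does not use the Kafka Streams API: the class name
WordCountChangelogSketch and the methods updates and replay are hypothetical,
and splitting on "\\W+" is an assumption made for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical illustration only; this is not part of the Kafka code base.
public class WordCountChangelogSketch {

    /** Emits one "word count" update record per word occurrence, in input order. */
    static List<String> updates(List<String> lines) {
        Map<String, Long> counts = new LinkedHashMap<>();
        List<String> out = new ArrayList<>();
        for (String line : lines) {
            // Assumption: words are separated by runs of non-word characters.
            for (String word : line.toLowerCase(Locale.ROOT).split("\\W+")) {
                if (word.isEmpty()) {
                    continue;
                }
                long updated = counts.merge(word, 1L, Long::sum);
                out.add(word + " " + updated);
            }
        }
        return out;
    }

    /** Replays the update records; a later record for a key overwrites the earlier one. */
    static Map<String, Long> replay(List<String> updates) {
        Map<String, Long> table = new LinkedHashMap<>();
        for (String update : updates) {
            String[] keyValue = update.split(" ");
            table.put(keyValue[0], Long.parseLong(keyValue[1]));
        }
        return table;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "all streams lead to kafka",
                "hello kafka streams",
                "join kafka summit");
        List<String> changelog = updates(lines);
        // Prints the update records in the order they were produced.
        for (String record : changelog) {
            System.out.println(record);
        }
        // Prints the table rebuilt from the full changelog.
        System.out.println(replay(changelog));
    }
}
```

For the three quickstart input lines, the update records come out in the same
order as the console consumer output shown in the diff, and replaying them
leaves "kafka" at 3 and "streams" at 2.
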
http://git-wip-us.apache.org/repos/asf/kafka/blob/acc020d8/docs/toc.html
----------------------------------------------------------------------
diff --git a/docs/toc.html b/docs/toc.html
index 7525b0f..2ec0129 100644
--- a/docs/toc.html
+++ b/docs/toc.html
@@ -141,6 +141,15 @@
                 <li><a href="#connect_development">8.3 Connector Development 
Guide</a></li>
             </ul>
         </li>
+        <li><a href="/{{version}}/documentation/streams">9. Kafka Streams</a>
+            <ul>
+                <li><a 
href="/{{version}}/documentation/streams/quickstart">9.1 Play with a Streams 
Application</a></li>
+                <li><a 
href="/{{version}}/documentation/streams/developer-guide">9.2 Developer 
Guide</a></li>
+                <li><a 
href="/{{version}}/documentation/streams/core-concepts">9.3 Core 
Concepts</a></li>
+                <li><a 
href="/{{version}}/documentation/streams/architecture">9.4 Architecture</a></li>
+                <li><a 
href="/{{version}}/documentation/streams/upgrade-guide">9.5 Upgrade Guide and 
API Changes</a></li>
+            </ul>
+        </li>
     </ul>
 
 </script>

http://git-wip-us.apache.org/repos/asf/kafka/blob/acc020d8/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
----------------------------------------------------------------------
diff --git 
a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
 
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
index 86182a3..1d672b2 100644
--- 
a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
+++ 
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
@@ -18,11 +18,13 @@ package org.apache.kafka.streams.examples.pipe;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsConfig;
 
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
 
 /**
  * Demonstrates, using the high-level KStream DSL, how to read data from a 
source (input) topic and how to
@@ -51,13 +53,24 @@ public class PipeDemo {
 
         builder.stream("streams-file-input").to("streams-pipe-output");
 
-        KafkaStreams streams = new KafkaStreams(builder, props);
-        streams.start();
+        final KafkaStreams streams = new KafkaStreams(builder, props);
+        final CountDownLatch latch = new CountDownLatch(1);
 
-        // usually the stream application would be running forever,
-        // in this example we just let it run for some time and stop since the 
input data is finite.
-        Thread.sleep(5000L);
+        // attach shutdown handler to catch control-c
+        Runtime.getRuntime().addShutdownHook(new 
Thread("streams-pipe-shutdown-hook") {
+            @Override
+            public void run() {
+                streams.close();
+                latch.countDown();
+            }
+        });
 
-        streams.close();
+        try {
+            streams.start();
+            latch.await();
+        } catch (Throwable e) {
+            Exit.exit(1);
+        }
+        Exit.exit(0);
     }
 }

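The change above replaces the fixed five-second sleep with a shutdown hook that
closes the streams instance and releases a CountDownLatch on which the main
thread waits. The sketch below isolates that pattern without any Kafka
dependency, as an illustration only: the class ShutdownLatchSketch, the method
shutdownHook, and the Runnable standing in for streams.close() are all
hypothetical names. So that the example terminates on its own, the hook thread
is started directly rather than being registered with the JVM.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration only; this is not part of the Kafka code base.
public class ShutdownLatchSketch {

    /**
     * Builds a hook thread that runs the close action and then releases
     * the latch, mirroring the body of the hook in the diff above.
     */
    static Thread shutdownHook(final Runnable closeAction, final CountDownLatch latch) {
        return new Thread("sketch-shutdown-hook") {
            @Override
            public void run() {
                closeAction.run();  // stand-in for streams.close()
                latch.countDown();  // lets the waiting main thread continue
            }
        };
    }

    public static void main(String[] args) throws InterruptedException {
        final AtomicBoolean closed = new AtomicBoolean(false);
        final CountDownLatch latch = new CountDownLatch(1);

        Thread hook = shutdownHook(() -> closed.set(true), latch);

        // A real application would call Runtime.getRuntime().addShutdownHook(hook)
        // and wait for Ctrl-C. Here the hook is started directly so that the
        // sketch finishes without user input.
        hook.start();

        latch.await(); // blocks until the hook has counted the latch down
        System.out.println("closed=" + closed.get());
    }
}
```

Because the close action runs before countDown(), the main thread only resumes
after the cleanup has finished, which is the ordering the demo classes rely on.
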
http://git-wip-us.apache.org/repos/asf/kafka/blob/acc020d8/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
----------------------------------------------------------------------
diff --git 
a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
 
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
index 03f8762..616fc48 100644
--- 
a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
+++ 
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.examples.wordcount;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
@@ -30,6 +31,7 @@ import org.apache.kafka.streams.kstream.ValueMapper;
 import java.util.Arrays;
 import java.util.Locale;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
 
 /**
  * Demonstrates, using the high-level KStream DSL, how to implement the 
WordCount program
@@ -60,7 +62,7 @@ public class WordCountDemo {
 
         KStreamBuilder builder = new KStreamBuilder();
 
-        KStream<String, String> source = builder.stream("streams-file-input");
+        KStream<String, String> source = 
builder.stream("streams-wordcount-input");
 
         KTable<String, Long> counts = source
                 .flatMapValues(new ValueMapper<String, Iterable<String>>() {
@@ -80,13 +82,24 @@ public class WordCountDemo {
         // need to override value serde to Long type
         counts.to(Serdes.String(), Serdes.Long(), "streams-wordcount-output");
 
-        KafkaStreams streams = new KafkaStreams(builder, props);
-        streams.start();
+        final KafkaStreams streams = new KafkaStreams(builder, props);
+        final CountDownLatch latch = new CountDownLatch(1);
 
-        // usually the stream application would be running forever,
-        // in this example we just let it run for some time and stop since the 
input data is finite.
-        Thread.sleep(5000L);
+        // attach shutdown handler to catch control-c
+        Runtime.getRuntime().addShutdownHook(new 
Thread("streams-wordcount-shutdown-hook") {
+            @Override
+            public void run() {
+                streams.close();
+                latch.countDown();
+            }
+        });
 
-        streams.close();
+        try {
+            streams.start();
+            latch.await();
+        } catch (Throwable e) {
+            Exit.exit(1);
+        }
+        Exit.exit(0);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/acc020d8/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
----------------------------------------------------------------------
diff --git 
a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
 
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
index 4a990a6..a03a503 100644
--- 
a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
+++ 
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.examples.wordcount;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.KafkaStreams;
@@ -31,6 +32,7 @@ import org.apache.kafka.streams.state.Stores;
 
 import java.util.Locale;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
 
 /**
  * Demonstrates, using the low-level Processor APIs, how to implement the 
WordCount program
@@ -113,20 +115,31 @@ public class WordCountProcessorDemo {
 
         TopologyBuilder builder = new TopologyBuilder();
 
-        builder.addSource("Source", "streams-file-input");
+        builder.addSource("Source", "streams-wordcount-input");
 
         builder.addProcessor("Process", new MyProcessorSupplier(), "Source");
         
builder.addStateStore(Stores.create("Counts").withStringKeys().withIntegerValues().inMemory().build(),
 "Process");
 
         builder.addSink("Sink", "streams-wordcount-processor-output", 
"Process");
 
-        KafkaStreams streams = new KafkaStreams(builder, props);
-        streams.start();
-
-        // usually the stream application would be running forever,
-        // in this example we just let it run for some time and stop since the 
input data is finite.
-        Thread.sleep(5000L);
-
-        streams.close();
+        final KafkaStreams streams = new KafkaStreams(builder, props);
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        // attach shutdown handler to catch control-c
+        Runtime.getRuntime().addShutdownHook(new 
Thread("streams-wordcount-shutdown-hook") {
+            @Override
+            public void run() {
+                streams.close();
+                latch.countDown();
+            }
+        });
+
+        try {
+            streams.start();
+            latch.await();
+        } catch (Throwable e) {
+            Exit.exit(1);
+        }
+        Exit.exit(0);
     }
 }
