Repository: kafka
Updated Branches:
  refs/heads/0.11.0 ec2faa179 -> 97f5525ca


MINOR: add IQ docs to streams documentation

Author: Damian Guy <[email protected]>

Reviewers: Matthias J. Sax <[email protected]>, Guozhang Wang 
<[email protected]>

Closes #3484 from dguy/iq-doc


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/97f5525c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/97f5525c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/97f5525c

Branch: refs/heads/0.11.0
Commit: 97f5525cabbe29b17577e771e05d3daa5d6c53cd
Parents: ec2faa1
Author: Damian Guy <[email protected]>
Authored: Wed Jul 5 14:15:43 2017 -0700
Committer: Guozhang Wang <[email protected]>
Committed: Wed Jul 5 20:12:47 2017 -0700

----------------------------------------------------------------------
 docs/images/streams-interactive-queries-01.png  | Bin 0 -> 80976 bytes
 docs/images/streams-interactive-queries-02.png  | Bin 0 -> 73218 bytes
 docs/images/streams-interactive-queries-03.png  | Bin 0 -> 79879 bytes
 .../streams-interactive-queries-api-01.png      | Bin 0 -> 84438 bytes
 .../streams-interactive-queries-api-02.png      | Bin 0 -> 100725 bytes
 docs/streams/developer-guide.html               | 518 +++++++++++++++++++
 docs/streams/index.html                         |   1 +
 7 files changed, 519 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/97f5525c/docs/images/streams-interactive-queries-01.png
----------------------------------------------------------------------
diff --git a/docs/images/streams-interactive-queries-01.png 
b/docs/images/streams-interactive-queries-01.png
new file mode 100644
index 0000000..d5d5031
Binary files /dev/null and b/docs/images/streams-interactive-queries-01.png 
differ

http://git-wip-us.apache.org/repos/asf/kafka/blob/97f5525c/docs/images/streams-interactive-queries-02.png
----------------------------------------------------------------------
diff --git a/docs/images/streams-interactive-queries-02.png 
b/docs/images/streams-interactive-queries-02.png
new file mode 100644
index 0000000..ea894b6
Binary files /dev/null and b/docs/images/streams-interactive-queries-02.png 
differ

http://git-wip-us.apache.org/repos/asf/kafka/blob/97f5525c/docs/images/streams-interactive-queries-03.png
----------------------------------------------------------------------
diff --git a/docs/images/streams-interactive-queries-03.png 
b/docs/images/streams-interactive-queries-03.png
new file mode 100644
index 0000000..403e3ae
Binary files /dev/null and b/docs/images/streams-interactive-queries-03.png 
differ

http://git-wip-us.apache.org/repos/asf/kafka/blob/97f5525c/docs/images/streams-interactive-queries-api-01.png
----------------------------------------------------------------------
diff --git a/docs/images/streams-interactive-queries-api-01.png 
b/docs/images/streams-interactive-queries-api-01.png
new file mode 100644
index 0000000..2b4aaed
Binary files /dev/null and b/docs/images/streams-interactive-queries-api-01.png 
differ

http://git-wip-us.apache.org/repos/asf/kafka/blob/97f5525c/docs/images/streams-interactive-queries-api-02.png
----------------------------------------------------------------------
diff --git a/docs/images/streams-interactive-queries-api-02.png 
b/docs/images/streams-interactive-queries-api-02.png
new file mode 100644
index 0000000..e5e7527
Binary files /dev/null and b/docs/images/streams-interactive-queries-api-02.png 
differ

http://git-wip-us.apache.org/repos/asf/kafka/blob/97f5525c/docs/streams/developer-guide.html
----------------------------------------------------------------------
diff --git a/docs/streams/developer-guide.html 
b/docs/streams/developer-guide.html
index f6a8e6a..6c75172 100644
--- a/docs/streams/developer-guide.html
+++ b/docs/streams/developer-guide.html
@@ -406,6 +406,524 @@
     </pre>
     <br>
 
+    <h3><a id="streams_interactive_queries" 
href="#streams_interactive_queries">Interactive Queries</a></h3>
+    <p>
+        Interactive queries let you get more from streaming than just the 
processing of data. This feature allows you to treat the stream processing 
layer as a lightweight embedded database and, more concretely, <i>to directly 
query the latest state</i> of your stream processing application, without 
needing to materialize that state to external databases or external storage 
first.
+        As a result, interactive queries simplify the architecture of many use 
cases and lead to more application-centric architectures.  For example, you 
often no longer need to operate and interface with a separate database cluster 
-- or a separate infrastructure team in your company that runs that cluster -- 
to share data between a Kafka Streams application (say, an event-driven 
microservice) and downstream applications, regardless of whether these 
applications use Kafka Streams or not; they may even be applications that do 
not run on the JVM, e.g. implemented in Python, C/C++, or JavaScript.
+        The following diagrams juxtapose two architectures:  the first does 
not use interactive queries whereas the second architecture does.  It depends 
on the concrete use case to determine which of these architectures is a better 
fit -- the important takeaway is that Kafka Streams and interactive queries 
give you the flexibility to pick and to compose the right one, rather than 
limiting you to just a single way.
+    </p>
+
+
+    <figure>
+        <img class="centered" 
src="/{{version}}/images/streams-interactive-queries-01.png" 
style="width:600pt;">
+        <figcaption style="text-align: center;"><i>Without interactive 
queries: increased complexity and heavier footprint of 
architecture</i></figcaption>
+    </figure>
+
+
+    <figure>
+        <img class="centered" 
src="/{{version}}/images/streams-interactive-queries-02.png" 
style="width:500pt;">
+        <figcaption style="text-align: center;"><i>With interactive queries: 
simplified, more application-centric architecture</i></figcaption>
+    </figure>
+
+    <p>
+        Here are some use case examples for applications that benefit from 
interactive queries:
+    </p>
+    <ul>
+        <li>Real-time monitoring:  A front-end dashboard that provides threat 
intelligence (e.g., web servers currently
+            under attack by cyber criminals) can directly query a Kafka 
Streams application that continuously generates the
+            relevant information by processing network telemetry data in 
real-time.
+        </li>
+        <li>Video gaming:  A Kafka Streams application continuously tracks 
location updates from players in the gaming universe.
+            A mobile companion app can then directly query the Kafka Streams 
application to show the current location of a player
+            to friends and family, and invite them to come along.  Similarly, 
the game vendor can use the data to identify unusual
+            hotspots of players, which may indicate a bug or an operational 
issue.
+        </li>
+        <li>Risk and fraud:  A Kafka Streams application continuously analyzes 
user transactions for anomalies and suspicious
+            behavior.  An online banking application can directly query the 
Kafka Streams application when a user logs in to deny
+            access to those users that have been flagged as suspicious.
+        </li>
+        <li>Trend detection:  A Kafka Streams application continuously 
computes the latest top charts across music genres based on
+            user listening behavior that is collected in real-time.  Mobile or 
desktop applications of a music store can then
+            interactively query for the latest charts while users are browsing 
the store.
+        </li>
+    </ul>
+
+    <h4><a id="treams_developer-guide_interactive-queries_your_app" 
href="#treams_developer-guide_interactive-queries_your_app">Your application 
and interactive queries</a></h4>
+    <p>
+        Interactive queries allow you to tap into the <i>state</i> of your 
application, and notably to do that from outside your application.
+        However, an application is not interactively queryable out of the box: 
you make it queryable by leveraging the API of Kafka Streams.
+    </p>
+
+    <p>
+        It is important to understand that the state of your application -- to 
be extra clear, we might call it "the full state of the entire application" -- 
is typically split across many distributed instances of your application, and 
thus across many state stores that are managed locally by these application 
instances.
+    </p>
+
+    <img class="centered" 
src="/{{version}}/images/streams-interactive-queries-03.png" 
style="width:400pt; height:400pt;">
+
+    <p>
+        Accordingly, the API to let you interactively query your application's 
state has two parts, a <i>local</i> and a <i>remote</i> one:
+    </p>
+
+    <ol>
+        <li><a 
href="#streams_developer-guide_interactive-queries_local-stores">Querying local 
state stores (for an application instance)</a>:  You can query that (part of 
the full) state that is managed locally by an instance of your application.  
Here, an application instance can directly query its own local state stores.  
You can thus use the corresponding (local) data in other parts of your 
application code that are not related to calling the Kafka Streams API.  
Querying state stores is always <i>read-only</i> to guarantee that the underlying 
state stores will never be mutated out-of-band, e.g. you cannot add new 
entries; state stores should only ever be mutated by the corresponding 
processor topology and the input data it operates on.
+        </li>
+        <li><a 
href="#streams_developer-guide_interactive-queries_discovery">Querying remote 
state stores (for the entire application)</a>:  To query the full state of your 
entire application we must be able to piece together the various local 
fragments of the state.  In addition to being able to (a) query local state 
stores as described in the previous bullet point, we also need to (b) discover 
all the running instances of your application in the network, including their 
respective state stores and (c) have a way to communicate with these instances 
over the network, i.e. an RPC layer.  Collectively, these building blocks 
enable intra-app communication (between instances of the same app) as well as 
inter-app communication (from other applications) for interactive queries.
+        </li>
+    </ol>
+
+    <table class="data-table">
+        <tbody>
+        <tr>
+            <th>What of the below is required to access the state of ...</th>
+            <th>... an app instance (local state)</th>
+            <th>... the entire application (full state)</th>
+        </tr>
+        <tr>
+            <td>Query local state stores of an app instance</td><td>Required 
(but already built-in)</td><td>Required (but already built-in)</td>
+        </tr>
+        <tr>
+            <td>Make an app instance discoverable to others</td><td>Not 
needed</td><td>Required (but already built-in)</td>
+        </tr>
+        <tr>
+            <td>Discover all running app instances and their state 
stores</td><td>Not needed</td><td>Required (but already built-in)</td>
+        </tr>
+        <tr>
+            <td>Communicate with app instances over the network 
(RPC)</td><td>Not needed</td><td>Required (<b>user must provide</b>)</td>
+        </tr>
+        </tbody>
+    </table>
+
+    <p>
+        Kafka Streams provides all the required functionality for 
interactively querying your application's state out of the box, with but one 
exception:  if you want to expose your application's full state via interactive 
queries, then --
+        for reasons we explain further down below -- it is your responsibility 
to add an appropriate RPC layer (such as a REST
+        API) to your application that allows application instances to 
communicate over the network.  If, however, you only need
+        to let your application instances access their own local state, then 
you do not need to add such an RPC layer at all.
+    </p>
+
+    <h4><a id="streams_developer-guide_interactive-queries_local-stores" 
href="#streams_developer-guide_interactive-queries_local-stores">Querying local 
state stores (for an application instance)</a></h4>
+    <p>
+        A Kafka Streams application is typically running on many instances.
+        The state that is locally available on any given instance is only a 
subset of the application's entire state.
+        Querying the local stores on an instance will, by definition, <i>only 
return data locally available on that particular instance</i>.
+        We explain how to access data in state stores that are not locally 
available in section <a 
href="#streams_developer-guide_interactive-queries_discovery">Querying remote 
state stores (for the entire application)</a>.
+    </p>
+
+    <p>
+        The method <code>KafkaStreams#store(...)</code> finds an application 
instance's local state stores <i>by name</i> and <i>by type</i>.
+    </p>
+
+    <figure>
+        <img class="centered" 
src="/{{version}}/images/streams-interactive-queries-api-01.png" 
style="width:500pt;">
+        <figcaption style="text-align: center;"><i>Every application instance 
can directly query any of its local state stores</i></figcaption>
+    </figure>
+
+    <p>
+        The <i>name</i> of a state store is defined when you are creating the 
store, either when creating the store explicitly (e.g. when using the Processor 
API) or when creating the store implicitly (e.g. when using stateful operations 
in the DSL).
+        We show examples of how to name a state store further down below.
+    </p>
+
+    <p>
+        The <i>type</i> of a state store is defined by 
<code>QueryableStoreType</code>, and you can access the built-in types via the 
class <code>QueryableStoreTypes</code>.
+        Kafka Streams currently has two built-in types:
+    </p>
+    <ul>
+        <li>A key-value store 
<code>QueryableStoreTypes#keyValueStore()</code>, see <a 
href="#streams_developer-guide_interactive-queries_local-key-value-stores">Querying
 local key-value stores</a>.</li>
+        <li>A window store <code>QueryableStoreTypes#windowStore()</code>, see 
<a 
href="#streams_developer-guide_interactive-queries_local-window-stores">Querying
 local window stores</a>.</li>
+    </ul>
+
+    <p>
+        Both store types return <i>read-only</i> versions of the underlying 
state stores.
+        This read-only constraint is important to guarantee that the 
underlying state stores will never be mutated (e.g. new entries added) 
out-of-band, i.e. only the corresponding processing topology of Kafka Streams 
is allowed to mutate and update the state stores in order to ensure data 
consistency.
+    </p>
+    <p>
+        You can also implement your own <code>QueryableStoreType</code> as 
described in section <a 
href="#streams_developer-guide_interactive-queries_custom-stores">Querying 
local custom state stores</a>.
+    </p>
+
+    <p>
+        Kafka Streams materializes one state store per stream partition, which 
means your application will potentially manage many underlying state stores.
+        The API to query local state stores enables you to query all of the 
underlying stores without having to know which partition the data is in.
+        The objects returned from <code>KafkaStreams#store(...)</code> are 
therefore wrapping potentially many underlying state stores.
+    </p>
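+    <p>
+        The wrapping idea can be sketched in plain Java, independent of Kafka Streams. This is a minimal illustration under stated assumptions, not the library's implementation: <code>ReadOnlyStore</code>, <code>InMemoryStore</code>, and <code>CompositeReadOnlyStore</code> are hypothetical names, and each <code>InMemoryStore</code> merely stands in for one per-partition state store.
+    </p>

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical read-only view of one per-partition store (not a Kafka Streams interface).
interface ReadOnlyStore<K, V> {
    V get(K key);
}

// Hypothetical in-memory store standing in for a single per-partition state store.
class InMemoryStore<K, V> implements ReadOnlyStore<K, V> {
    private final Map<K, V> data = new HashMap<>();

    void put(K key, V value) {
        data.put(key, value);
    }

    @Override
    public V get(K key) {
        return data.get(key);
    }
}

// Presents many underlying stores as one logical read-only store.
class CompositeReadOnlyStore<K, V> implements ReadOnlyStore<K, V> {
    private final List<ReadOnlyStore<K, V>> underlying;

    CompositeReadOnlyStore(List<ReadOnlyStore<K, V>> underlying) {
        this.underlying = new ArrayList<>(underlying);
    }

    @Override
    public V get(K key) {
        // Assumption: a key lives in at most one partition, so the first non-null hit wins.
        for (ReadOnlyStore<K, V> store : underlying) {
            V value = store.get(key);
            if (value != null) {
                return value;
            }
        }
        return null;
    }
}
```

+    <p>
+        A caller of the composite only sees <code>get(key)</code> and never needs to know which partition holds the key. Because the composite exposes no write method, it is also read-only by construction.
+    </p>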
+
+    <h4><a 
id="streams_developer-guide_interactive-queries_local-key-value-stores" 
href="#streams_developer-guide_interactive-queries_local-key-value-stores">Querying
 local key-value stores</a></h4>
+    <p>
+        To query a local key-value store, you must first create a topology 
with a key-value store:
+    </p>
+
+    <pre class="brush: java;">
+          StreamsConfig config = ...;
+          KStreamBuilder builder = ...;
+          KStream&lt;String, String&gt; textLines = ...;
+
+          // Define the processing topology (here: WordCount)
+          KGroupedStream&lt;String, String&gt; groupedByWord = textLines
+            .flatMapValues(value -> 
Arrays.asList(value.toLowerCase().split("\\W+")))
+            .groupBy((key, word) -> word, stringSerde, stringSerde);
+
+          // Create a key-value store named "CountsKeyValueStore" for the 
all-time word counts
+          groupedByWord.count("CountsKeyValueStore");
+
+          // Start an instance of the topology
+          KafkaStreams streams = new KafkaStreams(builder, config);
+          streams.start();
+        </pre>
+
+    <p>
+        Above we created a key-value store named "CountsKeyValueStore".
+        This store will hold the latest count for any word that is found on 
the topic "word-count-input".
+        Once the application has started we can get access to 
"CountsKeyValueStore" and then query it via the 
<code>ReadOnlyKeyValueStore</code> API:
+    </p>
+
+    <pre class="brush: java;">
+          // Get the key-value store CountsKeyValueStore
+          ReadOnlyKeyValueStore&lt;String, Long&gt; keyValueStore =
+              streams.store("CountsKeyValueStore", 
QueryableStoreTypes.keyValueStore());
+
+          // Get value by key
+          System.out.println("count for hello:" + keyValueStore.get("hello"));
+
+          // Get the values for a range of keys available in this application 
instance
+          KeyValueIterator&lt;String, Long&gt; range = 
keyValueStore.range("all", "streams");
+          while (range.hasNext()) {
+            KeyValue&lt;String, Long&gt; next = range.next();
+            System.out.println("count for " + next.key + ": " + next.value);
+          }
+          // Close the iterator to release its underlying resources
+          range.close();
+
+          // Get the values for all of the keys available in this application 
instance
+          KeyValueIterator&lt;String, Long&gt; all = keyValueStore.all();
+          while (all.hasNext()) {
+            KeyValue&lt;String, Long&gt; next = all.next();
+            System.out.println("count for " + next.key + ": " + next.value);
+          }
+          all.close();
+        </pre>
+
+    <h4><a 
id="streams_developer-guide_interactive-queries_local-window-stores" 
href="#streams_developer-guide_interactive-queries_local-window-stores">Querying
 local window stores</a></h4>
+    <p>
+        A window store differs from a key-value store in that you will 
potentially have many results for any given key because the key can be present 
in multiple windows.
+        However, there will only ever be at most one result per window for a given 
key.
+    </p>
+    <p>
+        To query a local window store, you must first create a topology with a 
window store:
+    </p>
+
+    <pre class="brush: java;">
+          StreamsConfig config = ...;
+          KStreamBuilder builder = ...;
+          KStream&lt;String, String&gt; textLines = ...;
+
+          // Define the processing topology (here: WordCount)
+          KGroupedStream&lt;String, String&gt; groupedByWord = textLines
+            .flatMapValues(value -> 
Arrays.asList(value.toLowerCase().split("\\W+")))
+            .groupBy((key, word) -> word, stringSerde, stringSerde);
+
+          // Create a window state store named "CountsWindowStore" that 
contains the word counts for every minute
+          groupedByWord.count(TimeWindows.of(60000), "CountsWindowStore");
+        </pre>
+
+    <p>
+        Above we created a window store named "CountsWindowStore" that 
contains the counts for words in 1-minute windows.
+        Once the application has started we can get access to 
"CountsWindowStore" and then query it via the <code>ReadOnlyWindowStore</code> 
API:
+    </p>
+
+    <pre class="brush: java;">
+          // Get the window store named "CountsWindowStore"
+          ReadOnlyWindowStore&lt;String, Long&gt; windowStore =
+              streams.store("CountsWindowStore", 
QueryableStoreTypes.windowStore());
+
+          // Fetch values for the key "world" for all of the windows available 
in this application instance.
+          // To get *all* available windows we fetch windows from the 
beginning of time until now.
+          long timeFrom = 0; // beginning of time = oldest available
+          long timeTo = System.currentTimeMillis(); // now (in processing-time)
+          WindowStoreIterator&lt;Long&gt; iterator = 
windowStore.fetch("world", timeFrom, timeTo);
+          while (iterator.hasNext()) {
+            KeyValue&lt;Long, Long&gt; next = iterator.next();
+            long windowTimestamp = next.key;
+            System.out.println("Count of 'world' @ time " + windowTimestamp + 
" is " + next.value);
+          }
+        </pre>
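+    <p>
+        To make the "at most one result per window" behavior concrete, here is a small plain-Java model of a tumbling-window count store. It is a sketch under assumptions and not how Kafka Streams stores windows internally: <code>WindowedCounts</code> is a hypothetical class, timestamps are assumed to be non-negative, and <code>fetch</code> is assumed to select windows by their start timestamp.
+    </p>

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of a tumbling-window count store (not a Kafka Streams class).
class WindowedCounts {
    private final long windowSizeMs;
    // key -> (window start timestamp -> count); the TreeMap keeps windows ordered by time.
    private final Map<String, TreeMap<Long, Long>> counts = new HashMap<>();

    WindowedCounts(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Record one occurrence of the key in the window that contains the timestamp.
    void record(String key, long timestampMs) {
        long windowStart = timestampMs - (timestampMs % windowSizeMs);
        counts.computeIfAbsent(key, k -> new TreeMap<>()).merge(windowStart, 1L, Long::sum);
    }

    // Return (windowStart, count) pairs for windows starting within [timeFrom, timeTo].
    List<Map.Entry<Long, Long>> fetch(String key, long timeFrom, long timeTo) {
        TreeMap<Long, Long> windows = counts.get(key);
        if (windows == null) {
            return new ArrayList<>();
        }
        return new ArrayList<>(windows.subMap(timeFrom, true, timeTo, true).entrySet());
    }
}
```

+    <p>
+        With a 60000 ms window, occurrences of "world" at 1000 ms and 2000 ms fall into the window starting at 0, while one at 61000 ms falls into the window starting at 60000, so a fetch over both windows returns two entries for the same key.
+    </p>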
+
+    <h4><a id="streams_developer-guide_interactive-queries_custom-stores" 
href="#streams_developer-guide_interactive-queries_custom-stores">Querying 
local custom state stores</a></h4>
+    <p>
+        Any custom state stores you use in your Kafka Streams applications can 
also be queried.
+        However there are some interfaces that will need to be implemented 
first:
+    </p>
+
+    <ol>
+        <li>Your custom state store must implement 
<code>StateStore</code>.</li>
+        <li>You should have an interface to represent the operations available 
on the store.</li>
+        <li>It is recommended that you also provide an interface that 
restricts access to read-only operations so users of this API can't mutate the 
state of your running Kafka Streams application out-of-band.</li>
+        <li>You also need to provide an implementation of 
<code>StateStoreSupplier</code> for creating instances of your store.</li>
+    </ol>
+
+    <p>
+        The class/interface hierarchy for your custom store might look 
something like:
+    </p>
+
+    <pre class="brush: java;">
+          public class MyCustomStore&lt;K,V&gt; implements StateStore, 
MyWriteableCustomStore&lt;K,V&gt; {
+            // implementation of the actual store
+          }
+
+          // Read-write interface for MyCustomStore
+          public interface MyWriteableCustomStore&lt;K,V&gt; extends 
MyReadableCustomStore&lt;K,V&gt; {
+            void write(K key, V value);
+          }
+
+          // Read-only interface for MyCustomStore
+          public interface MyReadableCustomStore&lt;K,V&gt; {
+            V read(K key);
+          }
+
+          public class MyCustomStoreSupplier implements StateStoreSupplier {
+            // implementation of the supplier for MyCustomStore
+          }
+        </pre>
+
+    <p>
+        To make this store queryable you need to:
+    </p>
+    <ul>
+        <li>Provide an implementation of <code>QueryableStoreType</code>.</li>
+        <li>Provide a wrapper class that will have access to all of the 
underlying instances of the store and will be used for querying.</li>
+    </ul>
+
+    <p>
+        Implementing <code>QueryableStoreType</code> is straightforward:
+    </p>
+
+    <pre class="brush: java;">
+
+          public class MyCustomStoreType&lt;K,V&gt; implements 
QueryableStoreType&lt;MyReadableCustomStore&lt;K,V&gt;&gt; {
+
+            // Only accept StateStores that are of type MyCustomStore
+            public boolean accepts(final StateStore stateStore) {
+              return stateStore instanceof MyCustomStore;
+            }
+
+            public MyReadableCustomStore&lt;K,V&gt; create(final 
StateStoreProvider storeProvider, final String storeName) {
+                return new MyCustomStoreTypeWrapper&lt;&gt;(storeProvider, storeName, this);
+            }
+
+          }
+        </pre>
+
+    <p>
+        A wrapper class is required because even a single instance of a Kafka 
Streams application may run multiple stream tasks and, by doing so, manage 
multiple local instances of a particular state store.
+        The wrapper class hides this complexity and lets you query a "logical" 
state store with a particular name without having to know about all of the 
underlying local instances of that state store.
+    </p>
+
+    <p>
+        When implementing your wrapper class you will need to make use of the 
<code>StateStoreProvider</code>
+        interface to get access to the underlying instances of your store.
+        <code>StateStoreProvider#stores(String storeName, 
QueryableStoreType&lt;T&gt; queryableStoreType)</code> returns a 
<code>List</code> of state stores with the given <code>storeName</code> and of 
the type as defined by <code>queryableStoreType</code>.
+    </p>
+    <p>
+        An example implementation of the wrapper follows (Java 8+):
+    </p>
+
+    <pre class="brush: java;">
+          // We strongly recommend implementing a read-only interface
+          // to restrict usage of the store to safe read operations!
+          public class MyCustomStoreTypeWrapper&lt;K,V&gt; implements 
MyReadableCustomStore&lt;K,V&gt; {
+
+            private final QueryableStoreType&lt;MyReadableCustomStore&lt;K, 
V&gt;&gt; customStoreType;
+            private final String storeName;
+            private final StateStoreProvider provider;
+
+            public MyCustomStoreTypeWrapper(final StateStoreProvider provider,
+                                            final String storeName,
+                                            final 
QueryableStoreType&lt;MyReadableCustomStore&lt;K, V&gt;&gt; customStoreType) {
+              this.provider = provider;
+              this.storeName = storeName;
+              this.customStoreType = customStoreType;
+            }
+
+            // Implement a safe read method
+            @Override
+            public V read(final K key) {
+              // Get all the stores with storeName and of customStoreType
+              final List&lt;MyReadableCustomStore&lt;K, V&gt;&gt; stores = 
provider.stores(storeName, customStoreType);
+              // Read the key from each store and keep the first non-null value
+              final Optional&lt;V&gt; value = stores.stream()
+                  .map(store -> store.read(key))
+                  .filter(v -> v != null)
+                  .findFirst();
+              // Return the value if it exists
+              return value.orElse(null);
+            }
+          }
+        </pre>
+
+    <p>
+        Putting it all together you can now find and query your custom store:
+    </p>
+
+    <pre class="brush: java;">
+          StreamsConfig config = ...;
+          TopologyBuilder builder = ...;
+          ProcessorSupplier processorSupplier = ...;
+
+          // Create CustomStoreSupplier for store name the-custom-store
+          MyCustomStoreSupplier customStoreSupplier = new 
MyCustomStoreSupplier("the-custom-store");
+          // Add the source topic
+          builder.addSource("input", "inputTopic");
+          // Add a custom processor that reads from the source topic
+          builder.addProcessor("the-processor", processorSupplier, "input");
+          // Connect your custom state store to the custom processor above
+          builder.addStateStore(customStoreSupplier, "the-processor");
+
+          KafkaStreams streams = new KafkaStreams(builder, config);
+          streams.start();
+
+          // Get access to the custom store
+          MyReadableCustomStore&lt;String,String&gt; store = 
streams.store("the-custom-store", new MyCustomStoreType&lt;String,String&gt;());
+          // Query the store
+          String value = store.read("key");
+        </pre>
+
+    <h4><a id="streams_developer-guide_interactive-queries_discovery" 
href="#streams_developer-guide_interactive-queries_discovery">Querying remote 
state stores (for the entire application)</a></h4>
+
+    <p>
+        Typically, the ultimate goal for interactive queries is not to just 
query locally available state stores from within an instance of a Kafka Streams 
application as described in the previous section.
+        Rather, you want to expose the application's full state (i.e. the 
state across all its instances) to other applications that might be running on 
different machines.
+        For example, you might have a Kafka Streams application that processes 
the user events in a multi-player video game, and you want to retrieve the 
latest status of each user directly from this application so that you can 
display it in a mobile companion app.
+    </p>
+    <p>
+        Three steps are needed to make the full state of your application 
queryable:
+    </p>
+
+    <ol>
+        <li>You must <a 
href="#streams_developer-guide_interactive-queries_rpc-layer">add an RPC layer 
to your application</a> so that the instances of your application may be 
interacted with via the network -- notably to respond to interactive queries.
+            By design Kafka Streams does not provide any such RPC 
functionality out of the box so that you can freely pick your favorite 
approach: a REST API, Thrift, a custom protocol, and so on.</li>
+        <li>You need to <a 
href="#streams_developer-guide_interactive-queries_expose-rpc">expose the 
respective RPC endpoints</a> of your application's instances via the 
<code>application.server</code> configuration setting of Kafka Streams.
+            Because RPC endpoints must be unique within a network, each 
instance will have its own value for this configuration setting.
+            This makes an application instance discoverable by other 
instances.</li>
+        <li> In the RPC layer, you can then <a 
href="#streams_developer-guide_interactive-queries_discover-app-instances-and-stores">discover
 remote application instances</a> and their respective state stores (e.g. for 
forwarding queries to other app instances if an instance lacks the local data 
to respond to a query) as well as <a 
href="#streams_developer-guide_interactive-queries_local-stores">query locally 
available state stores</a> (in order to directly respond to queries), thereby 
making the full state of your application queryable.</li>
+    </ol>
+
+    <figure>
+        <img class="centered" 
src="/{{version}}/images/streams-interactive-queries-api-02.png" 
style="width:500pt;">
+        <figcaption style="text-align: center;"><i>Discover any running 
instances of the same application as well as the respective RPC endpoints they 
expose for interactive queries</i></figcaption>
+    </figure>
+
+    <h4><a id="streams_developer-guide_interactive-queries_rpc-layer" 
href="#streams_developer-guide_interactive-queries_rpc-layer">Adding an RPC 
layer to your application</a></h4>
+    <p>
+        As Kafka Streams doesn't provide an RPC layer you are free to choose 
your favorite approach.
+        There are many ways of doing this, and it will depend on the 
technologies you have chosen to use.
+        The only requirements are that the RPC layer is embedded within the 
Kafka Streams application and that it exposes an endpoint that other 
application instances and applications can connect to.
+    </p>
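+    <p>
+        As one possible shape for such a layer, the sketch below uses the JDK's built-in <code>com.sun.net.httpserver.HttpServer</code> to expose word counts over HTTP. It is an illustration under assumptions, not a recommended design: <code>WordCountRpcService</code> and the <code>/count/&lt;word&gt;</code> path are hypothetical, and the <code>Map</code> passed in stands in for a locally queryable state store.
+    </p>

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, minimal HTTP endpoint that answers queries from local data only.
class WordCountRpcService {
    private static final String PREFIX = "/count/";
    private final HttpServer server;

    WordCountRpcService(int port, Map<String, Long> localCounts) throws IOException {
        server = HttpServer.create(new InetSocketAddress(port), 0);
        // GET /count/<word> returns the count as plain text, or 404 if unknown locally.
        server.createContext(PREFIX, exchange -> {
            String result = lookup(localCounts, exchange.getRequestURI().getPath());
            byte[] body = (result == null ? "not found" : result).getBytes(StandardCharsets.UTF_8);
            exchange.sendResponseHeaders(result == null ? 404 : 200, body.length);
            try (OutputStream out = exchange.getResponseBody()) {
                out.write(body);
            }
        });
    }

    // Pure lookup logic, kept separate from the HTTP wiring so it is easy to test.
    static String lookup(Map<String, Long> localCounts, String path) {
        if (!path.startsWith(PREFIX)) {
            return null;
        }
        Long count = localCounts.get(path.substring(PREFIX.length()));
        return count == null ? null : count.toString();
    }

    void start() {
        server.start();
    }

    void stop() {
        server.stop(0);
    }
}
```

+    <p>
+        In a real application the lookup would read from the store obtained via <code>KafkaStreams#store(...)</code> instead of a map, and an instance that lacks the requested data would forward the query to the instance that has it.
+    </p>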
+
+    <h4><a id="streams_developer-guide_interactive-queries_expose-rpc" 
href="#streams_developer-guide_interactive-queries_expose-rpc">Exposing the RPC 
endpoints of your application</a></h4>
+    <p>
+        To enable the remote discovery of state stores running within a 
(typically distributed) Kafka Streams application you need to set the 
<code>application.server</code> configuration property in 
<code>StreamsConfig</code>.
+        The <code>application.server</code> property defines a unique 
<code>host:port</code> pair that points to the RPC endpoint of the respective 
instance of a Kafka Streams application.
+        It's important to understand that the value of this configuration 
property varies across the instances of your application.
+        When this property is set, then, for every instance of an application, 
Kafka Streams will keep track of the instance's RPC endpoint information, its 
state stores, and assigned stream partitions through instances of 
<code>StreamsMetadata</code>.
+    </p>
+    <p>
+        Below is an example of configuring and running a Kafka Streams 
application that supports the discovery of its state stores.
+    </p>
+
+    <pre class="brush: java;">
+
+          Properties props = new Properties();
+          // Set the unique RPC endpoint of this application instance through 
which it
+          // can be interactively queried.  In a real application, the value 
would most
+          // probably not be hardcoded but derived dynamically.
+          String rpcEndpoint = "host1:4460";
+          props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, rpcEndpoint);
+          // ... further settings may follow here ...
+
+          StreamsConfig config = new StreamsConfig(props);
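+          KStreamBuilder builder = new KStreamBuilder();
+          // Serde for the String keys and values used below.
+          final Serde&lt;String&gt; stringSerde = Serdes.String();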
+
+          KStream&lt;String, String&gt; textLines = 
builder.stream(stringSerde, stringSerde, "word-count-input");
+
+          KGroupedStream&lt;String, String&gt; groupedByWord = textLines
+              .flatMapValues(value -> 
Arrays.asList(value.toLowerCase().split("\\W+")))
+              .groupBy((key, word) -> word, stringSerde, stringSerde);
+
+          // This call to `count()` creates a state store named "word-count".
+          // The state store is discoverable and can be queried interactively.
+          groupedByWord.count("word-count");
+
+          // Start an instance of the topology
+          KafkaStreams streams = new KafkaStreams(builder, config);
+          streams.start();
+
+          // Then, create and start the actual RPC service for remote access 
to this
+          // application instance's local state stores.
+          //
+          // This service should be started on the same host and port as 
defined above by
+          // the property `StreamsConfig.APPLICATION_SERVER_CONFIG`.  The 
example below is
+          // fictitious, but we provide end-to-end demo applications (such as 
KafkaMusicExample)
+          // that showcase how to implement such a service to get you started.
+          MyRPCService rpcService = ...;
+          rpcService.listenAt(rpcEndpoint);
+        </pre>
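+    <p>
+        The comment in the example above notes that the endpoint would usually 
be derived dynamically rather than hardcoded.
+        One possible way to do this, sketched below, is to read the host and 
port from the environment and validate them before building the 
<code>host:port</code> value.
+        The class name <code>RpcEndpointConfig</code>, the environment 
variables <code>RPC_HOST</code> and <code>RPC_PORT</code>, and the fallback 
port are hypothetical; only the <code>application.server</code> key comes from 
Kafka Streams.
+    </p>

```java
import java.util.Map;
import java.util.Properties;

// Hypothetical helper that derives the `application.server` value at startup.
class RpcEndpointConfig {
    // The property name described above (StreamsConfig.APPLICATION_SERVER_CONFIG).
    static final String APPLICATION_SERVER = "application.server";

    // Builds the host:port value, rejecting obviously invalid input.
    static String endpoint(String host, int port) {
        if (host == null || host.isEmpty()) {
            throw new IllegalArgumentException("invalid host: " + host);
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("invalid port: " + port);
        }
        return host + ":" + port;
    }

    // Reads the (assumed) RPC_HOST and RPC_PORT variables from `env`.
    // The fallback port 4460 simply mirrors the hardcoded example value.
    static Properties fromEnvironment(Map<String, String> env) {
        String host = env.get("RPC_HOST");
        int port = Integer.parseInt(env.getOrDefault("RPC_PORT", "4460"));
        Properties props = new Properties();
        props.put(APPLICATION_SERVER, endpoint(host, port));
        return props;
    }
}
```

+    <p>
+        A caller could then start from 
<code>RpcEndpointConfig.fromEnvironment(System.getenv())</code> and add the 
remaining Kafka Streams settings to the returned <code>Properties</code>.
+    </p>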
+
+    <h4><a 
id="streams_developer-guide_interactive-queries_discover-app-instances-and-stores"
 
href="#streams_developer-guide_interactive-queries_discover-app-instances-and-stores">Discovering
 and accessing application instances and their respective local state 
stores</a></h4>
+    <p>
+        With the <code>application.server</code> property set, we can now find 
the locations of remote app instances and their state stores.
+        The following methods return <code>StreamsMetadata</code> objects, 
which provide meta-information about application instances such as their RPC 
endpoint and locally available state stores.
+    </p>
+    <ul>
+        <li><code>KafkaStreams#allMetadata()</code>: find all instances of 
this application</li>
+        <li><code>KafkaStreams#allMetadataForStore(String storeName)</code>: 
find those application instances that manage local instances of the state 
store "storeName"</li>
+        <li><code>KafkaStreams#metadataForKey(String storeName, K key, 
Serializer&lt;K&gt; keySerializer)</code>: using the default stream 
partitioning strategy, find the one application instance that holds the data 
for the given key in the given state store</li>
+        <li><code>KafkaStreams#metadataForKey(String storeName, K key, 
StreamPartitioner&lt;K, ?&gt; partitioner)</code>: using 
<code>partitioner</code>, find the one application instance that holds the 
data for the given key in the given state store</li>
+    </ul>
+
+    <p>
+        If <code>application.server</code> is not configured for an 
application instance, then the above methods will not find any 
<code>StreamsMetadata</code> for it.
+    </p>
+
+    <p>
+        For example, we can now find the <code>StreamsMetadata</code> for the 
state store named "word-count" that we defined in the code example shown in the 
previous section:
+    </p>
+
+    <pre class="brush: java;">
+
+          KafkaStreams streams = ...;
+          // Find all the locations of local instances of the state store 
named "word-count"
+          Collection&lt;StreamsMetadata&gt; wordCountHosts = 
streams.allMetadataForStore("word-count");
+
+          // For illustrative purposes, we assume using an HTTP client to talk 
to remote app instances.
+          HttpClient http = ...;
+
+          // Get the word count for word (aka key) 'alice': Approach 1
+          //
+          // We first find the one app instance that manages the count for 
'alice' in its local state stores.
+          StreamsMetadata metadata = streams.metadataForKey("word-count", 
"alice", Serdes.String().serializer());
+          // Then, we query only that single app instance for the latest count 
of 'alice'.
+          // Note: The RPC URL shown below is fictitious and only serves to 
illustrate the idea.  Ultimately,
+          // the URL (or, in general, the method of communication) will depend 
on the RPC layer you opted to
+          // implement.  Again, we provide end-to-end demo applications (such 
as KafkaMusicExample) that showcase
+          // how to implement such an RPC layer.
+          Long result = http.getLong("http://" + metadata.host() + ":" + 
metadata.port() + "/word-count/alice");
+
+          // Get the word count for word (aka key) 'alice': Approach 2
+          //
+          // Alternatively, we could also choose (say) a brute-force approach 
where we query every app instance
+          // until we find the one that happens to know about 'alice'.
+          Optional&lt;Long&gt; bruteForceResult = 
streams.allMetadataForStore("word-count")
+              .stream()
+              .map(streamsMetadata -> {
+                  // Construct the (fictitious) full endpoint URL to query the 
current remote application instance
+                  String url = "http://" + streamsMetadata.host() + ":" + 
streamsMetadata.port() + "/word-count/alice";
+                  // Read and return the count for 'alice', if any.
+                  return http.getLong(url);
+              })
+              .filter(s -> s != null)
+              .findFirst();
+        </pre>
+
+    <p>
+        At this point the full state of the application is interactively 
queryable:
+    </p>
+    <ul>
+        <li>We can discover the running instances of the application as well 
as the state stores they manage locally.</li>
+        <li>Through the RPC layer that was added to the application, we can 
communicate with these application instances over the network and query them 
for locally available state.</li>
+        <li>The application instances are able to serve such queries because 
they can directly query their own local state stores and respond via the RPC 
layer.</li>
+        <li>Collectively, this allows us to query the full state of the entire 
application.</li>
+    </ul>
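+    <p>
+        The interplay of these points can be sketched as a small router: if 
the instance that owns a key is the local one, answer from the local store, 
otherwise forward the request over the RPC layer.
+        This is only an illustration under stated assumptions: 
<code>QueryRouter</code> is a hypothetical class, the <code>Map</code> stands 
in for a local state store, the <code>remoteGet</code> function stands in for 
an HTTP client, and the owner's host and port are assumed to come from 
<code>StreamsMetadata#host()</code> and <code>StreamsMetadata#port()</code>.
+    </p>

```java
import java.util.Map;
import java.util.function.Function;

// Hypothetical router that answers locally when possible and forwards otherwise.
class QueryRouter {
    private final String localHost;
    private final int localPort;
    private final Map<String, Long> localStore;      // stand-in for the local state store
    private final Function<String, Long> remoteGet;  // stand-in for an HTTP client call

    QueryRouter(String localHost, int localPort,
                Map<String, Long> localStore, Function<String, Long> remoteGet) {
        this.localHost = localHost;
        this.localPort = localPort;
        this.localStore = localStore;
        this.remoteGet = remoteGet;
    }

    // ownerHost/ownerPort identify the instance that holds `word`; in a real
    // application they would be taken from the StreamsMetadata for the key.
    Long count(String ownerHost, int ownerPort, String word) {
        if (localHost.equals(ownerHost) && localPort == ownerPort) {
            // This instance owns the key: read the local store directly.
            return localStore.get(word);
        }
        // Another instance owns the key: forward via the (fictitious) RPC URL.
        return remoteGet.apply("http://" + ownerHost + ":" + ownerPort + "/word-count/" + word);
    }
}
```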
+
     <h3><a id="streams_configure_execute" 
href="#streams_configure_execute">Application Configuration and 
Execution</a></h3>
 
     <p>

http://git-wip-us.apache.org/repos/asf/kafka/blob/97f5525c/docs/streams/index.html
----------------------------------------------------------------------
diff --git a/docs/streams/index.html b/docs/streams/index.html
index 2418497..013494b 100644
--- a/docs/streams/index.html
+++ b/docs/streams/index.html
@@ -32,6 +32,7 @@
             <ul>
                 <li><a 
href="/{{version}}/documentation/streams/developer-guide#streams_processor">Low-level
 Processor API</a></li>
                 <li><a 
href="/{{version}}/documentation/streams/developer-guide#streams_dsl">High-level
 Streams DSL</a></li>
+                <li><a 
href="/{{version}}/documentation/streams/developer-guide#streams_interactive_queries">Interactive
 Queries</a></li>
                 <li><a 
href="/{{version}}/documentation/streams/developer-guide#streams_execute">Application
 Configuration and Execution</a></li>
             </ul>
         </li>
