Repository: kafka
Updated Branches:
  refs/heads/trunk abe5e0e48 -> af182480d


MINOR: fix indention in <pre> tags

Author: Matthias J. Sax <[email protected]>

Reviewers: Guozhang Wang <[email protected]>

Closes #2553 from mjsax/hotfixDocs2


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/af182480
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/af182480
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/af182480

Branch: refs/heads/trunk
Commit: af182480d2f31f8ef44764fc8cfa250ec76a878f
Parents: abe5e0e
Author: Matthias J. Sax <[email protected]>
Authored: Wed Feb 15 17:02:15 2017 -0800
Committer: Guozhang Wang <[email protected]>
Committed: Wed Feb 15 17:02:15 2017 -0800

----------------------------------------------------------------------
 docs/streams.html | 392 ++++++++++++++++++++++++-------------------------
 1 file changed, 196 insertions(+), 196 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/af182480/docs/streams.html
----------------------------------------------------------------------
diff --git a/docs/streams.html b/docs/streams.html
index d9afee8..6461f86 100644
--- a/docs/streams.html
+++ b/docs/streams.html
@@ -306,60 +306,60 @@
         The following example <code>Processor</code> implementation defines a simple word-count algorithm:
         </p>
 
-        <pre>
-            public class MyProcessor extends Processor&lt;String, String&gt; {
-                private ProcessorContext context;
-                private KeyValueStore&lt;String, Long&gt; kvStore;
-
-                @Override
-                @SuppressWarnings("unchecked")
-                public void init(ProcessorContext context) {
-                    // keep the processor context locally because we need it in punctuate() and commit()
-                    this.context = context;
-
-                    // call this processor's punctuate() method every 1000 milliseconds.
-                    this.context.schedule(1000);
-
-                    // retrieve the key-value store named "Counts"
-                    this.kvStore = (KeyValueStore&lt;String, Long&gt;) context.getStateStore("Counts");
-                }
-
-                @Override
-                public void process(String dummy, String line) {
-                    String[] words = line.toLowerCase().split(" ");
-
-                    for (String word : words) {
-                        Long oldValue = this.kvStore.get(word);
-
-                        if (oldValue == null) {
-                            this.kvStore.put(word, 1L);
-                        } else {
-                            this.kvStore.put(word, oldValue + 1L);
-                        }
-                    }
-                }
-
-                @Override
-                public void punctuate(long timestamp) {
-                    KeyValueIterator&lt;String, Long&gt; iter = this.kvStore.all();
-
-                    while (iter.hasNext()) {
-                        KeyValue&lt;String, Long&gt; entry = iter.next();
-                        context.forward(entry.key, entry.value.toString());
-                    }
-
-                    iter.close();
-                    // commit the current processing progress
-                    context.commit();
-                }
-
-                @Override
-                public void close() {
-                    // close the key-value store
-                    this.kvStore.close();
-                }
-            };
-        </pre>
+<pre>
+public class MyProcessor implements Processor&lt;String, String&gt; {
+    private ProcessorContext context;
+    private KeyValueStore&lt;String, Long&gt; kvStore;
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void init(ProcessorContext context) {
+        // keep the processor context locally because we need it in punctuate() and commit()
+        this.context = context;
+
+        // call this processor's punctuate() method every 1000 milliseconds.
+        this.context.schedule(1000);
+
+        // retrieve the key-value store named "Counts"
+        this.kvStore = (KeyValueStore&lt;String, Long&gt;) context.getStateStore("Counts");
+    }
+
+    @Override
+    public void process(String dummy, String line) {
+        String[] words = line.toLowerCase().split(" ");
+
+        for (String word : words) {
+            Long oldValue = this.kvStore.get(word);
+
+            if (oldValue == null) {
+                this.kvStore.put(word, 1L);
+            } else {
+                this.kvStore.put(word, oldValue + 1L);
+            }
+        }
+    }
+
+    @Override
+    public void punctuate(long timestamp) {
+        KeyValueIterator&lt;String, Long&gt; iter = this.kvStore.all();
+
+        while (iter.hasNext()) {
+            KeyValue&lt;String, Long&gt; entry = iter.next();
+            context.forward(entry.key, entry.value.toString());
+        }
+
+        iter.close();
+        // commit the current processing progress
+        context.commit();
+    }
+
+    @Override
+    public void close() {
+        // close the key-value store
+        this.kvStore.close();
+    }
+}
+</pre>
 
         <p>
         In the above implementation, the following actions are performed:
@@ -379,31 +379,31 @@
         by connecting these processors together:
         </p>
 
-        <pre>
-            TopologyBuilder builder = new TopologyBuilder();
+<pre>
+TopologyBuilder builder = new TopologyBuilder();
 
-            builder.addSource("SOURCE", "src-topic")
-                // add "PROCESS1" node which takes the source processor 
"SOURCE" as its upstream processor
-                .addProcessor("PROCESS1", () -> new MyProcessor1(), "SOURCE")
+builder.addSource("SOURCE", "src-topic")
+    // add "PROCESS1" node which takes the source processor "SOURCE" as its 
upstream processor
+    .addProcessor("PROCESS1", () -> new MyProcessor1(), "SOURCE")
 
-                // add "PROCESS2" node which takes "PROCESS1" as its upstream 
processor
-                .addProcessor("PROCESS2", () -> new MyProcessor2(), "PROCESS1")
+    // add "PROCESS2" node which takes "PROCESS1" as its upstream processor
+    .addProcessor("PROCESS2", () -> new MyProcessor2(), "PROCESS1")
 
-                // add "PROCESS3" node which takes "PROCESS1" as its upstream 
processor
-                .addProcessor("PROCESS3", () -> new MyProcessor3(), "PROCESS1")
+    // add "PROCESS3" node which takes "PROCESS1" as its upstream processor
+    .addProcessor("PROCESS3", () -> new MyProcessor3(), "PROCESS1")
 
-                // add the sink processor node "SINK1" that takes Kafka topic 
"sink-topic1"
-                // as output and the "PROCESS1" node as its upstream processor
-                .addSink("SINK1", "sink-topic1", "PROCESS1")
+    // add the sink processor node "SINK1" that takes Kafka topic "sink-topic1"
+    // as output and the "PROCESS1" node as its upstream processor
+    .addSink("SINK1", "sink-topic1", "PROCESS1")
 
-                // add the sink processor node "SINK2" that takes Kafka topic 
"sink-topic2"
-                // as output and the "PROCESS2" node as its upstream processor
-                .addSink("SINK2", "sink-topic2", "PROCESS2")
+    // add the sink processor node "SINK2" that takes Kafka topic "sink-topic2"
+    // as output and the "PROCESS2" node as its upstream processor
+    .addSink("SINK2", "sink-topic2", "PROCESS2")
 
-                // add the sink processor node "SINK3" that takes Kafka topic 
"sink-topic3"
-                // as output and the "PROCESS3" node as its upstream processor
-                .addSink("SINK3", "sink-topic3", "PROCESS3");
-        </pre>
+    // add the sink processor node "SINK3" that takes Kafka topic "sink-topic3"
+    // as output and the "PROCESS3" node as its upstream processor
+    .addSink("SINK3", "sink-topic3", "PROCESS3");
+</pre>
 
         There are several steps in the above code to build the topology; here is a quick walk-through:
 
@@ -423,13 +423,13 @@
         In the following example, a persistent key-value store named “Counts” with key type <code>String</code> and value type <code>Long</code> is created.
         </p>
 
-        <pre>
-            StateStoreSupplier countStore = Stores.create("Counts")
-              .withKeys(Serdes.String())
-              .withValues(Serdes.Long())
-              .persistent()
-              .build();
-        </pre>
+<pre>
+StateStoreSupplier countStore = Stores.create("Counts")
+    .withKeys(Serdes.String())
+    .withValues(Serdes.Long())
+    .persistent()
+    .build();
+</pre>
 
         <p>
         To take advantage of these state stores, developers can use the <code>TopologyBuilder.addStateStore</code> method when building the
@@ -437,24 +437,24 @@
         state store with the existing processor nodes through <code>TopologyBuilder.connectProcessorAndStateStores</code>.
         </p>
 
-        <pre>
-            TopologyBuilder builder = new TopologyBuilder();
+<pre>
+TopologyBuilder builder = new TopologyBuilder();
 
-            builder.addSource("SOURCE", "src-topic")
+builder.addSource("SOURCE", "src-topic")
 
-                .addProcessor("PROCESS1", MyProcessor1::new, "SOURCE")
-                // add the created state store "COUNTS" associated with processor "PROCESS1"
-                .addStateStore(countStore, "PROCESS1")
-                .addProcessor("PROCESS2", MyProcessor3::new /* the 
ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1")
-                .addProcessor("PROCESS3", MyProcessor3::new /* the 
ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1")
+    .addProcessor("PROCESS1", MyProcessor1::new, "SOURCE")
+    // add the created state store "Counts" associated with processor "PROCESS1"
+    .addStateStore(countStore, "PROCESS1")
+    .addProcessor("PROCESS2", MyProcessor2::new /* the ProcessorSupplier that can generate MyProcessor2 */, "PROCESS1")
+    .addProcessor("PROCESS3", MyProcessor3::new /* the ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1")
 
-                // connect the state store "COUNTS" with processor "PROCESS2"
-                .connectProcessorAndStateStores("PROCESS2", "COUNTS");
+    // connect the state store "Counts" with processor "PROCESS2"
+    .connectProcessorAndStateStores("PROCESS2", "Counts")
 
-                .addSink("SINK1", "sink-topic1", "PROCESS1")
-                .addSink("SINK2", "sink-topic2", "PROCESS2")
-                .addSink("SINK3", "sink-topic3", "PROCESS3");
-        </pre>
+    .addSink("SINK1", "sink-topic1", "PROCESS1")
+    .addSink("SINK2", "sink-topic2", "PROCESS2")
+    .addSink("SINK3", "sink-topic3", "PROCESS3");
+</pre>
 
         In the next section we present another way to build the processor topology: the Kafka Streams DSL.
         <br>
@@ -511,9 +511,9 @@
 
         To illustrate the difference between KStreams and KTables/GlobalKTables, let’s imagine the following two data records are being sent to the stream:
 
-        <pre>
-            ("alice", 1) --> ("alice", 3)
-        </pre>
+<pre>
+("alice", 1) --> ("alice", 3)
+</pre>
 
         If these records were a KStream and the stream processing application were to sum the values, it would return <code>4</code>. If these records were a KTable or GlobalKTable, the return would be <code>3</code>, since the last record would be considered an update.
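The 4-versus-3 arithmetic can be reproduced outside of Kafka in plain Java. This is only a sketch of the record-versus-update semantics; it does not use the Kafka Streams API:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class StreamVsTableSemantics {
    public static void main(String[] args) {
        // the two records from the example: ("alice", 1) --> ("alice", 3)
        List<Map.Entry<String, Integer>> records = List.of(
            new SimpleEntry<>("alice", 1),
            new SimpleEntry<>("alice", 3));

        // KStream semantics: every record is an independent fact,
        // so summing aggregates them: 1 + 3 = 4
        int streamSum = records.stream().mapToInt(Map.Entry::getValue).sum();

        // KTable semantics: a later record with the same key is an UPDATE,
        // so only the latest value per key survives: 3
        Map<String, Integer> table = new LinkedHashMap<>();
        records.forEach(r -> table.put(r.getKey(), r.getValue()));
        int tableSum = table.values().stream().mapToInt(Integer::intValue).sum();

        System.out.println(streamSum + " " + tableSum); // prints "4 3"
    }
}
```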
 
@@ -525,13 +525,13 @@
         from a single topic).
         </p>
 
-        <pre>
-            KStreamBuilder builder = new KStreamBuilder();
+<pre>
+KStreamBuilder builder = new KStreamBuilder();
 
-            KStream&lt;String, GenericRecord&gt; source1 = builder.stream("topic1", "topic2");
-            KTable&lt;String, GenericRecord&gt; source2 = builder.table("topic3", "stateStoreName");
-            GlobalKTable&lt;String, GenericRecord&gt; source2 = builder.globalTable("topic4", "globalStoreName");
-        </pre>
+KStream&lt;String, GenericRecord&gt; source1 = builder.stream("topic1", "topic2");
+KTable&lt;String, GenericRecord&gt; source2 = builder.table("topic3", "stateStoreName");
+GlobalKTable&lt;String, GenericRecord&gt; source3 = builder.globalTable("topic4", "globalStoreName");
+</pre>
 
         <h4><a id="streams_dsl_windowing" 
href="#streams_dsl_windowing">Windowing a stream</a></h4>
         A stream processor may need to divide data records into time buckets, 
i.e. to <b>window</b> the stream by time. This is usually needed for join and 
aggregation operations, etc. Kafka Streams currently defines the following 
types of windows:
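Hopping time windows, such as the <code>TimeWindows.of("counts", 5000L).advanceBy(1000L)</code> used in the aggregation example later on this page, assign each record to every window that contains its timestamp. The following plain-Java sketch illustrates that assignment rule; it is an illustration of the semantics only, not the Kafka Streams implementation:

```java
import java.util.ArrayList;
import java.util.List;

public class HoppingWindows {
    // Return the start timestamps of all hopping windows [start, start + size)
    // that contain the given record timestamp, where window starts are
    // multiples of `advance` (cf. TimeWindows.of(...).advanceBy(...)).
    static List<Long> windowStartsFor(long timestamp, long size, long advance) {
        List<Long> starts = new ArrayList<>();
        // earliest window start that can still contain the timestamp
        long first = Math.max(0, timestamp - size + advance) / advance * advance;
        for (long start = first; start <= timestamp; start += advance) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // a record at t=6200 with 5-second windows advancing by 1 second
        // falls into the five overlapping windows starting at 2000..6000
        System.out.println(windowStartsFor(6200L, 5000L, 1000L));
        // prints [2000, 3000, 4000, 5000, 6000]
    }
}
```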
@@ -605,10 +605,10 @@
 
         </p>
 
-        <pre>
-            // written in Java 8+, using lambda expressions
-            KStream&lt;String, GenericRecord&gt; mapped = source1.mapValue(record -> record.get("category"));
-        </pre>
+<pre>
+// written in Java 8+, using lambda expressions
+KStream&lt;String, GenericRecord&gt; mapped = source1.mapValues(record -> record.get("category"));
+</pre>
 
         <p>
         Stateless transformations, by definition, do not depend on any state for processing, and hence implementation-wise
@@ -619,19 +619,19 @@
         based on them.
         </p>
 
-        <pre>
-            // written in Java 8+, using lambda expressions
-            KTable&lt;Windowed&lt;String&gt;, Long&gt; counts = source1.groupByKey().aggregate(
-                () -> 0L,  // initial value
-                (aggKey, value, aggregate) -> aggregate + 1L,   // aggregating value
-                TimeWindows.of("counts", 5000L).advanceBy(1000L), // intervals in milliseconds
-                Serdes.Long() // serde for aggregated value
-            );
+<pre>
+// written in Java 8+, using lambda expressions
+KTable&lt;Windowed&lt;String&gt;, Long&gt; counts = source1.groupByKey().aggregate(
+    () -> 0L,  // initial value
+    (aggKey, value, aggregate) -> aggregate + 1L,   // aggregating value
+    TimeWindows.of("counts", 5000L).advanceBy(1000L), // intervals in 
milliseconds
+    Serdes.Long() // serde for aggregated value
+);
 
-            KStream&lt;String, String&gt; joined = source1.leftJoin(source2,
-                (record1, record2) -> record1.get("user") + "-" + record2.get("region");
-            );
-        </pre>
+KStream&lt;String, String&gt; joined = source1.leftJoin(source2,
+    (record1, record2) -> record1.get("user") + "-" + record2.get("region")
+);
+</pre>
 
         <h4><a id="streams_dsl_sink" href="#streams_dsl_sink">Write streams 
back to Kafka</a></h4>
 
@@ -640,21 +640,21 @@
         <code>KStream.to</code> and <code>KTable.to</code>.
         </p>
 
-        <pre>
-            joined.to("topic4");
-        </pre>
+<pre>
+joined.to("topic4");
+</pre>
 
         If your application needs to continue reading and processing the records after they have been materialized
         to a topic via <code>to</code> above, one option is to construct a new stream that reads from the output topic;
         Kafka Streams provides a convenience method called <code>through</code>:
 
-        <pre>
-            // equivalent to
-            //
-            // joined.to("topic4");
-            // materialized = builder.stream("topic4");
-            KStream&lt;String, String&gt; materialized = joined.through("topic4");
-        </pre>
+<pre>
+// equivalent to
+//
+// joined.to("topic4");
+// materialized = builder.stream("topic4");
+KStream&lt;String, String&gt; materialized = joined.through("topic4");
+</pre>
         <br>
 
         <h3><a id="streams_execute" href="#streams_execute">Application 
Configuration and Execution</a></h3>
@@ -670,21 +670,21 @@
         set the necessary parameters, and construct a <code>StreamsConfig</code> instance from the <code>Properties</code> instance.
         </p>
 
-        <pre>
-            import java.util.Properties;
-            import org.apache.kafka.streams.StreamsConfig;
+<pre>
+import java.util.Properties;
+import org.apache.kafka.streams.StreamsConfig;
 
-            Properties settings = new Properties();
-            // Set a few key parameters
-            settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-first-streams-application");
-            settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
-            settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "zookeeper1:2181");
-            // Any further settings
-            settings.put(... , ...);
+Properties settings = new Properties();
+// Set a few key parameters
+settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-first-streams-application");
+settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
+settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "zookeeper1:2181");
+// Any further settings
+settings.put(... , ...);
 
-            // Create an instance of StreamsConfig from the Properties instance
-            StreamsConfig config = new StreamsConfig(settings);
-        </pre>
+// Create an instance of StreamsConfig from the Properties instance
+StreamsConfig config = new StreamsConfig(settings);
+</pre>
 
         <p>
         Apart from Kafka Streams' own configuration parameters you can also specify parameters for the Kafka consumers and producers that are used internally,
@@ -694,24 +694,24 @@
         If you want to set different values for consumer and producer for such a parameter, you can prefix the parameter name with <code>consumer.</code> or <code>producer.</code>:
         </p>
 
-        <pre>
-            Properties settings = new Properties();
-            // Example of a "normal" setting for Kafka Streams
-            settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-01:9092");
+<pre>
+Properties settings = new Properties();
+// Example of a "normal" setting for Kafka Streams
+settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-01:9092");
 
-            // Customize the Kafka consumer settings
-            streamsSettings.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
+// Customize the Kafka consumer settings
+settings.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
 
-            // Customize a common client setting for both consumer and producer
-            settings.put(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG, 100L);
+// Customize a common client setting for both consumer and producer
+settings.put(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG, 100L);
 
-            // Customize different values for consumer and producer
-            settings.put("consumer." + ConsumerConfig.RECEIVE_BUFFER_CONFIG, 
1024 * 1024);
-            settings.put("producer." + ProducerConfig.RECEIVE_BUFFER_CONFIG, 
64 * 1024);
-            // Alternatively, you can use
-            
settings.put(StreamsConfig.consumerPrefix(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
 1024 * 1024);
-            
settings.put(StremasConfig.producerConfig(ProducerConfig.RECEIVE_BUFFER_CONFIG),
 64 * 1024);
-        </pre>
+// Customize different values for consumer and producer
+settings.put("consumer." + ConsumerConfig.RECEIVE_BUFFER_CONFIG, 1024 * 1024);
+settings.put("producer." + ProducerConfig.RECEIVE_BUFFER_CONFIG, 64 * 1024);
+// Alternatively, you can use
+settings.put(StreamsConfig.consumerPrefix(ConsumerConfig.RECEIVE_BUFFER_CONFIG), 1024 * 1024);
+settings.put(StreamsConfig.producerPrefix(ProducerConfig.RECEIVE_BUFFER_CONFIG), 64 * 1024);
+</pre>
 
         <p>
         You can call Kafka Streams from anywhere in your application code.
@@ -724,68 +724,68 @@
         that is used to define a topology; the second argument is an instance of <code>StreamsConfig</code> mentioned above.
         </p>
 
-        <pre>
-            import org.apache.kafka.streams.KafkaStreams;
-            import org.apache.kafka.streams.StreamsConfig;
-            import org.apache.kafka.streams.kstream.KStreamBuilder;
-            import org.apache.kafka.streams.processor.TopologyBuilder;
+<pre>
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.processor.TopologyBuilder;
 
-            // Use the builders to define the actual processing topology, e.g. to specify
-            // from which input topics to read, which stream operations (filter, map, etc.)
-            // should be called, and so on.
+// Use the builders to define the actual processing topology, e.g. to specify
+// from which input topics to read, which stream operations (filter, map, etc.)
+// should be called, and so on.
 
-            KStreamBuilder builder = ...;  // when using the Kafka Streams DSL
-            //
-            // OR
-            //
-            TopologyBuilder builder = ...; // when using the Processor API
+KStreamBuilder builder = ...;  // when using the Kafka Streams DSL
+//
+// OR
+//
+TopologyBuilder builder = ...; // when using the Processor API
 
-            // Use the configuration to tell your application where the Kafka cluster is,
-            // which serializers/deserializers to use by default, to specify security settings,
-            // and so on.
-            StreamsConfig config = ...;
+// Use the configuration to tell your application where the Kafka cluster is,
+// which serializers/deserializers to use by default, to specify security settings,
+// and so on.
+StreamsConfig config = ...;
 
-            KafkaStreams streams = new KafkaStreams(builder, config);
-        </pre>
+KafkaStreams streams = new KafkaStreams(builder, config);
+</pre>
 
         <p>
         At this point, internal structures have been initialized, but the processing is not started yet. You have to explicitly start the Kafka Streams thread by calling the <code>start()</code> method:
         </p>
 
-        <pre>
-            // Start the Kafka Streams instance
-            streams.start();
-        </pre>
+<pre>
+// Start the Kafka Streams instance
+streams.start();
+</pre>
 
         <p>
         To catch any unexpected exceptions, you may set a <code>java.lang.Thread.UncaughtExceptionHandler</code> before you start the application. This handler is called whenever a stream thread is terminated by an unexpected exception:
         </p>
 
-        <pre>
-            streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
-                public uncaughtException(Thread t, throwable e) {
-                    // here you should examine the exception and perform an appropriate action!
-                }
-            );
-        </pre>
+<pre>
+streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+    public void uncaughtException(Thread t, Throwable e) {
+        // here you should examine the exception and perform an appropriate action!
+    }
+});
+</pre>
 
         <p>
         To stop the application instance call the <code>close()</code> method:
         </p>
 
-        <pre>
-            // Stop the Kafka Streams instance
-            streams.close();
-        </pre>
+<pre>
+// Stop the Kafka Streams instance
+streams.close();
+</pre>
 
         Now it's time to execute your application that uses the Kafka Streams library, which can be run just like any other Java application – there is no special magic or requirement on the side of Kafka Streams.
         For example, you can package your Java application as a fat jar file and then start the application via:
 
-        <pre>
-            # Start the application in class `com.example.MyStreamsApp`
-            # from the fat jar named `path-to-app-fatjar.jar`.
-            $ java -cp path-to-app-fatjar.jar com.example.MyStreamsApp
-        </pre>
+<pre>
+# Start the application in class `com.example.MyStreamsApp`
+# from the fat jar named `path-to-app-fatjar.jar`.
+$ java -cp path-to-app-fatjar.jar com.example.MyStreamsApp
+</pre>
 
         <p>
         When the application instance starts running, the defined processor topology will be initialized as one or more stream tasks that can be executed in parallel by the stream threads within the instance.
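As a rough, hypothetical sketch of that parallelism model in plain Java (the real assignment is done by Kafka Streams' partition assignor, not by this code): one stream task is created per input partition, and tasks are spread over the configured stream threads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TaskAssignmentSketch {
    public static void main(String[] args) {
        // Hypothetical setup: one input topic with 4 partitions and an
        // application instance configured with 2 stream threads.
        // Kafka Streams creates one stream task per input partition; the
        // round-robin spread over threads below is a simplification of the
        // real assignor, used only to illustrate the model.
        int partitions = 4;
        int threads = 2;

        Map<Integer, List<Integer>> tasksByThread = new TreeMap<>();
        for (int task = 0; task < partitions; task++) {
            tasksByThread.computeIfAbsent(task % threads, t -> new ArrayList<>())
                         .add(task);
        }
        System.out.println(tasksByThread); // prints {0=[0, 2], 1=[1, 3]}
    }
}
```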
