Repository: kafka
Updated Branches:
  refs/heads/trunk f8d6dba34 -> 57b0d0fe5


http://git-wip-us.apache.org/repos/asf/kafka/blob/57b0d0fe/docs/streams.html
----------------------------------------------------------------------
diff --git a/docs/streams.html b/docs/streams.html
index bff9bf0..8192bf7 100644
--- a/docs/streams.html
+++ b/docs/streams.html
@@ -306,7 +306,7 @@
         The following example <code>Processor</code> implementation defines a 
simple word-count algorithm:
         </p>
 
-<pre>
+<pre class="brush: java;">
 public class MyProcessor implements Processor&lt;String, String&gt; {
     private ProcessorContext context;
     private KeyValueStore&lt;String, Long&gt; kvStore;
@@ -380,7 +380,7 @@ public class MyProcessor implements Processor&lt;String, 
String&gt; {
         by connecting these processors together:
         </p>
 
-<pre>
+<pre class="brush: java;">
 TopologyBuilder builder = new TopologyBuilder();
 
 builder.addSource("SOURCE", "src-topic")
@@ -424,7 +424,7 @@ builder.addSource("SOURCE", "src-topic")
         In the following example, a persistent key-value store named 
“Counts” with key type <code>String</code> and value type <code>Long</code> 
is created.
         </p>
 
-<pre>
+<pre class="brush: java;">
 StateStoreSupplier countStore = Stores.create("Counts")
     .withKeys(Serdes.String())
     .withValues(Serdes.Long())
@@ -438,7 +438,7 @@ StateStoreSupplier countStore = Stores.create("Counts")
         state store with the existing processor nodes through 
<code>TopologyBuilder.connectProcessorAndStateStores</code>.
         </p>
 
-<pre>
+<pre class="brush: java;">
 TopologyBuilder builder = new TopologyBuilder();
 
 builder.addSource("SOURCE", "src-topic")
@@ -526,7 +526,7 @@ builder.addSource("SOURCE", "src-topic")
         from a single topic).
         </p>
 
-<pre>
+<pre class="brush: java;">
 KStreamBuilder builder = new KStreamBuilder();
 
 KStream&lt;String, GenericRecord&gt; source1 = builder.stream("topic1", 
"topic2");
@@ -606,7 +606,7 @@ GlobalKTable&lt;String, GenericRecord&gt; source2 = 
builder.globalTable("topic4"
 
         </p>
 
-<pre>
+<pre class="brush: java;">
 // written in Java 8+, using lambda expressions
 KStream&lt;String, GenericRecord&gt; mapped = source1.mapValues(record -> 
record.get("category"));
 </pre>
@@ -620,7 +620,7 @@ KStream&lt;String, GenericRecord&gt; mapped = 
source1.mapValue(record -> record.
         based on them.
         </p>
 
-<pre>
+<pre class="brush: java;">
 // written in Java 8+, using lambda expressions
 KTable&lt;Windowed&lt;String&gt;, Long&gt; counts = 
source1.groupByKey().aggregate(
     () -> 0L,  // initial value
@@ -641,7 +641,7 @@ KStream&lt;String, String&gt; joined = 
source1.leftJoin(source2,
         <code>KStream.to</code> and <code>KTable.to</code>.
         </p>
 
-<pre>
+<pre class="brush: java;">
 joined.to("topic4");
 </pre>
 
@@ -649,7 +649,7 @@ joined.to("topic4");
         to a topic via <code>to</code> above, one option is to construct a new 
stream that reads from the output topic;
         Kafka Streams provides a convenience method called 
<code>through</code>:
 
-<pre>
+<pre class="brush: java;">
 // equivalent to
 //
 // joined.to("topic4");
@@ -677,7 +677,7 @@ KStream&lt;String, String&gt; materialized = 
joined.through("topic4");
         set the necessary parameters, and construct a 
<code>StreamsConfig</code> instance from the <code>Properties</code> instance.
         </p>
 
-<pre>
+<pre class="brush: java;">
 import java.util.Properties;
 import org.apache.kafka.streams.StreamsConfig;
 
@@ -702,7 +702,7 @@ StreamsConfig config = new StreamsConfig(settings);
         If you want to set different values for consumer and producer for such 
a parameter, you can prefix the parameter name with <code>consumer.</code> or 
<code>producer.</code>:
         </p>
 
-<pre>
+<pre class="brush: java;">
 Properties settings = new Properties();
 // Example of a "normal" setting for Kafka Streams
 settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-01:9092");
@@ -750,7 +750,7 @@ 
settings.put(StreamsConfig.producerConfig(ProducerConfig.RECEIVE_BUFFER_CONFIG),
         that is used to define a topology; The second argument is an instance 
of <code>StreamsConfig</code> mentioned above.
         </p>
 
-<pre>
+<pre class="brush: java;">
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
@@ -778,7 +778,7 @@ KafkaStreams streams = new KafkaStreams(builder, config);
         At this point, internal structures have been initialized, but the 
processing is not started yet. You have to explicitly start the Kafka Streams 
thread by calling the <code>start()</code> method:
         </p>
 
-<pre>
+<pre class="brush: java;">
 // Start the Kafka Streams instance
 streams.start();
 </pre>
@@ -787,7 +787,7 @@ streams.start();
         To catch any unexpected exceptions, you may set an 
<code>java.lang.Thread.UncaughtExceptionHandler</code> before you start the 
application. This handler is called whenever a stream thread is terminated by 
an unexpected exception:
         </p>
 
-<pre>
+<pre class="brush: java;">
 streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
     public void uncaughtException(Thread t, Throwable e) {
         // here you should examine the exception and perform an appropriate 
action!
@@ -799,7 +799,7 @@ streams.setUncaughtExceptionHandler(new 
Thread.UncaughtExceptionHandler() {
         To stop the application instance call the <code>close()</code> method:
         </p>
 
-<pre>
+<pre class="brush: java;">
 // Stop the Kafka Streams instance
 streams.close();
 </pre>
@@ -807,7 +807,7 @@ streams.close();
         Now it's time to execute your application that uses the Kafka Streams 
library, which can be run just like any other Java application – there is no 
special magic or requirement on the side of Kafka Streams.
         For example, you can package your Java application as a fat jar file 
and then start the application via:
 
-<pre>
+<pre class="brush: bash;">
 # Start the application in class `com.example.MyStreamsApp`
 # from the fat jar named `path-to-app-fatjar.jar`.
 $ java -cp path-to-app-fatjar.jar com.example.MyStreamsApp

Reply via email to