http://git-wip-us.apache.org/repos/asf/spark-website/blob/d2bcf185/site/docs/2.1.0/streaming-programming-guide.html
----------------------------------------------------------------------
diff --git a/site/docs/2.1.0/streaming-programming-guide.html b/site/docs/2.1.0/streaming-programming-guide.html
index 9a87d23..b1ce1e1 100644
--- a/site/docs/2.1.0/streaming-programming-guide.html
+++ b/site/docs/2.1.0/streaming-programming-guide.html
@@ -129,32 +129,32 @@
                     
 
                     <ul id="markdown-toc">
-  <li><a href="#overview" id="markdown-toc-overview">Overview</a></li>
-  <li><a href="#a-quick-example" id="markdown-toc-a-quick-example">A Quick 
Example</a></li>
-  <li><a href="#basic-concepts" id="markdown-toc-basic-concepts">Basic 
Concepts</a>    <ul>
-      <li><a href="#linking" id="markdown-toc-linking">Linking</a></li>
-      <li><a href="#initializing-streamingcontext" 
id="markdown-toc-initializing-streamingcontext">Initializing 
StreamingContext</a></li>
-      <li><a href="#discretized-streams-dstreams" 
id="markdown-toc-discretized-streams-dstreams">Discretized Streams 
(DStreams)</a></li>
-      <li><a href="#input-dstreams-and-receivers" 
id="markdown-toc-input-dstreams-and-receivers">Input DStreams and 
Receivers</a></li>
-      <li><a href="#transformations-on-dstreams" 
id="markdown-toc-transformations-on-dstreams">Transformations on 
DStreams</a></li>
-      <li><a href="#output-operations-on-dstreams" 
id="markdown-toc-output-operations-on-dstreams">Output Operations on 
DStreams</a></li>
-      <li><a href="#dataframe-and-sql-operations" 
id="markdown-toc-dataframe-and-sql-operations">DataFrame and SQL 
Operations</a></li>
-      <li><a href="#mllib-operations" id="markdown-toc-mllib-operations">MLlib 
Operations</a></li>
-      <li><a href="#caching--persistence" 
id="markdown-toc-caching--persistence">Caching / Persistence</a></li>
-      <li><a href="#checkpointing" 
id="markdown-toc-checkpointing">Checkpointing</a></li>
-      <li><a href="#accumulators-broadcast-variables-and-checkpoints" 
id="markdown-toc-accumulators-broadcast-variables-and-checkpoints">Accumulators,
 Broadcast Variables, and Checkpoints</a></li>
-      <li><a href="#deploying-applications" 
id="markdown-toc-deploying-applications">Deploying Applications</a></li>
-      <li><a href="#monitoring-applications" 
id="markdown-toc-monitoring-applications">Monitoring Applications</a></li>
+  <li><a href="#overview">Overview</a></li>
+  <li><a href="#a-quick-example">A Quick Example</a></li>
+  <li><a href="#basic-concepts">Basic Concepts</a>    <ul>
+      <li><a href="#linking">Linking</a></li>
+      <li><a href="#initializing-streamingcontext">Initializing 
StreamingContext</a></li>
+      <li><a href="#discretized-streams-dstreams">Discretized Streams 
(DStreams)</a></li>
+      <li><a href="#input-dstreams-and-receivers">Input DStreams and 
Receivers</a></li>
+      <li><a href="#transformations-on-dstreams">Transformations on 
DStreams</a></li>
+      <li><a href="#output-operations-on-dstreams">Output Operations on 
DStreams</a></li>
+      <li><a href="#dataframe-and-sql-operations">DataFrame and SQL 
Operations</a></li>
+      <li><a href="#mllib-operations">MLlib Operations</a></li>
+      <li><a href="#caching--persistence">Caching / Persistence</a></li>
+      <li><a href="#checkpointing">Checkpointing</a></li>
+      <li><a 
href="#accumulators-broadcast-variables-and-checkpoints">Accumulators, 
Broadcast Variables, and Checkpoints</a></li>
+      <li><a href="#deploying-applications">Deploying Applications</a></li>
+      <li><a href="#monitoring-applications">Monitoring Applications</a></li>
     </ul>
   </li>
-  <li><a href="#performance-tuning" 
id="markdown-toc-performance-tuning">Performance Tuning</a>    <ul>
-      <li><a href="#reducing-the-batch-processing-times" 
id="markdown-toc-reducing-the-batch-processing-times">Reducing the Batch 
Processing Times</a></li>
-      <li><a href="#setting-the-right-batch-interval" 
id="markdown-toc-setting-the-right-batch-interval">Setting the Right Batch 
Interval</a></li>
-      <li><a href="#memory-tuning" id="markdown-toc-memory-tuning">Memory 
Tuning</a></li>
+  <li><a href="#performance-tuning">Performance Tuning</a>    <ul>
+      <li><a href="#reducing-the-batch-processing-times">Reducing the Batch 
Processing Times</a></li>
+      <li><a href="#setting-the-right-batch-interval">Setting the Right Batch 
Interval</a></li>
+      <li><a href="#memory-tuning">Memory Tuning</a></li>
     </ul>
   </li>
-  <li><a href="#fault-tolerance-semantics" 
id="markdown-toc-fault-tolerance-semantics">Fault-tolerance Semantics</a></li>
-  <li><a href="#where-to-go-from-here" 
id="markdown-toc-where-to-go-from-here">Where to Go from Here</a></li>
+  <li><a href="#fault-tolerance-semantics">Fault-tolerance Semantics</a></li>
+  <li><a href="#where-to-go-from-here">Where to Go from Here</a></li>
 </ul>
 
 <h1 id="overview">Overview</h1>
@@ -209,7 +209,7 @@ conversions from StreamingContext into our environment in 
order to add useful me
 other classes we need (like DStream). <a 
href="api/scala/index.html#org.apache.spark.streaming.StreamingContext">StreamingContext</a>
 is the
 main entry point for all streaming functionality. We create a local 
StreamingContext with two execution threads,  and a batch interval of 1 
second.</p>
 
-    <div class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span class="k">import</span> <span 
class="nn">org.apache.spark._</span>
+    <figure class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span></span><span class="k">import</span> <span 
class="nn">org.apache.spark._</span>
 <span class="k">import</span> <span 
class="nn">org.apache.spark.streaming._</span>
 <span class="k">import</span> <span 
class="nn">org.apache.spark.streaming.StreamingContext._</span> <span 
class="c1">// not necessary since Spark 1.3</span>
 
@@ -217,33 +217,33 @@ main entry point for all streaming functionality. We 
create a local StreamingCon
 <span class="c1">// The master requires 2 cores to prevent from a starvation 
scenario.</span>
 
 <span class="k">val</span> <span class="n">conf</span> <span 
class="k">=</span> <span class="k">new</span> <span 
class="nc">SparkConf</span><span class="o">().</span><span 
class="n">setMaster</span><span class="o">(</span><span 
class="s">&quot;local[2]&quot;</span><span class="o">).</span><span 
class="n">setAppName</span><span class="o">(</span><span 
class="s">&quot;NetworkWordCount&quot;</span><span class="o">)</span>
-<span class="k">val</span> <span class="n">ssc</span> <span class="k">=</span> 
<span class="k">new</span> <span class="nc">StreamingContext</span><span 
class="o">(</span><span class="n">conf</span><span class="o">,</span> <span 
class="nc">Seconds</span><span class="o">(</span><span class="mi">1</span><span 
class="o">))</span></code></pre></div>
+<span class="k">val</span> <span class="n">ssc</span> <span class="k">=</span> 
<span class="k">new</span> <span class="nc">StreamingContext</span><span 
class="o">(</span><span class="n">conf</span><span class="o">,</span> <span 
class="nc">Seconds</span><span class="o">(</span><span class="mi">1</span><span 
class="o">))</span></code></pre></figure>
 
     <p>Using this context, we can create a DStream that represents streaming 
data from a TCP
 source, specified as hostname (e.g. <code>localhost</code>) and port (e.g. 
<code>9999</code>).</p>
 
-    <div class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span class="c1">// Create a DStream that will connect to 
hostname:port, like localhost:9999</span>
-<span class="k">val</span> <span class="n">lines</span> <span 
class="k">=</span> <span class="n">ssc</span><span class="o">.</span><span 
class="n">socketTextStream</span><span class="o">(</span><span 
class="s">&quot;localhost&quot;</span><span class="o">,</span> <span 
class="mi">9999</span><span class="o">)</span></code></pre></div>
+    <figure class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span></span><span class="c1">// Create a DStream that will 
connect to hostname:port, like localhost:9999</span>
+<span class="k">val</span> <span class="n">lines</span> <span 
class="k">=</span> <span class="n">ssc</span><span class="o">.</span><span 
class="n">socketTextStream</span><span class="o">(</span><span 
class="s">&quot;localhost&quot;</span><span class="o">,</span> <span 
class="mi">9999</span><span class="o">)</span></code></pre></figure>
 
     <p>This <code>lines</code> DStream represents the stream of data that will 
be received from the data
 server. Each record in this DStream is a line of text. Next, we want to split 
the lines by
 space characters into words.</p>
 
-    <div class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span class="c1">// Split each line into words</span>
-<span class="k">val</span> <span class="n">words</span> <span 
class="k">=</span> <span class="n">lines</span><span class="o">.</span><span 
class="n">flatMap</span><span class="o">(</span><span class="k">_</span><span 
class="o">.</span><span class="n">split</span><span class="o">(</span><span 
class="s">&quot; &quot;</span><span class="o">))</span></code></pre></div>
+    <figure class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span></span><span class="c1">// Split each line into 
words</span>
+<span class="k">val</span> <span class="n">words</span> <span 
class="k">=</span> <span class="n">lines</span><span class="o">.</span><span 
class="n">flatMap</span><span class="o">(</span><span class="k">_</span><span 
class="o">.</span><span class="n">split</span><span class="o">(</span><span 
class="s">&quot; &quot;</span><span class="o">))</span></code></pre></figure>
 
     <p><code>flatMap</code> is a one-to-many DStream operation that creates a 
new DStream by
 generating multiple new records from each record in the source DStream. In 
this case,
 each line will be split into multiple words and the stream of words is 
represented as the
 <code>words</code> DStream.  Next, we want to count these words.</p>
 
-    <div class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span class="k">import</span> <span 
class="nn">org.apache.spark.streaming.StreamingContext._</span> <span 
class="c1">// not necessary since Spark 1.3</span>
+    <figure class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span></span><span class="k">import</span> <span 
class="nn">org.apache.spark.streaming.StreamingContext._</span> <span 
class="c1">// not necessary since Spark 1.3</span>
 <span class="c1">// Count each word in each batch</span>
 <span class="k">val</span> <span class="n">pairs</span> <span 
class="k">=</span> <span class="n">words</span><span class="o">.</span><span 
class="n">map</span><span class="o">(</span><span class="n">word</span> <span 
class="k">=&gt;</span> <span class="o">(</span><span class="n">word</span><span 
class="o">,</span> <span class="mi">1</span><span class="o">))</span>
 <span class="k">val</span> <span class="n">wordCounts</span> <span 
class="k">=</span> <span class="n">pairs</span><span class="o">.</span><span 
class="n">reduceByKey</span><span class="o">(</span><span class="k">_</span> 
<span class="o">+</span> <span class="k">_</span><span class="o">)</span>
 
 <span class="c1">// Print the first ten elements of each RDD generated in this 
DStream to the console</span>
-<span class="n">wordCounts</span><span class="o">.</span><span 
class="n">print</span><span class="o">()</span></code></pre></div>
+<span class="n">wordCounts</span><span class="o">.</span><span 
class="n">print</span><span class="o">()</span></code></pre></figure>
 
     <p>The <code>words</code> DStream is further mapped (one-to-one 
transformation) to a DStream of <code>(word,
 1)</code> pairs, which is then reduced to get the frequency of words in each 
batch of data.
@@ -253,8 +253,8 @@ Finally, <code>wordCounts.print()</code> will print a few 
of the counts generate
 will perform when it is started, and no real processing has started yet. To 
start the processing
 after all the transformations have been setup, we finally call</p>
 
-    <div class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span class="n">ssc</span><span class="o">.</span><span 
class="n">start</span><span class="o">()</span>             <span class="c1">// 
Start the computation</span>
-<span class="n">ssc</span><span class="o">.</span><span 
class="n">awaitTermination</span><span class="o">()</span>  <span class="c1">// 
Wait for the computation to terminate</span></code></pre></div>
+    <figure class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span></span><span class="n">ssc</span><span 
class="o">.</span><span class="n">start</span><span class="o">()</span>         
    <span class="c1">// Start the computation</span>
+<span class="n">ssc</span><span class="o">.</span><span 
class="n">awaitTermination</span><span class="o">()</span>  <span class="c1">// 
Wait for the computation to terminate</span></code></pre></figure>
 
     <p>The complete code can be found in the Spark Streaming example
 <a 
href="https://github.com/apache/spark/blob/v2.1.0/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala">NetworkWordCount</a>.
@@ -268,33 +268,33 @@ after all the transformations have been setup, we finally 
call</p>
 which is the main entry point for all streaming
 functionality. We create a local StreamingContext with two execution threads, 
and a batch interval of 1 second.</p>
 
-    <div class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="kn">import</span> <span 
class="nn">org.apache.spark.*</span><span class="o">;</span>
+    <figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="kn">import</span> <span 
class="nn">org.apache.spark.*</span><span class="o">;</span>
 <span class="kn">import</span> <span 
class="nn">org.apache.spark.api.java.function.*</span><span class="o">;</span>
 <span class="kn">import</span> <span 
class="nn">org.apache.spark.streaming.*</span><span class="o">;</span>
 <span class="kn">import</span> <span 
class="nn">org.apache.spark.streaming.api.java.*</span><span class="o">;</span>
 <span class="kn">import</span> <span class="nn">scala.Tuple2</span><span 
class="o">;</span>
 
 <span class="c1">// Create a local StreamingContext with two working thread 
and batch interval of 1 second</span>
-<span class="n">SparkConf</span> <span class="n">conf</span> <span 
class="o">=</span> <span class="k">new</span> <span 
class="nf">SparkConf</span><span class="o">().</span><span 
class="na">setMaster</span><span class="o">(</span><span 
class="s">&quot;local[2]&quot;</span><span class="o">).</span><span 
class="na">setAppName</span><span class="o">(</span><span 
class="s">&quot;NetworkWordCount&quot;</span><span class="o">);</span>
-<span class="n">JavaStreamingContext</span> <span class="n">jssc</span> <span 
class="o">=</span> <span class="k">new</span> <span 
class="nf">JavaStreamingContext</span><span class="o">(</span><span 
class="n">conf</span><span class="o">,</span> <span 
class="n">Durations</span><span class="o">.</span><span 
class="na">seconds</span><span class="o">(</span><span class="mi">1</span><span 
class="o">));</span></code></pre></div>
+<span class="n">SparkConf</span> <span class="n">conf</span> <span 
class="o">=</span> <span class="k">new</span> <span 
class="n">SparkConf</span><span class="o">().</span><span 
class="na">setMaster</span><span class="o">(</span><span 
class="s">&quot;local[2]&quot;</span><span class="o">).</span><span 
class="na">setAppName</span><span class="o">(</span><span 
class="s">&quot;NetworkWordCount&quot;</span><span class="o">);</span>
+<span class="n">JavaStreamingContext</span> <span class="n">jssc</span> <span 
class="o">=</span> <span class="k">new</span> <span 
class="n">JavaStreamingContext</span><span class="o">(</span><span 
class="n">conf</span><span class="o">,</span> <span 
class="n">Durations</span><span class="o">.</span><span 
class="na">seconds</span><span class="o">(</span><span class="mi">1</span><span 
class="o">));</span></code></pre></figure>
 
     <p>Using this context, we can create a DStream that represents streaming 
data from a TCP
 source, specified as hostname (e.g. <code>localhost</code>) and port (e.g. 
<code>9999</code>).</p>
 
-    <div class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="c1">// Create a DStream that will connect to 
hostname:port, like localhost:9999</span>
-<span class="n">JavaReceiverInputDStream</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> 
<span class="n">lines</span> <span class="o">=</span> <span 
class="n">jssc</span><span class="o">.</span><span 
class="na">socketTextStream</span><span class="o">(</span><span 
class="s">&quot;localhost&quot;</span><span class="o">,</span> <span 
class="mi">9999</span><span class="o">);</span></code></pre></div>
+    <figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="c1">// Create a DStream that will 
connect to hostname:port, like localhost:9999</span>
+<span class="n">JavaReceiverInputDStream</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> 
<span class="n">lines</span> <span class="o">=</span> <span 
class="n">jssc</span><span class="o">.</span><span 
class="na">socketTextStream</span><span class="o">(</span><span 
class="s">&quot;localhost&quot;</span><span class="o">,</span> <span 
class="mi">9999</span><span class="o">);</span></code></pre></figure>
 
     <p>This <code>lines</code> DStream represents the stream of data that will 
be received from the data
 server. Each record in this stream is a line of text. Then, we want to split 
the lines by
 space into words.</p>
 
-    <div class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="c1">// Split each line into words</span>
+    <figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="c1">// Split each line into 
words</span>
 <span class="n">JavaDStream</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">&gt;</span> <span class="n">words</span> 
<span class="o">=</span> <span class="n">lines</span><span 
class="o">.</span><span class="na">flatMap</span><span class="o">(</span>
   <span class="k">new</span> <span class="n">FlatMapFunction</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">,</span> 
<span class="n">String</span><span class="o">&gt;()</span> <span 
class="o">{</span>
     <span class="nd">@Override</span> <span class="kd">public</span> <span 
class="n">Iterator</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">&gt;</span> <span 
class="nf">call</span><span class="o">(</span><span class="n">String</span> 
<span class="n">x</span><span class="o">)</span> <span class="o">{</span>
       <span class="k">return</span> <span class="n">Arrays</span><span 
class="o">.</span><span class="na">asList</span><span class="o">(</span><span 
class="n">x</span><span class="o">.</span><span class="na">split</span><span 
class="o">(</span><span class="s">&quot; &quot;</span><span 
class="o">)).</span><span class="na">iterator</span><span class="o">();</span>
     <span class="o">}</span>
-  <span class="o">});</span></code></pre></div>
+  <span class="o">});</span></code></pre></figure>
 
     <p><code>flatMap</code> is a DStream operation that creates a new DStream 
by
 generating multiple new records from each record in the source DStream. In 
this case,
@@ -306,7 +306,7 @@ that help define DStream transformations.</p>
 
     <p>Next, we want to count these words.</p>
 
-    <div class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="c1">// Count each word in each batch</span>
+    <figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="c1">// Count each word in each 
batch</span>
 <span class="n">JavaPairDStream</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Integer</span><span class="o">&gt;</span> <span 
class="n">pairs</span> <span class="o">=</span> <span 
class="n">words</span><span class="o">.</span><span 
class="na">mapToPair</span><span class="o">(</span>
   <span class="k">new</span> <span class="n">PairFunction</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">,</span> 
<span class="n">String</span><span class="o">,</span> <span 
class="n">Integer</span><span class="o">&gt;()</span> <span class="o">{</span>
     <span class="nd">@Override</span> <span class="kd">public</span> <span 
class="n">Tuple2</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Integer</span><span class="o">&gt;</span> <span 
class="nf">call</span><span class="o">(</span><span class="n">String</span> 
<span class="n">s</span><span class="o">)</span> <span class="o">{</span>
@@ -321,7 +321,7 @@ that help define DStream transformations.</p>
   <span class="o">});</span>
 
 <span class="c1">// Print the first ten elements of each RDD generated in this 
DStream to the console</span>
-<span class="n">wordCounts</span><span class="o">.</span><span 
class="na">print</span><span class="o">();</span></code></pre></div>
+<span class="n">wordCounts</span><span class="o">.</span><span 
class="na">print</span><span class="o">();</span></code></pre></figure>
 
     <p>The <code>words</code> DStream is further mapped (one-to-one 
transformation) to a DStream of <code>(word,
 1)</code> pairs, using a <a 
href="api/scala/index.html#org.apache.spark.api.java.function.PairFunction">PairFunction</a>
@@ -333,8 +333,8 @@ Finally, <code>wordCounts.print()</code> will print a few 
of the counts generate
 will perform after it is started, and no real processing has started yet. To 
start the processing
 after all the transformations have been setup, we finally call 
<code>start</code> method.</p>
 
-    <div class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="n">jssc</span><span class="o">.</span><span 
class="na">start</span><span class="o">();</span>              <span 
class="c1">// Start the computation</span>
-<span class="n">jssc</span><span class="o">.</span><span 
class="na">awaitTermination</span><span class="o">();</span>   <span 
class="c1">// Wait for the computation to terminate</span></code></pre></div>
+    <figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="n">jssc</span><span 
class="o">.</span><span class="na">start</span><span class="o">();</span>       
       <span class="c1">// Start the computation</span>
+<span class="n">jssc</span><span class="o">.</span><span 
class="na">awaitTermination</span><span class="o">();</span>   <span 
class="c1">// Wait for the computation to terminate</span></code></pre></figure>
 
     <p>The complete code can be found in the Spark Streaming example
 <a 
href="https://github.com/apache/spark/blob/v2.1.0/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java">JavaNetworkWordCount</a>.
@@ -344,37 +344,37 @@ after all the transformations have been setup, we finally 
call <code>start</code
 <div data-lang="python">
     <p>First, we import <a 
href="api/python/pyspark.streaming.html#pyspark.streaming.StreamingContext">StreamingContext</a>,
 which is the main entry point for all streaming functionality. We create a 
local StreamingContext with two execution threads, and batch interval of 1 
second.</p>
 
-    <div class="highlight"><pre><code class="language-python" 
data-lang="python"><span class="kn">from</span> <span class="nn">pyspark</span> 
<span class="kn">import</span> <span class="n">SparkContext</span>
+    <figure class="highlight"><pre><code class="language-python" 
data-lang="python"><span></span><span class="kn">from</span> <span 
class="nn">pyspark</span> <span class="kn">import</span> <span 
class="n">SparkContext</span>
 <span class="kn">from</span> <span class="nn">pyspark.streaming</span> <span 
class="kn">import</span> <span class="n">StreamingContext</span>
 
-<span class="c"># Create a local StreamingContext with two working thread and 
batch interval of 1 second</span>
-<span class="n">sc</span> <span class="o">=</span> <span 
class="n">SparkContext</span><span class="p">(</span><span 
class="s">&quot;local[2]&quot;</span><span class="p">,</span> <span 
class="s">&quot;NetworkWordCount&quot;</span><span class="p">)</span>
-<span class="n">ssc</span> <span class="o">=</span> <span 
class="n">StreamingContext</span><span class="p">(</span><span 
class="n">sc</span><span class="p">,</span> <span class="mi">1</span><span 
class="p">)</span></code></pre></div>
+<span class="c1"># Create a local StreamingContext with two working thread and 
batch interval of 1 second</span>
+<span class="n">sc</span> <span class="o">=</span> <span 
class="n">SparkContext</span><span class="p">(</span><span 
class="s2">&quot;local[2]&quot;</span><span class="p">,</span> <span 
class="s2">&quot;NetworkWordCount&quot;</span><span class="p">)</span>
+<span class="n">ssc</span> <span class="o">=</span> <span 
class="n">StreamingContext</span><span class="p">(</span><span 
class="n">sc</span><span class="p">,</span> <span class="mi">1</span><span 
class="p">)</span></code></pre></figure>
 
     <p>Using this context, we can create a DStream that represents streaming 
data from a TCP
 source, specified as hostname (e.g. <code>localhost</code>) and port (e.g. 
<code>9999</code>).</p>
 
-    <div class="highlight"><pre><code class="language-python" 
data-lang="python"><span class="c"># Create a DStream that will connect to 
hostname:port, like localhost:9999</span>
-<span class="n">lines</span> <span class="o">=</span> <span 
class="n">ssc</span><span class="o">.</span><span 
class="n">socketTextStream</span><span class="p">(</span><span 
class="s">&quot;localhost&quot;</span><span class="p">,</span> <span 
class="mi">9999</span><span class="p">)</span></code></pre></div>
+    <figure class="highlight"><pre><code class="language-python" 
data-lang="python"><span></span><span class="c1"># Create a DStream that will 
connect to hostname:port, like localhost:9999</span>
+<span class="n">lines</span> <span class="o">=</span> <span 
class="n">ssc</span><span class="o">.</span><span 
class="n">socketTextStream</span><span class="p">(</span><span 
class="s2">&quot;localhost&quot;</span><span class="p">,</span> <span 
class="mi">9999</span><span class="p">)</span></code></pre></figure>
 
     <p>This <code>lines</code> DStream represents the stream of data that will 
be received from the data
 server. Each record in this DStream is a line of text. Next, we want to split 
the lines by
 space into words.</p>
 
-    <div class="highlight"><pre><code class="language-python" 
data-lang="python"><span class="c"># Split each line into words</span>
-<span class="n">words</span> <span class="o">=</span> <span 
class="n">lines</span><span class="o">.</span><span 
class="n">flatMap</span><span class="p">(</span><span class="k">lambda</span> 
<span class="n">line</span><span class="p">:</span> <span 
class="n">line</span><span class="o">.</span><span class="n">split</span><span 
class="p">(</span><span class="s">&quot; &quot;</span><span 
class="p">))</span></code></pre></div>
+    <figure class="highlight"><pre><code class="language-python" 
data-lang="python"><span></span><span class="c1"># Split each line into 
words</span>
+<span class="n">words</span> <span class="o">=</span> <span 
class="n">lines</span><span class="o">.</span><span 
class="n">flatMap</span><span class="p">(</span><span class="k">lambda</span> 
<span class="n">line</span><span class="p">:</span> <span 
class="n">line</span><span class="o">.</span><span class="n">split</span><span 
class="p">(</span><span class="s2">&quot; &quot;</span><span 
class="p">))</span></code></pre></figure>
 
     <p><code>flatMap</code> is a one-to-many DStream operation that creates a 
new DStream by
 generating multiple new records from each record in the source DStream. In 
this case,
 each line will be split into multiple words and the stream of words is 
represented as the
 <code>words</code> DStream.  Next, we want to count these words.</p>
 
-    <div class="highlight"><pre><code class="language-python" 
data-lang="python"><span class="c"># Count each word in each batch</span>
+    <figure class="highlight"><pre><code class="language-python" 
data-lang="python"><span></span><span class="c1"># Count each word in each 
batch</span>
 <span class="n">pairs</span> <span class="o">=</span> <span 
class="n">words</span><span class="o">.</span><span class="n">map</span><span 
class="p">(</span><span class="k">lambda</span> <span 
class="n">word</span><span class="p">:</span> <span class="p">(</span><span 
class="n">word</span><span class="p">,</span> <span class="mi">1</span><span 
class="p">))</span>
 <span class="n">wordCounts</span> <span class="o">=</span> <span 
class="n">pairs</span><span class="o">.</span><span 
class="n">reduceByKey</span><span class="p">(</span><span 
class="k">lambda</span> <span class="n">x</span><span class="p">,</span> <span 
class="n">y</span><span class="p">:</span> <span class="n">x</span> <span 
class="o">+</span> <span class="n">y</span><span class="p">)</span>
 
-<span class="c"># Print the first ten elements of each RDD generated in this 
DStream to the console</span>
-<span class="n">wordCounts</span><span class="o">.</span><span 
class="n">pprint</span><span class="p">()</span></code></pre></div>
+<span class="c1"># Print the first ten elements of each RDD generated in this 
DStream to the console</span>
+<span class="n">wordCounts</span><span class="o">.</span><span 
class="n">pprint</span><span class="p">()</span></code></pre></figure>
 
     <p>The <code>words</code> DStream is further mapped (one-to-one 
transformation) to a DStream of <code>(word,
 1)</code> pairs, which is then reduced to get the frequency of words in each 
batch of data.
@@ -384,8 +384,8 @@ Finally, <code>wordCounts.pprint()</code> will print a few 
of the counts generat
 will perform when it is started, and no real processing has started yet. To 
start the processing
 after all the transformations have been setup, we finally call</p>
 
-    <div class="highlight"><pre><code class="language-python" 
data-lang="python"><span class="n">ssc</span><span class="o">.</span><span 
class="n">start</span><span class="p">()</span>             <span class="c"># 
Start the computation</span>
-<span class="n">ssc</span><span class="o">.</span><span 
class="n">awaitTermination</span><span class="p">()</span>  <span class="c"># 
Wait for the computation to terminate</span></code></pre></div>
+    <figure class="highlight"><pre><code class="language-python" 
data-lang="python"><span></span><span class="n">ssc</span><span 
class="o">.</span><span class="n">start</span><span class="p">()</span>         
    <span class="c1"># Start the computation</span>
+<span class="n">ssc</span><span class="o">.</span><span 
class="n">awaitTermination</span><span class="p">()</span>  <span class="c1"># 
Wait for the computation to terminate</span></code></pre></figure>
 
     <p>The complete code can be found in the Spark Streaming example
 <a 
href="https://github.com/apache/spark/blob/v2.1.0/examples/src/main/python/streaming/network_wordcount.py">NetworkWordCount</a>.
@@ -398,24 +398,24 @@ after all the transformations have been setup, we finally 
call</p>
 you can run this example as follows. You will first need to run Netcat
 (a small utility found in most Unix-like systems) as a data server by using</p>
 
-<div class="highlight"><pre><code class="language-bash" data-lang="bash"><span 
class="nv">$ </span>nc -lk 9999</code></pre></div>
+<figure class="highlight"><pre><code class="language-bash" 
data-lang="bash"><span></span>$ nc -lk <span 
class="m">9999</span></code></pre></figure>
 
 <p>Then, in a different terminal, you can start the example by using</p>
 
 <div class="codetabs">
 <div data-lang="scala">
 
-    <div class="highlight"><pre><code class="language-bash" 
data-lang="bash"><span class="nv">$ </span>./bin/run-example 
streaming.NetworkWordCount localhost 9999</code></pre></div>
+    <figure class="highlight"><pre><code class="language-bash" 
data-lang="bash"><span></span>$ ./bin/run-example streaming.NetworkWordCount 
localhost <span class="m">9999</span></code></pre></figure>
 
   </div>
 <div data-lang="java">
 
-    <div class="highlight"><pre><code class="language-bash" 
data-lang="bash"><span class="nv">$ </span>./bin/run-example 
streaming.JavaNetworkWordCount localhost 9999</code></pre></div>
+    <figure class="highlight"><pre><code class="language-bash" 
data-lang="bash"><span></span>$ ./bin/run-example 
streaming.JavaNetworkWordCount localhost <span 
class="m">9999</span></code></pre></figure>
 
   </div>
 <div data-lang="python">
 
-    <div class="highlight"><pre><code class="language-bash" 
data-lang="bash"><span class="nv">$ </span>./bin/spark-submit 
examples/src/main/python/streaming/network_wordcount.py localhost 
9999</code></pre></div>
+    <figure class="highlight"><pre><code class="language-bash" 
data-lang="bash"><span></span>$ ./bin/spark-submit 
examples/src/main/python/streaming/network_wordcount.py localhost <span 
class="m">9999</span></code></pre></figure>
 
   </div>
 </div>
@@ -426,16 +426,16 @@ screen every second. It will look something like the 
following.</p>
 <table width="100%">
     <td>
 
-<div class="highlight"><pre><code class="language-bash" data-lang="bash"><span 
class="c"># TERMINAL 1:</span>
-<span class="c"># Running Netcat</span>
+<figure class="highlight"><pre><code class="language-bash" 
data-lang="bash"><span></span><span class="c1"># TERMINAL 1:</span>
+<span class="c1"># Running Netcat</span>
 
-<span class="nv">$ </span>nc -lk 9999
+$ nc -lk <span class="m">9999</span>
 
 hello world
 
 
 
-...</code></pre></div>
+...</code></pre></figure>
 
     </td>
     <td width="2%"></td>
@@ -444,45 +444,45 @@ hello world
 
 <div data-lang="scala">
 
-        <div class="highlight"><pre><code class="language-bash" 
data-lang="bash"><span class="c"># TERMINAL 2: RUNNING NetworkWordCount</span>
+        <figure class="highlight"><pre><code class="language-bash" 
data-lang="bash"><span></span><span class="c1"># TERMINAL 2: RUNNING 
NetworkWordCount</span>
 
-<span class="nv">$ </span>./bin/run-example streaming.NetworkWordCount 
localhost 9999
+$ ./bin/run-example streaming.NetworkWordCount localhost <span 
class="m">9999</span>
 ...
 -------------------------------------------
 Time: <span class="m">1357008430000</span> ms
 -------------------------------------------
 <span class="o">(</span>hello,1<span class="o">)</span>
 <span class="o">(</span>world,1<span class="o">)</span>
-...</code></pre></div>
+...</code></pre></figure>
 
       </div>
 
 <div data-lang="java">
 
-        <div class="highlight"><pre><code class="language-bash" 
data-lang="bash"><span class="c"># TERMINAL 2: RUNNING 
JavaNetworkWordCount</span>
+        <figure class="highlight"><pre><code class="language-bash" 
data-lang="bash"><span></span><span class="c1"># TERMINAL 2: RUNNING 
JavaNetworkWordCount</span>
 
-<span class="nv">$ </span>./bin/run-example streaming.JavaNetworkWordCount 
localhost 9999
+$ ./bin/run-example streaming.JavaNetworkWordCount localhost <span 
class="m">9999</span>
 ...
 -------------------------------------------
 Time: <span class="m">1357008430000</span> ms
 -------------------------------------------
 <span class="o">(</span>hello,1<span class="o">)</span>
 <span class="o">(</span>world,1<span class="o">)</span>
-...</code></pre></div>
+...</code></pre></figure>
 
       </div>
 <div data-lang="python">
 
-        <div class="highlight"><pre><code class="language-bash" 
data-lang="bash"><span class="c"># TERMINAL 2: RUNNING 
network_wordcount.py</span>
+        <figure class="highlight"><pre><code class="language-bash" 
data-lang="bash"><span></span><span class="c1"># TERMINAL 2: RUNNING 
network_wordcount.py</span>
 
-<span class="nv">$ </span>./bin/spark-submit 
examples/src/main/python/streaming/network_wordcount.py localhost 9999
+$ ./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py 
localhost <span class="m">9999</span>
 ...
 -------------------------------------------
-Time: 2014-10-14 15:25:21
+Time: <span class="m">2014</span>-10-14 <span class="m">15</span>:25:21
 -------------------------------------------
 <span class="o">(</span>hello,1<span class="o">)</span>
 <span class="o">(</span>world,1<span class="o">)</span>
-...</code></pre></div>
+...</code></pre></figure>
 
       </div>
 </div>
@@ -546,11 +546,11 @@ for the full list of supported sources and artifacts.</p>
 
     <p>A <a 
href="api/scala/index.html#org.apache.spark.streaming.StreamingContext">StreamingContext</a>
 object can be created from a <a 
href="api/scala/index.html#org.apache.spark.SparkConf">SparkConf</a> object.</p>
 
-    <div class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span class="k">import</span> <span 
class="nn">org.apache.spark._</span>
+    <figure class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span></span><span class="k">import</span> <span 
class="nn">org.apache.spark._</span>
 <span class="k">import</span> <span 
class="nn">org.apache.spark.streaming._</span>
 
 <span class="k">val</span> <span class="n">conf</span> <span 
class="k">=</span> <span class="k">new</span> <span 
class="nc">SparkConf</span><span class="o">().</span><span 
class="n">setAppName</span><span class="o">(</span><span 
class="n">appName</span><span class="o">).</span><span 
class="n">setMaster</span><span class="o">(</span><span 
class="n">master</span><span class="o">)</span>
-<span class="k">val</span> <span class="n">ssc</span> <span class="k">=</span> 
<span class="k">new</span> <span class="nc">StreamingContext</span><span 
class="o">(</span><span class="n">conf</span><span class="o">,</span> <span 
class="nc">Seconds</span><span class="o">(</span><span class="mi">1</span><span 
class="o">))</span></code></pre></div>
+<span class="k">val</span> <span class="n">ssc</span> <span class="k">=</span> 
<span class="k">new</span> <span class="nc">StreamingContext</span><span 
class="o">(</span><span class="n">conf</span><span class="o">,</span> <span 
class="nc">Seconds</span><span class="o">(</span><span class="mi">1</span><span 
class="o">))</span></code></pre></figure>
 
     <p>The <code>appName</code> parameter is a name for your application to 
show on the cluster UI.
 <code>master</code> is a <a 
href="submitting-applications.html#master-urls">Spark, Mesos or YARN cluster 
URL</a>,
@@ -566,21 +566,21 @@ section for more details.</p>
 
     <p>A <code>StreamingContext</code> object can also be created from an 
existing <code>SparkContext</code> object.</p>
 
-    <div class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span class="k">import</span> <span 
class="nn">org.apache.spark.streaming._</span>
+    <figure class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span></span><span class="k">import</span> <span 
class="nn">org.apache.spark.streaming._</span>
 
 <span class="k">val</span> <span class="n">sc</span> <span class="k">=</span> 
<span class="o">...</span>                <span class="c1">// existing 
SparkContext</span>
-<span class="k">val</span> <span class="n">ssc</span> <span class="k">=</span> 
<span class="k">new</span> <span class="nc">StreamingContext</span><span 
class="o">(</span><span class="n">sc</span><span class="o">,</span> <span 
class="nc">Seconds</span><span class="o">(</span><span class="mi">1</span><span 
class="o">))</span></code></pre></div>
+<span class="k">val</span> <span class="n">ssc</span> <span class="k">=</span> 
<span class="k">new</span> <span class="nc">StreamingContext</span><span 
class="o">(</span><span class="n">sc</span><span class="o">,</span> <span 
class="nc">Seconds</span><span class="o">(</span><span class="mi">1</span><span 
class="o">))</span></code></pre></figure>
 
   </div>
 <div data-lang="java">
 
     <p>A <a 
href="api/java/index.html?org/apache/spark/streaming/api/java/JavaStreamingContext.html">JavaStreamingContext</a>
 object can be created from a <a 
href="api/java/index.html?org/apache/spark/SparkConf.html">SparkConf</a> 
object.</p>
 
-    <div class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="kn">import</span> <span 
class="nn">org.apache.spark.*</span><span class="o">;</span>
+    <figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="kn">import</span> <span 
class="nn">org.apache.spark.*</span><span class="o">;</span>
 <span class="kn">import</span> <span 
class="nn">org.apache.spark.streaming.api.java.*</span><span class="o">;</span>
 
-<span class="n">SparkConf</span> <span class="n">conf</span> <span 
class="o">=</span> <span class="k">new</span> <span 
class="nf">SparkConf</span><span class="o">().</span><span 
class="na">setAppName</span><span class="o">(</span><span 
class="n">appName</span><span class="o">).</span><span 
class="na">setMaster</span><span class="o">(</span><span 
class="n">master</span><span class="o">);</span>
-<span class="n">JavaStreamingContext</span> <span class="n">ssc</span> <span 
class="o">=</span> <span class="k">new</span> <span 
class="nf">JavaStreamingContext</span><span class="o">(</span><span 
class="n">conf</span><span class="o">,</span> <span class="k">new</span> <span 
class="nf">Duration</span><span class="o">(</span><span 
class="mi">1000</span><span class="o">));</span></code></pre></div>
+<span class="n">SparkConf</span> <span class="n">conf</span> <span 
class="o">=</span> <span class="k">new</span> <span 
class="n">SparkConf</span><span class="o">().</span><span 
class="na">setAppName</span><span class="o">(</span><span 
class="n">appName</span><span class="o">).</span><span 
class="na">setMaster</span><span class="o">(</span><span 
class="n">master</span><span class="o">);</span>
+<span class="n">JavaStreamingContext</span> <span class="n">ssc</span> <span 
class="o">=</span> <span class="k">new</span> <span 
class="n">JavaStreamingContext</span><span class="o">(</span><span 
class="n">conf</span><span class="o">,</span> <span class="k">new</span> <span 
class="n">Duration</span><span class="o">(</span><span 
class="mi">1000</span><span class="o">));</span></code></pre></figure>
 
     <p>The <code>appName</code> parameter is a name for your application to 
show on the cluster UI.
 <code>master</code> is a <a 
href="submitting-applications.html#master-urls">Spark, Mesos or YARN cluster 
URL</a>,
@@ -596,21 +596,21 @@ section for more details.</p>
 
     <p>A <code>JavaStreamingContext</code> object can also be created from an 
existing <code>JavaSparkContext</code>.</p>
 
-    <div class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="kn">import</span> <span 
class="nn">org.apache.spark.streaming.api.java.*</span><span class="o">;</span>
+    <figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="kn">import</span> <span 
class="nn">org.apache.spark.streaming.api.java.*</span><span class="o">;</span>
 
 <span class="n">JavaSparkContext</span> <span class="n">sc</span> <span 
class="o">=</span> <span class="o">...</span>   <span class="c1">//existing 
JavaSparkContext</span>
-<span class="n">JavaStreamingContext</span> <span class="n">ssc</span> <span 
class="o">=</span> <span class="k">new</span> <span 
class="nf">JavaStreamingContext</span><span class="o">(</span><span 
class="n">sc</span><span class="o">,</span> <span 
class="n">Durations</span><span class="o">.</span><span 
class="na">seconds</span><span class="o">(</span><span class="mi">1</span><span 
class="o">));</span></code></pre></div>
+<span class="n">JavaStreamingContext</span> <span class="n">ssc</span> <span 
class="o">=</span> <span class="k">new</span> <span 
class="n">JavaStreamingContext</span><span class="o">(</span><span 
class="n">sc</span><span class="o">,</span> <span 
class="n">Durations</span><span class="o">.</span><span 
class="na">seconds</span><span class="o">(</span><span class="mi">1</span><span 
class="o">));</span></code></pre></figure>
 
   </div>
 <div data-lang="python">
 
     <p>A <a 
href="api/python/pyspark.streaming.html#pyspark.streaming.StreamingContext">StreamingContext</a>
 object can be created from a <a 
href="api/python/pyspark.html#pyspark.SparkContext">SparkContext</a> object.</p>
 
-    <div class="highlight"><pre><code class="language-python" 
data-lang="python"><span class="kn">from</span> <span class="nn">pyspark</span> 
<span class="kn">import</span> <span class="n">SparkContext</span>
+    <figure class="highlight"><pre><code class="language-python" 
data-lang="python"><span></span><span class="kn">from</span> <span 
class="nn">pyspark</span> <span class="kn">import</span> <span 
class="n">SparkContext</span>
 <span class="kn">from</span> <span class="nn">pyspark.streaming</span> <span 
class="kn">import</span> <span class="n">StreamingContext</span>
 
 <span class="n">sc</span> <span class="o">=</span> <span 
class="n">SparkContext</span><span class="p">(</span><span 
class="n">master</span><span class="p">,</span> <span 
class="n">appName</span><span class="p">)</span>
-<span class="n">ssc</span> <span class="o">=</span> <span 
class="n">StreamingContext</span><span class="p">(</span><span 
class="n">sc</span><span class="p">,</span> <span class="mi">1</span><span 
class="p">)</span></code></pre></div>
+<span class="n">ssc</span> <span class="o">=</span> <span 
class="n">StreamingContext</span><span class="p">(</span><span 
class="n">sc</span><span class="p">,</span> <span class="mi">1</span><span 
class="p">)</span></code></pre></figure>
 
     <p>The <code>appName</code> parameter is a name for your application to 
show on the cluster UI.
 <code>master</code> is a <a 
href="submitting-applications.html#master-urls">Spark, Mesos or YARN cluster 
URL</a>,
@@ -931,15 +931,15 @@ define the update function as:</p>
 <div class="codetabs">
 <div data-lang="scala">
 
-    <div class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span class="k">def</span> <span 
class="n">updateFunction</span><span class="o">(</span><span 
class="n">newValues</span><span class="k">:</span> <span 
class="kt">Seq</span><span class="o">[</span><span class="kt">Int</span><span 
class="o">],</span> <span class="n">runningCount</span><span class="k">:</span> 
<span class="kt">Option</span><span class="o">[</span><span 
class="kt">Int</span><span class="o">])</span><span class="k">:</span> <span 
class="kt">Option</span><span class="o">[</span><span 
class="kt">Int</span><span class="o">]</span> <span class="k">=</span> <span 
class="o">{</span>
+    <figure class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span></span><span class="k">def</span> <span 
class="n">updateFunction</span><span class="o">(</span><span 
class="n">newValues</span><span class="k">:</span> <span 
class="kt">Seq</span><span class="o">[</span><span class="kt">Int</span><span 
class="o">],</span> <span class="n">runningCount</span><span class="k">:</span> 
<span class="kt">Option</span><span class="o">[</span><span 
class="kt">Int</span><span class="o">])</span><span class="k">:</span> <span 
class="kt">Option</span><span class="o">[</span><span 
class="kt">Int</span><span class="o">]</span> <span class="k">=</span> <span 
class="o">{</span>
     <span class="k">val</span> <span class="n">newCount</span> <span 
class="k">=</span> <span class="o">...</span>  <span class="c1">// add the new 
values with the previous running count to get the new count</span>
     <span class="nc">Some</span><span class="o">(</span><span 
class="n">newCount</span><span class="o">)</span>
-<span class="o">}</span></code></pre></div>
+<span class="o">}</span></code></pre></figure>
 
     <p>This is applied on a DStream containing words (say, the 
<code>pairs</code> DStream containing <code>(word,
 1)</code> pairs in the <a href="#a-quick-example">earlier example</a>).</p>
 
-    <div class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span class="k">val</span> <span 
class="n">runningCounts</span> <span class="k">=</span> <span 
class="n">pairs</span><span class="o">.</span><span 
class="n">updateStateByKey</span><span class="o">[</span><span 
class="kt">Int</span><span class="o">](</span><span 
class="n">updateFunction</span> <span class="k">_</span><span 
class="o">)</span></code></pre></div>
+    <figure class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span></span><span class="k">val</span> <span 
class="n">runningCounts</span> <span class="k">=</span> <span 
class="n">pairs</span><span class="o">.</span><span 
class="n">updateStateByKey</span><span class="o">[</span><span 
class="kt">Int</span><span class="o">](</span><span 
class="n">updateFunction</span> <span class="k">_</span><span 
class="o">)</span></code></pre></figure>
 
     <p>The update function will be called for each word, with 
<code>newValues</code> having a sequence of 1&#8217;s (from
 the <code>(word, 1)</code> pairs) and the <code>runningCount</code> having the 
previous count.</p>
@@ -947,18 +947,18 @@ the <code>(word, 1)</code> pairs) and the 
<code>runningCount</code> having the p
   </div>
 <div data-lang="java">
 
-    <div class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="n">Function2</span><span 
class="o">&lt;</span><span class="n">List</span><span 
class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;,</span> 
<span class="n">Optional</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">&gt;,</span> <span 
class="n">Optional</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">&gt;&gt;</span> <span 
class="n">updateFunction</span> <span class="o">=</span>
+    <figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="n">Function2</span><span 
class="o">&lt;</span><span class="n">List</span><span 
class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;,</span> 
<span class="n">Optional</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">&gt;,</span> <span 
class="n">Optional</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">&gt;&gt;</span> <span 
class="n">updateFunction</span> <span class="o">=</span>
   <span class="k">new</span> <span class="n">Function2</span><span 
class="o">&lt;</span><span class="n">List</span><span 
class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;,</span> 
<span class="n">Optional</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">&gt;,</span> <span 
class="n">Optional</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">&gt;&gt;()</span> <span 
class="o">{</span>
     <span class="nd">@Override</span> <span class="kd">public</span> <span 
class="n">Optional</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">&gt;</span> <span 
class="nf">call</span><span class="o">(</span><span class="n">List</span><span 
class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> 
<span class="n">values</span><span class="o">,</span> <span 
class="n">Optional</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">&gt;</span> <span 
class="n">state</span><span class="o">)</span> <span class="o">{</span>
       <span class="n">Integer</span> <span class="n">newSum</span> <span 
class="o">=</span> <span class="o">...</span>  <span class="c1">// add the new 
values with the previous running count to get the new count</span>
       <span class="k">return</span> <span class="n">Optional</span><span 
class="o">.</span><span class="na">of</span><span class="o">(</span><span 
class="n">newSum</span><span class="o">);</span>
     <span class="o">}</span>
-  <span class="o">};</span></code></pre></div>
+  <span class="o">};</span></code></pre></figure>
 
     <p>This is applied on a DStream containing words (say, the 
<code>pairs</code> DStream containing <code>(word,
 1)</code> pairs in the <a href="#a-quick-example">quick example</a>).</p>
 
-    <div class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="n">JavaPairDStream</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">,</span> 
<span class="n">Integer</span><span class="o">&gt;</span> <span 
class="n">runningCounts</span> <span class="o">=</span> <span 
class="n">pairs</span><span class="o">.</span><span 
class="na">updateStateByKey</span><span class="o">(</span><span 
class="n">updateFunction</span><span class="o">);</span></code></pre></div>
+    <figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="n">JavaPairDStream</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">,</span> 
<span class="n">Integer</span><span class="o">&gt;</span> <span 
class="n">runningCounts</span> <span class="o">=</span> <span 
class="n">pairs</span><span class="o">.</span><span 
class="na">updateStateByKey</span><span class="o">(</span><span 
class="n">updateFunction</span><span class="o">);</span></code></pre></figure>
 
     <p>The update function will be called for each word, with 
<code>newValues</code> having a sequence of 1&#8217;s (from
 the <code>(word, 1)</code> pairs) and the <code>runningCount</code> having the 
previous count. For the complete
@@ -969,15 +969,15 @@ Java code, take a look at the example
   </div>
 <div data-lang="python">
 
-    <div class="highlight"><pre><code class="language-python" 
data-lang="python"><span class="k">def</span> <span 
class="nf">updateFunction</span><span class="p">(</span><span 
class="n">newValues</span><span class="p">,</span> <span 
class="n">runningCount</span><span class="p">):</span>
+    <figure class="highlight"><pre><code class="language-python" 
data-lang="python"><span></span><span class="k">def</span> <span 
class="nf">updateFunction</span><span class="p">(</span><span 
class="n">newValues</span><span class="p">,</span> <span 
class="n">runningCount</span><span class="p">):</span>
     <span class="k">if</span> <span class="n">runningCount</span> <span 
class="ow">is</span> <span class="bp">None</span><span class="p">:</span>
         <span class="n">runningCount</span> <span class="o">=</span> <span 
class="mi">0</span>
-    <span class="k">return</span> <span class="nb">sum</span><span 
class="p">(</span><span class="n">newValues</span><span class="p">,</span> 
<span class="n">runningCount</span><span class="p">)</span>  <span class="c"># 
add the new values with the previous running count to get the new 
count</span></code></pre></div>
+    <span class="k">return</span> <span class="nb">sum</span><span 
class="p">(</span><span class="n">newValues</span><span class="p">,</span> 
<span class="n">runningCount</span><span class="p">)</span>  <span class="c1"># 
add the new values with the previous running count to get the new 
count</span></code></pre></figure>
 
     <p>This is applied on a DStream containing words (say, the 
<code>pairs</code> DStream containing <code>(word,
 1)</code> pairs in the <a href="#a-quick-example">earlier example</a>).</p>
 
-    <div class="highlight"><pre><code class="language-python" 
data-lang="python"><span class="n">runningCounts</span> <span 
class="o">=</span> <span class="n">pairs</span><span class="o">.</span><span 
class="n">updateStateByKey</span><span class="p">(</span><span 
class="n">updateFunction</span><span class="p">)</span></code></pre></div>
+    <figure class="highlight"><pre><code class="language-python" 
data-lang="python"><span></span><span class="n">runningCounts</span> <span 
class="o">=</span> <span class="n">pairs</span><span class="o">.</span><span 
class="n">updateStateByKey</span><span class="p">(</span><span 
class="n">updateFunction</span><span class="p">)</span></code></pre></figure>
 
     <p>The update function will be called for each word, with 
<code>newValues</code> having a sequence of 1&#8217;s (from
 the <code>(word, 1)</code> pairs) and the <code>runningCount</code> having the 
previous count. For the complete
@@ -1003,17 +1003,17 @@ spam information (maybe generated with Spark as well) 
and then filtering based o
 <div class="codetabs">
 <div data-lang="scala">
 
-    <div class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span class="k">val</span> <span class="n">spamInfoRDD</span> 
<span class="k">=</span> <span class="n">ssc</span><span 
class="o">.</span><span class="n">sparkContext</span><span 
class="o">.</span><span class="n">newAPIHadoopRDD</span><span 
class="o">(...)</span> <span class="c1">// RDD containing spam 
information</span>
+    <figure class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span></span><span class="k">val</span> <span 
class="n">spamInfoRDD</span> <span class="k">=</span> <span 
class="n">ssc</span><span class="o">.</span><span 
class="n">sparkContext</span><span class="o">.</span><span 
class="n">newAPIHadoopRDD</span><span class="o">(...)</span> <span 
class="c1">// RDD containing spam information</span>
 
 <span class="k">val</span> <span class="n">cleanedDStream</span> <span 
class="k">=</span> <span class="n">wordCounts</span><span 
class="o">.</span><span class="n">transform</span> <span class="o">{</span> 
<span class="n">rdd</span> <span class="k">=&gt;</span>
   <span class="n">rdd</span><span class="o">.</span><span 
class="n">join</span><span class="o">(</span><span 
class="n">spamInfoRDD</span><span class="o">).</span><span 
class="n">filter</span><span class="o">(...)</span> <span class="c1">// join 
data stream with spam information to do data cleaning</span>
   <span class="o">...</span>
-<span class="o">}</span></code></pre></div>
+<span class="o">}</span></code></pre></figure>
 
   </div>
 <div data-lang="java">
 
-    <div class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="kn">import</span> <span 
class="nn">org.apache.spark.streaming.api.java.*</span><span class="o">;</span>
+    <figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="kn">import</span> <span 
class="nn">org.apache.spark.streaming.api.java.*</span><span class="o">;</span>
 <span class="c1">// RDD containing spam information</span>
 <span class="kd">final</span> <span class="n">JavaPairRDD</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">,</span> 
<span class="n">Double</span><span class="o">&gt;</span> <span 
class="n">spamInfoRDD</span> <span class="o">=</span> <span 
class="n">jssc</span><span class="o">.</span><span 
class="na">sparkContext</span><span class="o">().</span><span 
class="na">newAPIHadoopRDD</span><span class="o">(...);</span>
 
@@ -1023,15 +1023,15 @@ spam information (maybe generated with Spark as well) 
and then filtering based o
       <span class="n">rdd</span><span class="o">.</span><span 
class="na">join</span><span class="o">(</span><span 
class="n">spamInfoRDD</span><span class="o">).</span><span 
class="na">filter</span><span class="o">(...);</span> <span class="c1">// join 
data stream with spam information to do data cleaning</span>
       <span class="o">...</span>
     <span class="o">}</span>
-  <span class="o">});</span></code></pre></div>
+  <span class="o">});</span></code></pre></figure>
 
   </div>
 <div data-lang="python">
 
-    <div class="highlight"><pre><code class="language-python" 
data-lang="python"><span class="n">spamInfoRDD</span> <span class="o">=</span> 
<span class="n">sc</span><span class="o">.</span><span 
class="n">pickleFile</span><span class="p">(</span><span 
class="o">...</span><span class="p">)</span>  <span class="c"># RDD containing 
spam information</span>
+    <figure class="highlight"><pre><code class="language-python" 
data-lang="python"><span></span><span class="n">spamInfoRDD</span> <span 
class="o">=</span> <span class="n">sc</span><span class="o">.</span><span 
class="n">pickleFile</span><span class="p">(</span><span 
class="o">...</span><span class="p">)</span>  <span class="c1"># RDD containing 
spam information</span>
 
-<span class="c"># join data stream with spam information to do data 
cleaning</span>
-<span class="n">cleanedDStream</span> <span class="o">=</span> <span 
class="n">wordCounts</span><span class="o">.</span><span 
class="n">transform</span><span class="p">(</span><span class="k">lambda</span> 
<span class="n">rdd</span><span class="p">:</span> <span 
class="n">rdd</span><span class="o">.</span><span class="n">join</span><span 
class="p">(</span><span class="n">spamInfoRDD</span><span 
class="p">)</span><span class="o">.</span><span class="n">filter</span><span 
class="p">(</span><span class="o">...</span><span 
class="p">))</span></code></pre></div>
+<span class="c1"># join data stream with spam information to do data 
cleaning</span>
+<span class="n">cleanedDStream</span> <span class="o">=</span> <span 
class="n">wordCounts</span><span class="o">.</span><span 
class="n">transform</span><span class="p">(</span><span class="k">lambda</span> 
<span class="n">rdd</span><span class="p">:</span> <span 
class="n">rdd</span><span class="o">.</span><span class="n">join</span><span 
class="p">(</span><span class="n">spamInfoRDD</span><span 
class="p">)</span><span class="o">.</span><span class="n">filter</span><span 
class="p">(</span><span class="o">...</span><span 
class="p">))</span></code></pre></figure>
 
   </div>
 </div>
@@ -1073,13 +1073,13 @@ operation <code>reduceByKeyAndWindow</code>.</p>
 <div class="codetabs">
 <div data-lang="scala">
 
-    <div class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span class="c1">// Reduce last 30 seconds of data, every 10 
seconds</span>
-<span class="k">val</span> <span class="n">windowedWordCounts</span> <span 
class="k">=</span> <span class="n">pairs</span><span class="o">.</span><span 
class="n">reduceByKeyAndWindow</span><span class="o">((</span><span 
class="n">a</span><span class="k">:</span><span class="kt">Int</span><span 
class="o">,</span><span class="n">b</span><span class="k">:</span><span 
class="kt">Int</span><span class="o">)</span> <span class="k">=&gt;</span> 
<span class="o">(</span><span class="n">a</span> <span class="o">+</span> <span 
class="n">b</span><span class="o">),</span> <span 
class="nc">Seconds</span><span class="o">(</span><span 
class="mi">30</span><span class="o">),</span> <span 
class="nc">Seconds</span><span class="o">(</span><span 
class="mi">10</span><span class="o">))</span></code></pre></div>
+    <figure class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span></span><span class="c1">// Reduce last 30 seconds of 
data, every 10 seconds</span>
+<span class="k">val</span> <span class="n">windowedWordCounts</span> <span 
class="k">=</span> <span class="n">pairs</span><span class="o">.</span><span 
class="n">reduceByKeyAndWindow</span><span class="o">((</span><span 
class="n">a</span><span class="k">:</span><span class="kt">Int</span><span 
class="o">,</span><span class="n">b</span><span class="k">:</span><span 
class="kt">Int</span><span class="o">)</span> <span class="k">=&gt;</span> 
<span class="o">(</span><span class="n">a</span> <span class="o">+</span> <span 
class="n">b</span><span class="o">),</span> <span 
class="nc">Seconds</span><span class="o">(</span><span 
class="mi">30</span><span class="o">),</span> <span 
class="nc">Seconds</span><span class="o">(</span><span 
class="mi">10</span><span class="o">))</span></code></pre></figure>
 
   </div>
 <div data-lang="java">
 
-    <div class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="c1">// Reduce function adding two integers, 
defined separately for clarity</span>
+    <figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="c1">// Reduce function adding two 
integers, defined separately for clarity</span>
 <span class="n">Function2</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">,</span> <span 
class="n">Integer</span><span class="o">,</span> <span 
class="n">Integer</span><span class="o">&gt;</span> <span 
class="n">reduceFunc</span> <span class="o">=</span> <span class="k">new</span> 
<span class="n">Function2</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">,</span> <span 
class="n">Integer</span><span class="o">,</span> <span 
class="n">Integer</span><span class="o">&gt;()</span> <span class="o">{</span>
   <span class="nd">@Override</span> <span class="kd">public</span> <span 
class="n">Integer</span> <span class="nf">call</span><span 
class="o">(</span><span class="n">Integer</span> <span class="n">i1</span><span 
class="o">,</span> <span class="n">Integer</span> <span 
class="n">i2</span><span class="o">)</span> <span class="o">{</span>
     <span class="k">return</span> <span class="n">i1</span> <span 
class="o">+</span> <span class="n">i2</span><span class="o">;</span>
@@ -1087,13 +1087,13 @@ operation <code>reduceByKeyAndWindow</code>.</p>
 <span class="o">};</span>
 
 <span class="c1">// Reduce last 30 seconds of data, every 10 seconds</span>
-<span class="n">JavaPairDStream</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Integer</span><span class="o">&gt;</span> <span 
class="n">windowedWordCounts</span> <span class="o">=</span> <span 
class="n">pairs</span><span class="o">.</span><span 
class="na">reduceByKeyAndWindow</span><span class="o">(</span><span 
class="n">reduceFunc</span><span class="o">,</span> <span 
class="n">Durations</span><span class="o">.</span><span 
class="na">seconds</span><span class="o">(</span><span 
class="mi">30</span><span class="o">),</span> <span 
class="n">Durations</span><span class="o">.</span><span 
class="na">seconds</span><span class="o">(</span><span 
class="mi">10</span><span class="o">));</span></code></pre></div>
+<span class="n">JavaPairDStream</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Integer</span><span class="o">&gt;</span> <span 
class="n">windowedWordCounts</span> <span class="o">=</span> <span 
class="n">pairs</span><span class="o">.</span><span 
class="na">reduceByKeyAndWindow</span><span class="o">(</span><span 
class="n">reduceFunc</span><span class="o">,</span> <span 
class="n">Durations</span><span class="o">.</span><span 
class="na">seconds</span><span class="o">(</span><span 
class="mi">30</span><span class="o">),</span> <span 
class="n">Durations</span><span class="o">.</span><span 
class="na">seconds</span><span class="o">(</span><span 
class="mi">10</span><span class="o">));</span></code></pre></figure>
 
   </div>
 <div data-lang="python">
 
-    <div class="highlight"><pre><code class="language-python" 
data-lang="python"><span class="c"># Reduce last 30 seconds of data, every 10 
seconds</span>
-<span class="n">windowedWordCounts</span> <span class="o">=</span> <span 
class="n">pairs</span><span class="o">.</span><span 
class="n">reduceByKeyAndWindow</span><span class="p">(</span><span 
class="k">lambda</span> <span class="n">x</span><span class="p">,</span> <span 
class="n">y</span><span class="p">:</span> <span class="n">x</span> <span 
class="o">+</span> <span class="n">y</span><span class="p">,</span> <span 
class="k">lambda</span> <span class="n">x</span><span class="p">,</span> <span 
class="n">y</span><span class="p">:</span> <span class="n">x</span> <span 
class="o">-</span> <span class="n">y</span><span class="p">,</span> <span 
class="mi">30</span><span class="p">,</span> <span class="mi">10</span><span 
class="p">)</span></code></pre></div>
+    <figure class="highlight"><pre><code class="language-python" 
data-lang="python"><span></span><span class="c1"># Reduce last 30 seconds of 
data, every 10 seconds</span>
+<span class="n">windowedWordCounts</span> <span class="o">=</span> <span 
class="n">pairs</span><span class="o">.</span><span 
class="n">reduceByKeyAndWindow</span><span class="p">(</span><span 
class="k">lambda</span> <span class="n">x</span><span class="p">,</span> <span 
class="n">y</span><span class="p">:</span> <span class="n">x</span> <span 
class="o">+</span> <span class="n">y</span><span class="p">,</span> <span 
class="k">lambda</span> <span class="n">x</span><span class="p">,</span> <span 
class="n">y</span><span class="p">:</span> <span class="n">x</span> <span 
class="o">-</span> <span class="n">y</span><span class="p">,</span> <span 
class="mi">30</span><span class="p">,</span> <span class="mi">10</span><span 
class="p">)</span></code></pre></figure>
 
   </div>
 </div>
@@ -1167,48 +1167,48 @@ said two parameters - <i>windowLength</i> and 
<i>slideInterval</i>.</p>
 <div class="codetabs">
 <div data-lang="scala">
 
-    <div class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span class="k">val</span> <span 
class="n">stream1</span><span class="k">:</span> <span 
class="kt">DStream</span><span class="o">[</span><span 
class="kt">String</span>, <span class="kt">String</span><span 
class="o">]</span> <span class="k">=</span> <span class="o">...</span>
+    <figure class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span></span><span class="k">val</span> <span 
class="n">stream1</span><span class="k">:</span> <span 
class="kt">DStream</span><span class="o">[</span><span 
class="kt">String</span>, <span class="kt">String</span><span 
class="o">]</span> <span class="k">=</span> <span class="o">...</span>
 <span class="k">val</span> <span class="n">stream2</span><span 
class="k">:</span> <span class="kt">DStream</span><span class="o">[</span><span 
class="kt">String</span>, <span class="kt">String</span><span 
class="o">]</span> <span class="k">=</span> <span class="o">...</span>
-<span class="k">val</span> <span class="n">joinedStream</span> <span 
class="k">=</span> <span class="n">stream1</span><span class="o">.</span><span 
class="n">join</span><span class="o">(</span><span 
class="n">stream2</span><span class="o">)</span></code></pre></div>
+<span class="k">val</span> <span class="n">joinedStream</span> <span 
class="k">=</span> <span class="n">stream1</span><span class="o">.</span><span 
class="n">join</span><span class="o">(</span><span 
class="n">stream2</span><span class="o">)</span></code></pre></figure>
 
   </div>
 <div data-lang="java">
 
-    <div class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="n">JavaPairDStream</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">,</span> 
<span class="n">String</span><span class="o">&gt;</span> <span 
class="n">stream1</span> <span class="o">=</span> <span class="o">...</span>
+    <figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="n">JavaPairDStream</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">,</span> 
<span class="n">String</span><span class="o">&gt;</span> <span 
class="n">stream1</span> <span class="o">=</span> <span class="o">...</span>
 <span class="n">JavaPairDStream</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">String</span><span class="o">&gt;</span> <span 
class="n">stream2</span> <span class="o">=</span> <span class="o">...</span>
-<span class="n">JavaPairDStream</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Tuple2</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">String</span><span class="o">&gt;&gt;</span> <span 
class="n">joinedStream</span> <span class="o">=</span> <span 
class="n">stream1</span><span class="o">.</span><span 
class="na">join</span><span class="o">(</span><span 
class="n">stream2</span><span class="o">);</span></code></pre></div>
+<span class="n">JavaPairDStream</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Tuple2</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">String</span><span class="o">&gt;&gt;</span> <span 
class="n">joinedStream</span> <span class="o">=</span> <span 
class="n">stream1</span><span class="o">.</span><span 
class="na">join</span><span class="o">(</span><span 
class="n">stream2</span><span class="o">);</span></code></pre></figure>
 
   </div>
 <div data-lang="python">
 
-    <div class="highlight"><pre><code class="language-python" 
data-lang="python"><span class="n">stream1</span> <span class="o">=</span> 
<span class="o">...</span>
+    <figure class="highlight"><pre><code class="language-python" 
data-lang="python"><span></span><span class="n">stream1</span> <span 
class="o">=</span> <span class="o">...</span>
 <span class="n">stream2</span> <span class="o">=</span> <span 
class="o">...</span>
-<span class="n">joinedStream</span> <span class="o">=</span> <span 
class="n">stream1</span><span class="o">.</span><span 
class="n">join</span><span class="p">(</span><span 
class="n">stream2</span><span class="p">)</span></code></pre></div>
+<span class="n">joinedStream</span> <span class="o">=</span> <span 
class="n">stream1</span><span class="o">.</span><span 
class="n">join</span><span class="p">(</span><span 
class="n">stream2</span><span class="p">)</span></code></pre></figure>
 
   </div>
 </div>
-<p>Here, in each batch interval, the RDD generated by <code>stream1</code> 
will be joined with the RDD generated by <code>stream2</code>. You can also do 
<code>leftOuterJoin</code>, <code>rightOuterJoin</code>, 
<code>fullOuterJoin</code>. Furthermore, it is often very useful to do joins 
over windows of the streams. That is pretty easy as well.</p>
+<p>Here, in each batch interval, the RDD generated by <code>stream1</code> 
will be joined with the RDD generated by <code>stream2</code>. You can also do 
<code>leftOuterJoin</code>, <code>rightOuterJoin</code>, 
<code>fullOuterJoin</code>. Furthermore, it is often very useful to do joins 
over windows of the streams. That is pretty easy as well. </p>
 
 <div class="codetabs">
 <div data-lang="scala">
 
-    <div class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span class="k">val</span> <span 
class="n">windowedStream1</span> <span class="k">=</span> <span 
class="n">stream1</span><span class="o">.</span><span 
class="n">window</span><span class="o">(</span><span 
class="nc">Seconds</span><span class="o">(</span><span 
class="mi">20</span><span class="o">))</span>
+    <figure class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span></span><span class="k">val</span> <span 
class="n">windowedStream1</span> <span class="k">=</span> <span 
class="n">stream1</span><span class="o">.</span><span 
class="n">window</span><span class="o">(</span><span 
class="nc">Seconds</span><span class="o">(</span><span 
class="mi">20</span><span class="o">))</span>
 <span class="k">val</span> <span class="n">windowedStream2</span> <span 
class="k">=</span> <span class="n">stream2</span><span class="o">.</span><span 
class="n">window</span><span class="o">(</span><span 
class="nc">Minutes</span><span class="o">(</span><span class="mi">1</span><span 
class="o">))</span>
-<span class="k">val</span> <span class="n">joinedStream</span> <span 
class="k">=</span> <span class="n">windowedStream1</span><span 
class="o">.</span><span class="n">join</span><span class="o">(</span><span 
class="n">windowedStream2</span><span class="o">)</span></code></pre></div>
+<span class="k">val</span> <span class="n">joinedStream</span> <span 
class="k">=</span> <span class="n">windowedStream1</span><span 
class="o">.</span><span class="n">join</span><span class="o">(</span><span 
class="n">windowedStream2</span><span class="o">)</span></code></pre></figure>
 
   </div>
 <div data-lang="java">
 
-    <div class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="n">JavaPairDStream</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">,</span> 
<span class="n">String</span><span class="o">&gt;</span> <span 
class="n">windowedStream1</span> <span class="o">=</span> <span 
class="n">stream1</span><span class="o">.</span><span 
class="na">window</span><span class="o">(</span><span 
class="n">Durations</span><span class="o">.</span><span 
class="na">seconds</span><span class="o">(</span><span 
class="mi">20</span><span class="o">));</span>
+    <figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="n">JavaPairDStream</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">,</span> 
<span class="n">String</span><span class="o">&gt;</span> <span 
class="n">windowedStream1</span> <span class="o">=</span> <span 
class="n">stream1</span><span class="o">.</span><span 
class="na">window</span><span class="o">(</span><span 
class="n">Durations</span><span class="o">.</span><span 
class="na">seconds</span><span class="o">(</span><span 
class="mi">20</span><span class="o">));</span>
 <span class="n">JavaPairDStream</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">String</span><span class="o">&gt;</span> <span 
class="n">windowedStream2</span> <span class="o">=</span> <span 
class="n">stream2</span><span class="o">.</span><span 
class="na">window</span><span class="o">(</span><span 
class="n">Durations</span><span class="o">.</span><span 
class="na">minutes</span><span class="o">(</span><span class="mi">1</span><span 
class="o">));</span>
-<span class="n">JavaPairDStream</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Tuple2</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">String</span><span class="o">&gt;&gt;</span> <span 
class="n">joinedStream</span> <span class="o">=</span> <span 
class="n">windowedStream1</span><span class="o">.</span><span 
class="na">join</span><span class="o">(</span><span 
class="n">windowedStream2</span><span class="o">);</span></code></pre></div>
+<span class="n">JavaPairDStream</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Tuple2</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">String</span><span class="o">&gt;&gt;</span> <span 
class="n">joinedStream</span> <span class="o">=</span> <span 
class="n">windowedStream1</span><span class="o">.</span><span 
class="na">join</span><span class="o">(</span><span 
class="n">windowedStream2</span><span class="o">);</span></code></pre></figure>
 
   </div>
 <div data-lang="python">
 
-    <div class="highlight"><pre><code class="language-python" 
data-lang="python"><span class="n">windowedStream1</span> <span 
class="o">=</span> <span class="n">stream1</span><span class="o">.</span><span 
class="n">window</span><span class="p">(</span><span class="mi">20</span><span 
class="p">)</span>
+    <figure class="highlight"><pre><code class="language-python" 
data-lang="python"><span></span><span class="n">windowedStream1</span> <span 
class="o">=</span> <span class="n">stream1</span><span class="o">.</span><span 
class="n">window</span><span class="p">(</span><span class="mi">20</span><span 
class="p">)</span>
 <span class="n">windowedStream2</span> <span class="o">=</span> <span 
class="n">stream2</span><span class="o">.</span><span 
class="n">window</span><span class="p">(</span><span class="mi">60</span><span 
class="p">)</span>
-<span class="n">joinedStream</span> <span class="o">=</span> <span 
class="n">windowedStream1</span><span class="o">.</span><span 
class="n">join</span><span class="p">(</span><span 
class="n">windowedStream2</span><span class="p">)</span></code></pre></div>
+<span class="n">joinedStream</span> <span class="o">=</span> <span 
class="n">windowedStream1</span><span class="o">.</span><span 
class="n">join</span><span class="p">(</span><span 
class="n">windowedStream2</span><span class="p">)</span></code></pre></figure>
 
   </div>
 </div>
@@ -1219,14 +1219,14 @@ said two parameters - <i>windowLength</i> and 
<i>slideInterval</i>.</p>
 <div class="codetabs">
 <div data-lang="scala">
 
-    <div class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span class="k">val</span> <span 
class="n">dataset</span><span class="k">:</span> <span 
class="kt">RDD</span><span class="o">[</span><span class="kt">String</span>, 
<span class="kt">String</span><span class="o">]</span> <span class="k">=</span> 
<span class="o">...</span>
+    <figure class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span></span><span class="k">val</span> <span 
class="n">dataset</span><span class="k">:</span> <span 
class="kt">RDD</span><span class="o">[</span><span class="kt">String</span>, 
<span class="kt">String</span><span class="o">]</span> <span class="k">=</span> 
<span class="o">...</span>
 <span class="k">val</span> <span class="n">windowedStream</span> <span 
class="k">=</span> <span class="n">stream</span><span class="o">.</span><span 
class="n">window</span><span class="o">(</span><span 
class="nc">Seconds</span><span class="o">(</span><span 
class="mi">20</span><span class="o">))...</span>
-<span class="k">val</span> <span class="n">joinedStream</span> <span 
class="k">=</span> <span class="n">windowedStream</span><span 
class="o">.</span><span class="n">transform</span> <span class="o">{</span> 
<span class="n">rdd</span> <span class="k">=&gt;</span> <span 
class="n">rdd</span><span class="o">.</span><span class="n">join</span><span 
class="o">(</span><span class="n">dataset</span><span class="o">)</span> <span 
class="o">}</span></code></pre></div>
+<span class="k">val</span> <span class="n">joinedStream</span> <span 
class="k">=</span> <span class="n">windowedStream</span><span 
class="o">.</span><span class="n">transform</span> <span class="o">{</span> 
<span class="n">rdd</span> <span class="k">=&gt;</span> <span 
class="n">rdd</span><span class="o">.</span><span class="n">join</span><span 
class="o">(</span><span class="n">dataset</span><span class="o">)</span> <span 
class="o">}</span></code></pre></figure>
 
   </div>
 <div data-lang="java">
 
-    <div class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="n">JavaPairRDD</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">,</span> 
<span class="n">String</span><span class="o">&gt;</span> <span 
class="n">dataset</span> <span class="o">=</span> <span class="o">...</span>
+    <figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="n">JavaPairRDD</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">,</span> 
<span class="n">String</span><span class="o">&gt;</span> <span 
class="n">dataset</span> <span class="o">=</span> <span class="o">...</span>
 <span class="n">JavaPairDStream</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">String</span><span class="o">&gt;</span> <span 
class="n">windowedStream</span> <span class="o">=</span> <span 
class="n">stream</span><span class="o">.</span><span 
class="na">window</span><span class="o">(</span><span 
class="n">Durations</span><span class="o">.</span><span 
class="na">seconds</span><span class="o">(</span><span 
class="mi">20</span><span class="o">));</span>
 <span class="n">JavaPairDStream</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">String</span><span class="o">&gt;</span> <span 
class="n">joinedStream</span> <span class="o">=</span> <span 
class="n">windowedStream</span><span class="o">.</span><span 
class="na">transform</span><span class="o">(</span>
   <span class="k">new</span> <span class="n">Function</span><span 
class="o">&lt;</span><span class="n">JavaRDD</span><span 
class="o">&lt;</span><span class="n">Tuple2</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">,</span> 
<span class="n">String</span><span class="o">&gt;&gt;,</span> <span 
class="n">JavaRDD</span><span class="o">&lt;</span><span 
class="n">Tuple2</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">String</span><span class="o">&gt;&gt;&gt;()</span> <span 
class="o">{</span>
@@ -1235,14 +1235,14 @@ said two parameters - <i>windowLength</i> and 
<i>slideInterval</i>.</p>
       <span class="k">return</span> <span class="n">rdd</span><span 
class="o">.</span><span class="na">join</span><span class="o">(</span><span 
class="n">dataset</span><span class="o">);</span>
     <span class="o">}</span>
   <span class="o">}</span>
-<span class="o">);</span></code></pre></div>
+<span class="o">);</span></code></pre></figure>
 
   </div>
 <div data-lang="python">
 
-    <div class="highlight"><pre><code class="language-python" 
data-lang="python"><span class="n">dataset</span> <span class="o">=</span> 
<span class="o">...</span> <span class="c"># some RDD</span>
+    <figure class="highlight"><pre><code class="language-python" 
data-lang="python"><span></span><span class="n">dataset</span> <span 
class="o">=</span> <span class="o">...</span> <span class="c1"># some RDD</span>
 <span class="n">windowedStream</span> <span class="o">=</span> <span 
class="n">stream</span><span class="o">.</span><span 
class="n">window</span><span class="p">(</span><span class="mi">20</span><span 
class="p">)</span>
-<span class="n">joinedStream</span> <span class="o">=</span> <span 
class="n">windowedStream</span><span class="o">.</span><span 
class="n">transform</span><span class="p">(</span><span class="k">lambda</span> 
<span class="n">rdd</span><span class="p">:</span> <span 
class="n">rdd</span><span class="o">.</span><span class="n">join</span><span 
class="p">(</span><span class="n">dataset</span><span 
class="p">))</span></code></pre></div>
+<span class="n">joinedStream</span> <span class="o">=</span> <span 
class="n">windowedStream</span><span class="o">.</span><span 
class="n">transform</span><span class="p">(</span><span class="k">lambda</span> 
<span class="n">rdd</span><span class="p">:</span> <span 
class="n">rdd</span><span class="o">.</span><span class="n">join</span><span 
class="p">(</span><span class="n">dataset</span><span 
class="p">))</span></code></pre></figure>
 
   </div>
 </div>
@@ -1324,22 +1324,22 @@ For example (in Scala),</p>
 <div class="codetabs">
 <div data-lang="scala">
 
-    <div class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span class="n">dstream</span><span class="o">.</span><span 
class="n">foreachRDD</span> <span class="o">{</span> <span class="n">rdd</span> 
<span class="k">=&gt;</span>
+    <figure class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span></span><span class="n">dstream</span><span 
class="o">.</span><span class="n">foreachRDD</span> <span class="o">{</span> 
<span class="n">rdd</span> <span class="k">=&gt;</span>
   <span class="k">val</span> <span class="n">connection</span> <span 
class="k">=</span> <span class="n">createNewConnection</span><span 
class="o">()</span>  <span class="c1">// executed at the driver</span>
   <span class="n">rdd</span><span class="o">.</span><span 
class="n">foreach</span> <span class="o">{</span> <span class="n">record</span> 
<span class="k">=&gt;</span>
     <span class="n">connection</span><span class="o">.</span><span 
class="n">send</span><span class="o">(</span><span class="n">record</span><span 
class="o">)</span> <span class="c1">// executed at the worker</span>
   <span class="o">}</span>
-<span class="o">}</span></code></pre></div>
+<span class="o">}</span></code></pre></figure>
 
   </div>
 <div data-lang="python">
 
-    <div class="highlight"><pre><code class="language-python" 
data-lang="python"><span class="k">def</span> <span 
class="nf">sendRecord</span><span class="p">(</span><span 
class="n">rdd</span><span class="p">):</span>
-    <span class="n">connection</span> <span class="o">=</span> <span 
class="n">createNewConnection</span><span class="p">()</span>  <span 
class="c"># executed at the driver</span>
+    <figure class="highlight"><pre><code class="language-python" 
data-lang="python"><span></span><span class="k">def</span> <span 
class="nf">sendRecord</span><span class="p">(</span><span 
class="n">rdd</span><span class="p">):</span>
+    <span class="n">connection</span> <span class="o">=</span> <span 
class="n">createNewConnection</span><span class="p">()</span>  <span 
class="c1"># executed at the driver</span>
     <span class="n">rdd</span><span class="o">.</span><span 
class="n">foreach</span><span class="p">(</span><span class="k">lambda</span> 
<span class="n">record</span><span class="p">:</span> <span 
class="n">connection</span><span class="o">.</span><span 
class="n">send</span><span class="p">(</span><span class="n">record</span><span 
class="p">))</span>
     <span class="n">connection</span><span class="o">.</span><span 
class="n">close</span><span class="p">()</span>
 
-<span class="n">dstream</span><span class="o">.</span><span 
class="n">foreachRDD</span><span class="p">(</span><span 
class="n">sendRecord</span><span class="p">)</span></code></pre></div>
+<span class="n">dstream</span><span class="

<TRUNCATED>

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to