http://git-wip-us.apache.org/repos/asf/spark-website/blob/d2bcf185/site/docs/2.1.0/storage-openstack-swift.html
----------------------------------------------------------------------
diff --git a/site/docs/2.1.0/storage-openstack-swift.html 
b/site/docs/2.1.0/storage-openstack-swift.html
index bbb3446..a20c67f 100644
--- a/site/docs/2.1.0/storage-openstack-swift.html
+++ b/site/docs/2.1.0/storage-openstack-swift.html
@@ -144,7 +144,7 @@ Current Swift driver requires Swift to use Keystone 
authentication method.</p>
 <p>The Spark application should include <code>hadoop-openstack</code> 
dependency.
 For example, for Maven support, add the following to the <code>pom.xml</code> 
file:</p>
 
-<div class="highlight"><pre><code class="language-xml" data-lang="xml"><span 
class="nt">&lt;dependencyManagement&gt;</span>
+<figure class="highlight"><pre><code class="language-xml" 
data-lang="xml"><span></span><span 
class="nt">&lt;dependencyManagement&gt;</span>
   ...
   <span class="nt">&lt;dependency&gt;</span>
     <span class="nt">&lt;groupId&gt;</span>org.apache.hadoop<span 
class="nt">&lt;/groupId&gt;</span>
@@ -152,15 +152,15 @@ For example, for Maven support, add the following to the 
<code>pom.xml</code> fi
     <span class="nt">&lt;version&gt;</span>2.3.0<span 
class="nt">&lt;/version&gt;</span>
   <span class="nt">&lt;/dependency&gt;</span>
   ...
-<span class="nt">&lt;/dependencyManagement&gt;</span></code></pre></div>
+<span class="nt">&lt;/dependencyManagement&gt;</span></code></pre></figure>
 
 <h1 id="configuration-parameters">Configuration Parameters</h1>
 
 <p>Create <code>core-site.xml</code> and place it inside Spark&#8217;s 
<code>conf</code> directory.
 There are two main categories of parameters that should be configured: 
declaration of the
-Swift driver and the parameters that are required by Keystone.</p>
+Swift driver and the parameters that are required by Keystone. </p>
 
-<p>Configuration of Hadoop to use Swift File system achieved via</p>
+<p>Configuration of Hadoop to use Swift File system achieved via </p>
 
 <table class="table">
 <tr><th>Property Name</th><th>Value</th></tr>
@@ -221,7 +221,7 @@ contains a list of Keystone mandatory parameters. 
<code>PROVIDER</code> can be a
 <p>For example, assume <code>PROVIDER=SparkTest</code> and Keystone contains 
user <code>tester</code> with password <code>testing</code>
 defined for tenant <code>test</code>. Then <code>core-site.xml</code> should 
include:</p>
 
-<div class="highlight"><pre><code class="language-xml" data-lang="xml"><span 
class="nt">&lt;configuration&gt;</span>
+<figure class="highlight"><pre><code class="language-xml" 
data-lang="xml"><span></span><span class="nt">&lt;configuration&gt;</span>
   <span class="nt">&lt;property&gt;</span>
     <span class="nt">&lt;name&gt;</span>fs.swift.impl<span 
class="nt">&lt;/name&gt;</span>
     <span 
class="nt">&lt;value&gt;</span>org.apache.hadoop.fs.swift.snative.SwiftNativeFileSystem<span
 class="nt">&lt;/value&gt;</span>
@@ -257,7 +257,7 @@ defined for tenant <code>test</code>. Then 
<code>core-site.xml</code> should inc
     <span 
class="nt">&lt;name&gt;</span>fs.swift.service.SparkTest.password<span 
class="nt">&lt;/name&gt;</span>
     <span class="nt">&lt;value&gt;</span>testing<span 
class="nt">&lt;/value&gt;</span>
   <span class="nt">&lt;/property&gt;</span>
-<span class="nt">&lt;/configuration&gt;</span></code></pre></div>
+<span class="nt">&lt;/configuration&gt;</span></code></pre></figure>
 
 <p>Notice that
 <code>fs.swift.service.PROVIDER.tenant</code>,

http://git-wip-us.apache.org/repos/asf/spark-website/blob/d2bcf185/site/docs/2.1.0/streaming-custom-receivers.html
----------------------------------------------------------------------
diff --git a/site/docs/2.1.0/streaming-custom-receivers.html 
b/site/docs/2.1.0/streaming-custom-receivers.html
index d31647d..846c797 100644
--- a/site/docs/2.1.0/streaming-custom-receivers.html
+++ b/site/docs/2.1.0/streaming-custom-receivers.html
@@ -171,7 +171,7 @@ has any error connecting or receiving, the receiver is 
restarted to make another
 <div class="codetabs">
 <div data-lang="scala">
 
-    <div class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span class="k">class</span> <span 
class="nc">CustomReceiver</span><span class="o">(</span><span 
class="n">host</span><span class="k">:</span> <span 
class="kt">String</span><span class="o">,</span> <span 
class="n">port</span><span class="k">:</span> <span class="kt">Int</span><span 
class="o">)</span>
+    <figure class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span></span><span class="k">class</span> <span 
class="nc">CustomReceiver</span><span class="o">(</span><span 
class="n">host</span><span class="k">:</span> <span 
class="kt">String</span><span class="o">,</span> <span 
class="n">port</span><span class="k">:</span> <span class="kt">Int</span><span 
class="o">)</span>
   <span class="k">extends</span> <span class="nc">Receiver</span><span 
class="o">[</span><span class="kt">String</span><span class="o">](</span><span 
class="nc">StorageLevel</span><span class="o">.</span><span 
class="nc">MEMORY_AND_DISK_2</span><span class="o">)</span> <span 
class="k">with</span> <span class="nc">Logging</span> <span class="o">{</span>
 
   <span class="k">def</span> <span class="n">onStart</span><span 
class="o">()</span> <span class="o">{</span>
@@ -216,12 +216,12 @@ has any error connecting or receiving, the receiver is 
restarted to make another
         <span class="n">restart</span><span class="o">(</span><span 
class="s">&quot;Error receiving data&quot;</span><span class="o">,</span> <span 
class="n">t</span><span class="o">)</span>
     <span class="o">}</span>
   <span class="o">}</span>
-<span class="o">}</span></code></pre></div>
+<span class="o">}</span></code></pre></figure>
 
   </div>
 <div data-lang="java">
 
-    <div class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="kd">public</span> <span class="kd">class</span> 
<span class="nc">JavaCustomReceiver</span> <span class="kd">extends</span> 
<span class="n">Receiver</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">&gt;</span> <span class="o">{</span>
+    <figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="kd">public</span> <span 
class="kd">class</span> <span class="nc">JavaCustomReceiver</span> <span 
class="kd">extends</span> <span class="n">Receiver</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> 
<span class="o">{</span>
 
   <span class="n">String</span> <span class="n">host</span> <span 
class="o">=</span> <span class="kc">null</span><span class="o">;</span>
   <span class="kt">int</span> <span class="n">port</span> <span 
class="o">=</span> <span class="o">-</span><span class="mi">1</span><span 
class="o">;</span>
@@ -234,7 +234,7 @@ has any error connecting or receiving, the receiver is 
restarted to make another
 
   <span class="kd">public</span> <span class="kt">void</span> <span 
class="nf">onStart</span><span class="o">()</span> <span class="o">{</span>
     <span class="c1">// Start the thread that receives data over a 
connection</span>
-    <span class="k">new</span> <span class="nf">Thread</span><span 
class="o">()</span>  <span class="o">{</span>
+    <span class="k">new</span> <span class="n">Thread</span><span 
class="o">()</span>  <span class="o">{</span>
       <span class="nd">@Override</span> <span class="kd">public</span> <span 
class="kt">void</span> <span class="nf">run</span><span class="o">()</span> 
<span class="o">{</span>
         <span class="n">receive</span><span class="o">();</span>
       <span class="o">}</span>
@@ -253,10 +253,10 @@ has any error connecting or receiving, the receiver is 
restarted to make another
 
     <span class="k">try</span> <span class="o">{</span>
       <span class="c1">// connect to the server</span>
-      <span class="n">socket</span> <span class="o">=</span> <span 
class="k">new</span> <span class="nf">Socket</span><span 
class="o">(</span><span class="n">host</span><span class="o">,</span> <span 
class="n">port</span><span class="o">);</span>
+      <span class="n">socket</span> <span class="o">=</span> <span 
class="k">new</span> <span class="n">Socket</span><span class="o">(</span><span 
class="n">host</span><span class="o">,</span> <span class="n">port</span><span 
class="o">);</span>
 
-      <span class="n">BufferedReader</span> <span class="n">reader</span> 
<span class="o">=</span> <span class="k">new</span> <span 
class="nf">BufferedReader</span><span class="o">(</span>
-        <span class="k">new</span> <span 
class="nf">InputStreamReader</span><span class="o">(</span><span 
class="n">socket</span><span class="o">.</span><span 
class="na">getInputStream</span><span class="o">(),</span> <span 
class="n">StandardCharsets</span><span class="o">.</span><span 
class="na">UTF_8</span><span class="o">));</span>
+      <span class="n">BufferedReader</span> <span class="n">reader</span> 
<span class="o">=</span> <span class="k">new</span> <span 
class="n">BufferedReader</span><span class="o">(</span>
+        <span class="k">new</span> <span 
class="n">InputStreamReader</span><span class="o">(</span><span 
class="n">socket</span><span class="o">.</span><span 
class="na">getInputStream</span><span class="o">(),</span> <span 
class="n">StandardCharsets</span><span class="o">.</span><span 
class="na">UTF_8</span><span class="o">));</span>
 
       <span class="c1">// Until stopped or connection broken continue 
reading</span>
       <span class="k">while</span> <span class="o">(!</span><span 
class="n">isStopped</span><span class="o">()</span> <span 
class="o">&amp;&amp;</span> <span class="o">(</span><span 
class="n">userInput</span> <span class="o">=</span> <span 
class="n">reader</span><span class="o">.</span><span 
class="na">readLine</span><span class="o">())</span> <span class="o">!=</span> 
<span class="kc">null</span><span class="o">)</span> <span class="o">{</span>
@@ -276,7 +276,7 @@ has any error connecting or receiving, the receiver is 
restarted to make another
       <span class="n">restart</span><span class="o">(</span><span 
class="s">&quot;Error receiving data&quot;</span><span class="o">,</span> <span 
class="n">t</span><span class="o">);</span>
     <span class="o">}</span>
   <span class="o">}</span>
-<span class="o">}</span></code></pre></div>
+<span class="o">}</span></code></pre></figure>
 
   </div>
 </div>
@@ -290,20 +290,20 @@ an input DStream using data received by the instance of 
custom receiver, as show
 <div class="codetabs">
 <div data-lang="scala">
 
-    <div class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span class="c1">// Assuming ssc is the 
StreamingContext</span>
+    <figure class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span></span><span class="c1">// Assuming ssc is the 
StreamingContext</span>
 <span class="k">val</span> <span class="n">customReceiverStream</span> <span 
class="k">=</span> <span class="n">ssc</span><span class="o">.</span><span 
class="n">receiverStream</span><span class="o">(</span><span 
class="k">new</span> <span class="nc">CustomReceiver</span><span 
class="o">(</span><span class="n">host</span><span class="o">,</span> <span 
class="n">port</span><span class="o">))</span>
 <span class="k">val</span> <span class="n">words</span> <span 
class="k">=</span> <span class="n">customReceiverStream</span><span class="o">.</span><span 
class="n">flatMap</span><span class="o">(</span><span class="k">_</span><span 
class="o">.</span><span class="n">split</span><span class="o">(</span><span 
class="s">&quot; &quot;</span><span class="o">))</span>
-<span class="o">...</span></code></pre></div>
+<span class="o">...</span></code></pre></figure>
 
     <p>The full source code is in the example <a 
href="https://github.com/apache/spark/blob/v2.1.0/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala">CustomReceiver.scala</a>.</p>
 
   </div>
 <div data-lang="java">
 
-    <div class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="c1">// Assuming ssc is the 
JavaStreamingContext</span>
-<span class="n">JavaDStream</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">&gt;</span> <span 
class="n">customReceiverStream</span> <span class="o">=</span> <span 
class="n">ssc</span><span class="o">.</span><span 
class="na">receiverStream</span><span class="o">(</span><span 
class="k">new</span> <span class="nf">JavaCustomReceiver</span><span 
class="o">(</span><span class="n">host</span><span class="o">,</span> <span 
class="n">port</span><span class="o">));</span>
+    <figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="c1">// Assuming ssc is the 
JavaStreamingContext</span>
+<span class="n">JavaDStream</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">&gt;</span> <span 
class="n">customReceiverStream</span> <span class="o">=</span> <span 
class="n">ssc</span><span class="o">.</span><span 
class="na">receiverStream</span><span class="o">(</span><span 
class="k">new</span> <span class="n">JavaCustomReceiver</span><span 
class="o">(</span><span class="n">host</span><span class="o">,</span> <span 
class="n">port</span><span class="o">));</span>
 <span class="n">JavaDStream</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">&gt;</span> <span class="n">words</span> 
<span class="o">=</span> <span class="n">customReceiverStream</span><span 
class="o">.</span><span class="na">flatMap</span><span class="o">(</span><span 
class="k">new</span> <span class="n">FlatMapFunction</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">,</span> 
<span class="n">String</span><span class="o">&gt;()</span> <span 
class="o">{</span> <span class="o">...</span> <span class="o">});</span>
-<span class="o">...</span></code></pre></div>
+<span class="o">...</span></code></pre></figure>
 
     <p>The full source code is in the example <a 
href="https://github.com/apache/spark/blob/v2.1.0/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java">JavaCustomReceiver.java</a>.</p>
 

http://git-wip-us.apache.org/repos/asf/spark-website/blob/d2bcf185/site/docs/2.1.0/streaming-kafka-0-10-integration.html
----------------------------------------------------------------------
diff --git a/site/docs/2.1.0/streaming-kafka-0-10-integration.html 
b/site/docs/2.1.0/streaming-kafka-0-10-integration.html
index a4f39fb..1e7fbba 100644
--- a/site/docs/2.1.0/streaming-kafka-0-10-integration.html
+++ b/site/docs/2.1.0/streaming-kafka-0-10-integration.html
@@ -142,7 +142,7 @@ version = 2.1.0
 <div class="codetabs">
 <div data-lang="scala">
 
-    <div class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span class="k">import</span> <span 
class="nn">org.apache.kafka.clients.consumer.ConsumerRecord</span>
+    <figure class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span></span><span class="k">import</span> <span 
class="nn">org.apache.kafka.clients.consumer.ConsumerRecord</span>
 <span class="k">import</span> <span 
class="nn">org.apache.kafka.common.serialization.StringDeserializer</span>
 <span class="k">import</span> <span 
class="nn">org.apache.spark.streaming.kafka010._</span>
 <span class="k">import</span> <span 
class="nn">org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent</span>
@@ -164,13 +164,13 @@ version = 2.1.0
   <span class="nc">Subscribe</span><span class="o">[</span><span 
class="kt">String</span>, <span class="kt">String</span><span 
class="o">](</span><span class="n">topics</span><span class="o">,</span> <span 
class="n">kafkaParams</span><span class="o">)</span>
 <span class="o">)</span>
 
-<span class="n">stream</span><span class="o">.</span><span 
class="n">map</span><span class="o">(</span><span class="n">record</span> <span 
class="k">=&gt;</span> <span class="o">(</span><span 
class="n">record</span><span class="o">.</span><span class="n">key</span><span 
class="o">,</span> <span class="n">record</span><span class="o">.</span><span 
class="n">value</span><span class="o">))</span></code></pre></div>
+<span class="n">stream</span><span class="o">.</span><span 
class="n">map</span><span class="o">(</span><span class="n">record</span> <span 
class="k">=&gt;</span> <span class="o">(</span><span 
class="n">record</span><span class="o">.</span><span class="n">key</span><span 
class="o">,</span> <span class="n">record</span><span class="o">.</span><span 
class="n">value</span><span class="o">))</span></code></pre></figure>
 
     <p>Each item in the stream is a <a 
href="http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html">ConsumerRecord</a></p>
   </div>
 <div data-lang="java">
 
-    <div class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="kn">import</span> <span 
class="nn">java.util.*</span><span class="o">;</span>
+    <figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="kn">import</span> <span 
class="nn">java.util.*</span><span class="o">;</span>
 <span class="kn">import</span> <span 
class="nn">org.apache.spark.SparkConf</span><span class="o">;</span>
 <span class="kn">import</span> <span 
class="nn">org.apache.spark.TaskContext</span><span class="o">;</span>
 <span class="kn">import</span> <span 
class="nn">org.apache.spark.api.java.*</span><span class="o">;</span>
@@ -205,7 +205,7 @@ version = 2.1.0
     <span class="kd">public</span> <span class="n">Tuple2</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">,</span> 
<span class="n">String</span><span class="o">&gt;</span> <span 
class="nf">call</span><span class="o">(</span><span 
class="n">ConsumerRecord</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">String</span><span class="o">&gt;</span> <span 
class="n">record</span><span class="o">)</span> <span class="o">{</span>
       <span class="k">return</span> <span class="k">new</span> <span 
class="n">Tuple2</span><span class="o">&lt;&gt;(</span><span 
class="n">record</span><span class="o">.</span><span class="na">key</span><span 
class="o">(),</span> <span class="n">record</span><span class="o">.</span><span 
class="na">value</span><span class="o">());</span>
     <span class="o">}</span>
-  <span class="o">})</span></code></pre></div>
+  <span class="o">})</span></code></pre></figure>
 
   </div>
 </div>
@@ -236,7 +236,7 @@ Note that the example sets enable.auto.commit to false, for 
discussion see <a hr
 <div class="codetabs">
 <div data-lang="scala">
 
-    <div class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span class="c1">// Import dependencies and create kafka 
params as in Create Direct Stream above</span>
+    <figure class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span></span><span class="c1">// Import dependencies and 
create kafka params as in Create Direct Stream above</span>
 
 <span class="k">val</span> <span class="n">offsetRanges</span> <span 
class="k">=</span> <span class="nc">Array</span><span class="o">(</span>
   <span class="c1">// topic, partition, inclusive starting offset, exclusive 
ending offset</span>
@@ -244,12 +244,12 @@ Note that the example sets enable.auto.commit to false, 
for discussion see <a hr
   <span class="nc">OffsetRange</span><span class="o">(</span><span 
class="s">&quot;test&quot;</span><span class="o">,</span> <span 
class="mi">1</span><span class="o">,</span> <span class="mi">0</span><span 
class="o">,</span> <span class="mi">100</span><span class="o">)</span>
 <span class="o">)</span>
 
-<span class="k">val</span> <span class="n">rdd</span> <span class="k">=</span> 
<span class="nc">KafkaUtils</span><span class="o">.</span><span 
class="n">createRDD</span><span class="o">[</span><span 
class="kt">String</span>, <span class="kt">String</span><span 
class="o">](</span><span class="n">sparkContext</span><span class="o">,</span> 
<span class="n">kafkaParams</span><span class="o">,</span> <span 
class="n">offsetRanges</span><span class="o">,</span> <span 
class="nc">PreferConsistent</span><span class="o">)</span></code></pre></div>
+<span class="k">val</span> <span class="n">rdd</span> <span class="k">=</span> 
<span class="nc">KafkaUtils</span><span class="o">.</span><span 
class="n">createRDD</span><span class="o">[</span><span 
class="kt">String</span>, <span class="kt">String</span><span 
class="o">](</span><span class="n">sparkContext</span><span class="o">,</span> 
<span class="n">kafkaParams</span><span class="o">,</span> <span 
class="n">offsetRanges</span><span class="o">,</span> <span 
class="nc">PreferConsistent</span><span class="o">)</span></code></pre></figure>
 
   </div>
 <div data-lang="java">
 
-    <div class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="c1">// Import dependencies and create kafka 
params as in Create Direct Stream above</span>
+    <figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="c1">// Import dependencies and 
create kafka params as in Create Direct Stream above</span>
 
 <span class="n">OffsetRange</span><span class="o">[]</span> <span 
class="n">offsetRanges</span> <span class="o">=</span> <span class="o">{</span>
   <span class="c1">// topic, partition, inclusive starting offset, exclusive 
ending offset</span>
@@ -262,7 +262,7 @@ Note that the example sets enable.auto.commit to false, for 
discussion see <a hr
   <span class="n">kafkaParams</span><span class="o">,</span>
   <span class="n">offsetRanges</span><span class="o">,</span>
   <span class="n">LocationStrategies</span><span class="o">.</span><span 
class="na">PreferConsistent</span><span class="o">()</span>
-<span class="o">);</span></code></pre></div>
+<span class="o">);</span></code></pre></figure>
 
   </div>
 </div>
@@ -274,18 +274,18 @@ Note that the example sets enable.auto.commit to false, 
for discussion see <a hr
 <div class="codetabs">
 <div data-lang="scala">
 
-    <div class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span class="n">stream</span><span class="o">.</span><span 
class="n">foreachRDD</span> <span class="o">{</span> <span class="n">rdd</span> 
<span class="k">=&gt;</span>
+    <figure class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span></span><span class="n">stream</span><span 
class="o">.</span><span class="n">foreachRDD</span> <span class="o">{</span> 
<span class="n">rdd</span> <span class="k">=&gt;</span>
   <span class="k">val</span> <span class="n">offsetRanges</span> <span 
class="k">=</span> <span class="n">rdd</span><span class="o">.</span><span 
class="n">asInstanceOf</span><span class="o">[</span><span 
class="kt">HasOffsetRanges</span><span class="o">].</span><span 
class="n">offsetRanges</span>
   <span class="n">rdd</span><span class="o">.</span><span 
class="n">foreachPartition</span> <span class="o">{</span> <span 
class="n">iter</span> <span class="k">=&gt;</span>
     <span class="k">val</span> <span class="n">o</span><span 
class="k">:</span> <span class="kt">OffsetRange</span> <span class="o">=</span> 
<span class="n">offsetRanges</span><span class="o">(</span><span 
class="nc">TaskContext</span><span class="o">.</span><span 
class="n">get</span><span class="o">.</span><span 
class="n">partitionId</span><span class="o">)</span>
-    <span class="n">println</span><span class="o">(</span><span 
class="n">s</span><span class="s">&quot;${o.topic} ${o.partition} 
${o.fromOffset} ${o.untilOffset}&quot;</span><span class="o">)</span>
+    <span class="n">println</span><span class="o">(</span><span 
class="s">s&quot;</span><span class="si">${</span><span class="n">o</span><span 
class="o">.</span><span class="n">topic</span><span class="si">}</span><span 
class="s"> </span><span class="si">${</span><span class="n">o</span><span 
class="o">.</span><span class="n">partition</span><span 
class="si">}</span><span class="s"> </span><span class="si">${</span><span 
class="n">o</span><span class="o">.</span><span 
class="n">fromOffset</span><span class="si">}</span><span class="s"> 
</span><span class="si">${</span><span class="n">o</span><span 
class="o">.</span><span class="n">untilOffset</span><span 
class="si">}</span><span class="s">&quot;</span><span class="o">)</span>
   <span class="o">}</span>
-<span class="o">}</span></code></pre></div>
+<span class="o">}</span></code></pre></figure>
 
   </div>
 <div data-lang="java">
 
-    <div class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="n">stream</span><span class="o">.</span><span 
class="na">foreachRDD</span><span class="o">(</span><span class="k">new</span> 
<span class="n">VoidFunction</span><span class="o">&lt;</span><span 
class="n">JavaRDD</span><span class="o">&lt;</span><span 
class="n">ConsumerRecord</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">String</span><span class="o">&gt;&gt;&gt;()</span> <span 
class="o">{</span>
+    <figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="n">stream</span><span 
class="o">.</span><span class="na">foreachRDD</span><span 
class="o">(</span><span class="k">new</span> <span 
class="n">VoidFunction</span><span class="o">&lt;</span><span 
class="n">JavaRDD</span><span class="o">&lt;</span><span 
class="n">ConsumerRecord</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">String</span><span class="o">&gt;&gt;&gt;()</span> <span 
class="o">{</span>
   <span class="nd">@Override</span>
   <span class="kd">public</span> <span class="kt">void</span> <span 
class="nf">call</span><span class="o">(</span><span 
class="n">JavaRDD</span><span class="o">&lt;</span><span 
class="n">ConsumerRecord</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">String</span><span class="o">&gt;&gt;</span> <span 
class="n">rdd</span><span class="o">)</span> <span class="o">{</span>
     <span class="kd">final</span> <span class="n">OffsetRange</span><span 
class="o">[]</span> <span class="n">offsetRanges</span> <span 
class="o">=</span> <span class="o">((</span><span 
class="n">HasOffsetRanges</span><span class="o">)</span> <span 
class="n">rdd</span><span class="o">.</span><span class="na">rdd</span><span 
class="o">()).</span><span class="na">offsetRanges</span><span 
class="o">();</span>
@@ -298,7 +298,7 @@ Note that the example sets enable.auto.commit to false, for 
discussion see <a hr
       <span class="o">}</span>
     <span class="o">});</span>
   <span class="o">}</span>
-<span class="o">});</span></code></pre></div>
+<span class="o">});</span></code></pre></figure>
 
   </div>
 </div>
@@ -317,18 +317,18 @@ Note that the example sets enable.auto.commit to false, 
for discussion see <a hr
 <div class="codetabs">
 <div data-lang="scala">
 
-    <div class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span class="n">stream</span><span class="o">.</span><span 
class="n">foreachRDD</span> <span class="o">{</span> <span class="n">rdd</span> 
<span class="k">=&gt;</span>
+    <figure class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span></span><span class="n">stream</span><span 
class="o">.</span><span class="n">foreachRDD</span> <span class="o">{</span> 
<span class="n">rdd</span> <span class="k">=&gt;</span>
   <span class="k">val</span> <span class="n">offsetRanges</span> <span 
class="k">=</span> <span class="n">rdd</span><span class="o">.</span><span 
class="n">asInstanceOf</span><span class="o">[</span><span 
class="kt">HasOffsetRanges</span><span class="o">].</span><span 
class="n">offsetRanges</span>
 
   <span class="c1">// some time later, after outputs have completed</span>
   <span class="n">stream</span><span class="o">.</span><span 
class="n">asInstanceOf</span><span class="o">[</span><span 
class="kt">CanCommitOffsets</span><span class="o">].</span><span 
class="n">commitAsync</span><span class="o">(</span><span 
class="n">offsetRanges</span><span class="o">)</span>
-<span class="o">}</span></code></pre></div>
+<span class="o">}</span></code></pre></figure>
 
     <p>As with HasOffsetRanges, the cast to CanCommitOffsets will only succeed 
if called on the result of createDirectStream, not after transformations.  The 
commitAsync call is threadsafe, but must occur after outputs if you want 
meaningful semantics.</p>
   </div>
 <div data-lang="java">
 
-    <div class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="n">stream</span><span class="o">.</span><span 
class="na">foreachRDD</span><span class="o">(</span><span class="k">new</span> 
<span class="n">VoidFunction</span><span class="o">&lt;</span><span 
class="n">JavaRDD</span><span class="o">&lt;</span><span 
class="n">ConsumerRecord</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">String</span><span class="o">&gt;&gt;&gt;()</span> <span 
class="o">{</span>
+    <figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="n">stream</span><span 
class="o">.</span><span class="na">foreachRDD</span><span 
class="o">(</span><span class="k">new</span> <span 
class="n">VoidFunction</span><span class="o">&lt;</span><span 
class="n">JavaRDD</span><span class="o">&lt;</span><span 
class="n">ConsumerRecord</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">String</span><span class="o">&gt;&gt;&gt;()</span> <span 
class="o">{</span>
   <span class="nd">@Override</span>
   <span class="kd">public</span> <span class="kt">void</span> <span 
class="nf">call</span><span class="o">(</span><span 
class="n">JavaRDD</span><span class="o">&lt;</span><span 
class="n">ConsumerRecord</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">String</span><span class="o">&gt;&gt;</span> <span 
class="n">rdd</span><span class="o">)</span> <span class="o">{</span>
     <span class="n">OffsetRange</span><span class="o">[]</span> <span 
class="n">offsetRanges</span> <span class="o">=</span> <span 
class="o">((</span><span class="n">HasOffsetRanges</span><span 
class="o">)</span> <span class="n">rdd</span><span class="o">.</span><span 
class="na">rdd</span><span class="o">()).</span><span 
class="na">offsetRanges</span><span class="o">();</span>
@@ -336,7 +336,7 @@ Note that the example sets enable.auto.commit to false, for 
discussion see <a hr
     <span class="c1">// some time later, after outputs have completed</span>
     <span class="o">((</span><span class="n">CanCommitOffsets</span><span 
class="o">)</span> <span class="n">stream</span><span class="o">.</span><span 
class="na">inputDStream</span><span class="o">()).</span><span 
class="na">commitAsync</span><span class="o">(</span><span 
class="n">offsetRanges</span><span class="o">);</span>
   <span class="o">}</span>
-<span class="o">});</span></code></pre></div>
+<span class="o">});</span></code></pre></figure>
 
   </div>
 </div>
@@ -347,7 +347,7 @@ Note that the example sets enable.auto.commit to false, for 
discussion see <a hr
 <div class="codetabs">
 <div data-lang="scala">
 
-    <div class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span class="c1">// The details depend on your data store, 
but the general idea looks like this</span>
+    <figure class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span></span><span class="c1">// The details depend on your 
data store, but the general idea looks like this</span>
 
 <span class="c1">// begin from the offsets committed to the database</span>
 <span class="k">val</span> <span class="n">fromOffsets</span> <span 
class="k">=</span> <span class="n">selectOffsetsFromYourDatabase</span><span 
class="o">.</span><span class="n">map</span> <span class="o">{</span> <span 
class="n">resultSet</span> <span class="k">=&gt;</span>
@@ -372,17 +372,17 @@ Note that the example sets enable.auto.commit to false, 
for discussion see <a hr
   <span class="c1">// assert that offsets were updated correctly</span>
 
   <span class="c1">// end your transaction</span>
-<span class="o">}</span></code></pre></div>
+<span class="o">}</span></code></pre></figure>
 
   </div>
 <div data-lang="java">
 
-    <div class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="c1">// The details depend on your data store, but 
the general idea looks like this</span>
+    <figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="c1">// The details depend on your 
data store, but the general idea looks like this</span>
 
 <span class="c1">// begin from the offsets committed to the database</span>
 <span class="n">Map</span><span class="o">&lt;</span><span 
class="n">TopicPartition</span><span class="o">,</span> <span 
class="n">Long</span><span class="o">&gt;</span> <span 
class="n">fromOffsets</span> <span class="o">=</span> <span 
class="k">new</span> <span class="n">HashMap</span><span 
class="o">&lt;&gt;();</span>
 <span class="k">for</span> <span class="o">(</span><span 
class="n">resultSet</span> <span class="o">:</span> <span 
class="n">selectOffsetsFromYourDatabase</span><span class="o">)</span> <span class="o">{</span>
-  <span class="n">fromOffsets</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span class="k">new</span> <span 
class="nf">TopicPartition</span><span class="o">(</span><span 
class="n">resultSet</span><span class="o">.</span><span 
class="na">string</span><span class="o">(</span><span 
class="s">&quot;topic&quot;</span><span class="o">),</span> <span 
class="n">resultSet</span><span class="o">.</span><span 
class="na">int</span><span class="o">(</span><span 
class="s">&quot;partition&quot;</span><span class="o">)),</span> <span 
class="n">resultSet</span><span class="o">.</span><span 
class="na">long</span><span class="o">(</span><span 
class="s">&quot;offset&quot;</span><span class="o">));</span>
+  <span class="n">fromOffsets</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span class="k">new</span> <span 
class="n">TopicPartition</span><span class="o">(</span><span 
class="n">resultSet</span><span class="o">.</span><span 
class="na">string</span><span class="o">(</span><span 
class="s">&quot;topic&quot;</span><span class="o">),</span> <span 
class="n">resultSet</span><span class="o">.</span><span 
class="na">int</span><span class="o">(</span><span 
class="s">&quot;partition&quot;</span><span class="o">)),</span> <span 
class="n">resultSet</span><span class="o">.</span><span 
class="na">long</span><span class="o">(</span><span 
class="s">&quot;offset&quot;</span><span class="o">));</span>
 <span class="o">}</span>
 
 <span class="n">JavaInputDStream</span><span class="o">&lt;</span><span 
class="n">ConsumerRecord</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">String</span><span class="o">&gt;&gt;</span> <span 
class="n">stream</span> <span class="o">=</span> <span 
class="n">KafkaUtils</span><span class="o">.</span><span 
class="na">createDirectStream</span><span class="o">(</span>
@@ -406,7 +406,7 @@ Note that the example sets enable.auto.commit to false, for 
discussion see <a hr
 
     <span class="c1">// end your transaction</span>
   <span class="o">}</span>
-<span class="o">});</span></code></pre></div>
+<span class="o">});</span></code></pre></figure>
 
   </div>
 </div>
@@ -417,7 +417,7 @@ Note that the example sets enable.auto.commit to false, for 
discussion see <a hr
 <div class="codetabs">
 <div data-lang="scala">
 
-    <div class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span class="k">val</span> <span class="n">kafkaParams</span> 
<span class="k">=</span> <span class="nc">Map</span><span 
class="o">[</span><span class="kt">String</span>, <span 
class="kt">Object</span><span class="o">](</span>
+    <figure class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span></span><span class="k">val</span> <span 
class="n">kafkaParams</span> <span class="k">=</span> <span 
class="nc">Map</span><span class="o">[</span><span class="kt">String</span>, 
<span class="kt">Object</span><span class="o">](</span>
   <span class="c1">// the usual params, make sure to change the port in 
bootstrap.servers if 9092 is not TLS</span>
   <span class="s">&quot;security.protocol&quot;</span> <span 
class="o">-&gt;</span> <span class="s">&quot;SSL&quot;</span><span 
class="o">,</span>
   <span class="s">&quot;ssl.truststore.location&quot;</span> <span 
class="o">-&gt;</span> <span 
class="s">&quot;/some-directory/kafka.client.truststore.jks&quot;</span><span 
class="o">,</span>
@@ -425,19 +425,19 @@ Note that the example sets enable.auto.commit to false, 
for discussion see <a hr
   <span class="s">&quot;ssl.keystore.location&quot;</span> <span 
class="o">-&gt;</span> <span 
class="s">&quot;/some-directory/kafka.client.keystore.jks&quot;</span><span 
class="o">,</span>
   <span class="s">&quot;ssl.keystore.password&quot;</span> <span 
class="o">-&gt;</span> <span class="s">&quot;test1234&quot;</span><span 
class="o">,</span>
   <span class="s">&quot;ssl.key.password&quot;</span> <span 
class="o">-&gt;</span> <span class="s">&quot;test1234&quot;</span>
-<span class="o">)</span></code></pre></div>
+<span class="o">)</span></code></pre></figure>
 
   </div>
 <div data-lang="java">
 
-    <div class="highlight"><pre><code class="language-java" 
data-lang="java"><span class="n">Map</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Object</span><span class="o">&gt;</span> <span 
class="n">kafkaParams</span> <span class="o">=</span> <span 
class="k">new</span> <span class="n">HashMap</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">,</span> 
<span class="n">Object</span><span class="o">&gt;();</span>
+    <figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="n">Map</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">,</span> 
<span class="n">Object</span><span class="o">&gt;</span> <span 
class="n">kafkaParams</span> <span class="o">=</span> <span 
class="k">new</span> <span class="n">HashMap</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">,</span> 
<span class="n">Object</span><span class="o">&gt;();</span>
 <span class="c1">// the usual params, make sure to change the port in 
bootstrap.servers if 9092 is not TLS</span>
 <span class="n">kafkaParams</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="s">&quot;security.protocol&quot;</span><span class="o">,</span> <span 
class="s">&quot;SSL&quot;</span><span class="o">);</span>
 <span class="n">kafkaParams</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="s">&quot;ssl.truststore.location&quot;</span><span class="o">,</span> 
<span 
class="s">&quot;/some-directory/kafka.client.truststore.jks&quot;</span><span 
class="o">);</span>
 <span class="n">kafkaParams</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="s">&quot;ssl.truststore.password&quot;</span><span class="o">,</span> 
<span class="s">&quot;test1234&quot;</span><span class="o">);</span>
 <span class="n">kafkaParams</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="s">&quot;ssl.keystore.location&quot;</span><span class="o">,</span> 
<span 
class="s">&quot;/some-directory/kafka.client.keystore.jks&quot;</span><span 
class="o">);</span>
 <span class="n">kafkaParams</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="s">&quot;ssl.keystore.password&quot;</span><span class="o">,</span> 
<span class="s">&quot;test1234&quot;</span><span class="o">);</span>
-<span class="n">kafkaParams</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="s">&quot;ssl.key.password&quot;</span><span class="o">,</span> <span 
class="s">&quot;test1234&quot;</span><span 
class="o">);</span></code></pre></div>
+<span class="n">kafkaParams</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="s">&quot;ssl.key.password&quot;</span><span class="o">,</span> <span 
class="s">&quot;test1234&quot;</span><span 
class="o">);</span></code></pre></figure>
 
   </div>
 </div>

