http://git-wip-us.apache.org/repos/asf/spark-website/blob/d2bcf185/site/docs/2.1.0/storage-openstack-swift.html ---------------------------------------------------------------------- diff --git a/site/docs/2.1.0/storage-openstack-swift.html b/site/docs/2.1.0/storage-openstack-swift.html index bbb3446..a20c67f 100644 --- a/site/docs/2.1.0/storage-openstack-swift.html +++ b/site/docs/2.1.0/storage-openstack-swift.html @@ -144,7 +144,7 @@ Current Swift driver requires Swift to use Keystone authentication method.</p> <p>The Spark application should include <code>hadoop-openstack</code> dependency. For example, for Maven support, add the following to the <code>pom.xml</code> file:</p> -<div class="highlight"><pre><code class="language-xml" data-lang="xml"><span class="nt"><dependencyManagement></span> +<figure class="highlight"><pre><code class="language-xml" data-lang="xml"><span></span><span class="nt"><dependencyManagement></span> ... <span class="nt"><dependency></span> <span class="nt"><groupId></span>org.apache.hadoop<span class="nt"></groupId></span> @@ -152,15 +152,15 @@ For example, for Maven support, add the following to the <code>pom.xml</code> fi <span class="nt"><version></span>2.3.0<span class="nt"></version></span> <span class="nt"></dependency></span> ... -<span class="nt"></dependencyManagement></span></code></pre></div> +<span class="nt"></dependencyManagement></span></code></pre></figure> <h1 id="configuration-parameters">Configuration Parameters</h1> <p>Create <code>core-site.xml</code> and place it inside Spark’s <code>conf</code> directory. There are two main categories of parameters that should to be configured: declaration of the -Swift driver and the parameters that are required by Keystone.</p> +Swift driver and the parameters that are required by Keystone. 
</p> -<p>Configuration of Hadoop to use Swift File system achieved via</p> +<p>Configuration of Hadoop to use Swift File system achieved via </p> <table class="table"> <tr><th>Property Name</th><th>Value</th></tr> @@ -221,7 +221,7 @@ contains a list of Keystone mandatory parameters. <code>PROVIDER</code> can be a <p>For example, assume <code>PROVIDER=SparkTest</code> and Keystone contains user <code>tester</code> with password <code>testing</code> defined for tenant <code>test</code>. Then <code>core-site.xml</code> should include:</p> -<div class="highlight"><pre><code class="language-xml" data-lang="xml"><span class="nt"><configuration></span> +<figure class="highlight"><pre><code class="language-xml" data-lang="xml"><span></span><span class="nt"><configuration></span> <span class="nt"><property></span> <span class="nt"><name></span>fs.swift.impl<span class="nt"></name></span> <span class="nt"><value></span>org.apache.hadoop.fs.swift.snative.SwiftNativeFileSystem<span class="nt"></value></span> @@ -257,7 +257,7 @@ defined for tenant <code>test</code>. Then <code>core-site.xml</code> should inc <span class="nt"><name></span>fs.swift.service.SparkTest.password<span class="nt"></name></span> <span class="nt"><value></span>testing<span class="nt"></value></span> <span class="nt"></property></span> -<span class="nt"></configuration></span></code></pre></div> +<span class="nt"></configuration></span></code></pre></figure> <p>Notice that <code>fs.swift.service.PROVIDER.tenant</code>,
http://git-wip-us.apache.org/repos/asf/spark-website/blob/d2bcf185/site/docs/2.1.0/streaming-custom-receivers.html ---------------------------------------------------------------------- diff --git a/site/docs/2.1.0/streaming-custom-receivers.html b/site/docs/2.1.0/streaming-custom-receivers.html index d31647d..846c797 100644 --- a/site/docs/2.1.0/streaming-custom-receivers.html +++ b/site/docs/2.1.0/streaming-custom-receivers.html @@ -171,7 +171,7 @@ has any error connecting or receiving, the receiver is restarted to make another <div class="codetabs"> <div data-lang="scala"> - <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">class</span> <span class="nc">CustomReceiver</span><span class="o">(</span><span class="n">host</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span> <span class="n">port</span><span class="k">:</span> <span class="kt">Int</span><span class="o">)</span> + <figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span></span><span class="k">class</span> <span class="nc">CustomReceiver</span><span class="o">(</span><span class="n">host</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span> <span class="n">port</span><span class="k">:</span> <span class="kt">Int</span><span class="o">)</span> <span class="k">extends</span> <span class="nc">Receiver</span><span class="o">[</span><span class="kt">String</span><span class="o">](</span><span class="nc">StorageLevel</span><span class="o">.</span><span class="nc">MEMORY_AND_DISK_2</span><span class="o">)</span> <span class="k">with</span> <span class="nc">Logging</span> <span class="o">{</span> <span class="k">def</span> <span class="n">onStart</span><span class="o">()</span> <span class="o">{</span> @@ -216,12 +216,12 @@ has any error connecting or receiving, the receiver is restarted to make another <span class="n">restart</span><span class="o">(</span><span class="s">"Error 
receiving data"</span><span class="o">,</span> <span class="n">t</span><span class="o">)</span> <span class="o">}</span> <span class="o">}</span> -<span class="o">}</span></code></pre></div> +<span class="o">}</span></code></pre></figure> </div> <div data-lang="java"> - <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">JavaCustomReceiver</span> <span class="kd">extends</span> <span class="n">Receiver</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="o">{</span> + <figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="kd">public</span> <span class="kd">class</span> <span class="nc">JavaCustomReceiver</span> <span class="kd">extends</span> <span class="n">Receiver</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="o">{</span> <span class="n">String</span> <span class="n">host</span> <span class="o">=</span> <span class="kc">null</span><span class="o">;</span> <span class="kt">int</span> <span class="n">port</span> <span class="o">=</span> <span class="o">-</span><span class="mi">1</span><span class="o">;</span> @@ -234,7 +234,7 @@ has any error connecting or receiving, the receiver is restarted to make another <span class="kd">public</span> <span class="kt">void</span> <span class="nf">onStart</span><span class="o">()</span> <span class="o">{</span> <span class="c1">// Start the thread that receives data over a connection</span> - <span class="k">new</span> <span class="nf">Thread</span><span class="o">()</span> <span class="o">{</span> + <span class="k">new</span> <span class="n">Thread</span><span class="o">()</span> <span class="o">{</span> <span class="nd">@Override</span> <span class="kd">public</span> <span class="kt">void</span> <span class="nf">run</span><span class="o">()</span> <span class="o">{</span> <span 
class="n">receive</span><span class="o">();</span> <span class="o">}</span> @@ -253,10 +253,10 @@ has any error connecting or receiving, the receiver is restarted to make another <span class="k">try</span> <span class="o">{</span> <span class="c1">// connect to the server</span> - <span class="n">socket</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">Socket</span><span class="o">(</span><span class="n">host</span><span class="o">,</span> <span class="n">port</span><span class="o">);</span> + <span class="n">socket</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Socket</span><span class="o">(</span><span class="n">host</span><span class="o">,</span> <span class="n">port</span><span class="o">);</span> - <span class="n">BufferedReader</span> <span class="n">reader</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">BufferedReader</span><span class="o">(</span> - <span class="k">new</span> <span class="nf">InputStreamReader</span><span class="o">(</span><span class="n">socket</span><span class="o">.</span><span class="na">getInputStream</span><span class="o">(),</span> <span class="n">StandardCharsets</span><span class="o">.</span><span class="na">UTF_8</span><span class="o">));</span> + <span class="n">BufferedReader</span> <span class="n">reader</span> <span class="o">=</span> <span class="k">new</span> <span class="n">BufferedReader</span><span class="o">(</span> + <span class="k">new</span> <span class="n">InputStreamReader</span><span class="o">(</span><span class="n">socket</span><span class="o">.</span><span class="na">getInputStream</span><span class="o">(),</span> <span class="n">StandardCharsets</span><span class="o">.</span><span class="na">UTF_8</span><span class="o">));</span> <span class="c1">// Until stopped or connection broken continue reading</span> <span class="k">while</span> <span class="o">(!</span><span class="n">isStopped</span><span class="o">()</span> <span 
class="o">&&</span> <span class="o">(</span><span class="n">userInput</span> <span class="o">=</span> <span class="n">reader</span><span class="o">.</span><span class="na">readLine</span><span class="o">())</span> <span class="o">!=</span> <span class="kc">null</span><span class="o">)</span> <span class="o">{</span> @@ -276,7 +276,7 @@ has any error connecting or receiving, the receiver is restarted to make another <span class="n">restart</span><span class="o">(</span><span class="s">"Error receiving data"</span><span class="o">,</span> <span class="n">t</span><span class="o">);</span> <span class="o">}</span> <span class="o">}</span> -<span class="o">}</span></code></pre></div> +<span class="o">}</span></code></pre></figure> </div> </div> @@ -290,20 +290,20 @@ an input DStream using data received by the instance of custom receiver, as show <div class="codetabs"> <div data-lang="scala"> - <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">// Assuming ssc is the StreamingContext</span> + <figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span></span><span class="c1">// Assuming ssc is the StreamingContext</span> <span class="k">val</span> <span class="n">customReceiverStream</span> <span class="k">=</span> <span class="n">ssc</span><span class="o">.</span><span class="n">receiverStream</span><span class="o">(</span><span class="k">new</span> <span class="nc">CustomReceiver</span><span class="o">(</span><span class="n">host</span><span class="o">,</span> <span class="n">port</span><span class="o">))</span> <span class="k">val</span> <span class="n">words</span> <span class="k">=</span> <span class="n">lines</span><span class="o">.</span><span class="n">flatMap</span><span class="o">(</span><span class="k">_</span><span class="o">.</span><span class="n">split</span><span class="o">(</span><span class="s">" "</span><span class="o">))</span> -<span class="o">...</span></code></pre></div> +<span 
class="o">...</span></code></pre></figure> <p>The full source code is in the example <a href="https://github.com/apache/spark/blob/v2.1.0/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala">CustomReceiver.scala</a>.</p> </div> <div data-lang="java"> - <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// Assuming ssc is the JavaStreamingContext</span> -<span class="n">JavaDStream</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">customReceiverStream</span> <span class="o">=</span> <span class="n">ssc</span><span class="o">.</span><span class="na">receiverStream</span><span class="o">(</span><span class="k">new</span> <span class="nf">JavaCustomReceiver</span><span class="o">(</span><span class="n">host</span><span class="o">,</span> <span class="n">port</span><span class="o">));</span> + <figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="c1">// Assuming ssc is the JavaStreamingContext</span> +<span class="n">JavaDStream</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">customReceiverStream</span> <span class="o">=</span> <span class="n">ssc</span><span class="o">.</span><span class="na">receiverStream</span><span class="o">(</span><span class="k">new</span> <span class="n">JavaCustomReceiver</span><span class="o">(</span><span class="n">host</span><span class="o">,</span> <span class="n">port</span><span class="o">));</span> <span class="n">JavaDStream</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">words</span> <span class="o">=</span> <span class="n">lines</span><span class="o">.</span><span class="na">flatMap</span><span class="o">(</span><span class="k">new</span> <span class="n">FlatMapFunction</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span 
class="n">String</span><span class="o">>()</span> <span class="o">{</span> <span class="o">...</span> <span class="o">});</span> -<span class="o">...</span></code></pre></div> +<span class="o">...</span></code></pre></figure> <p>The full source code is in the example <a href="https://github.com/apache/spark/blob/v2.1.0/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java">JavaCustomReceiver.java</a>.</p> http://git-wip-us.apache.org/repos/asf/spark-website/blob/d2bcf185/site/docs/2.1.0/streaming-kafka-0-10-integration.html ---------------------------------------------------------------------- diff --git a/site/docs/2.1.0/streaming-kafka-0-10-integration.html b/site/docs/2.1.0/streaming-kafka-0-10-integration.html index a4f39fb..1e7fbba 100644 --- a/site/docs/2.1.0/streaming-kafka-0-10-integration.html +++ b/site/docs/2.1.0/streaming-kafka-0-10-integration.html @@ -142,7 +142,7 @@ version = 2.1.0 <div class="codetabs"> <div data-lang="scala"> - <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">import</span> <span class="nn">org.apache.kafka.clients.consumer.ConsumerRecord</span> + <figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span></span><span class="k">import</span> <span class="nn">org.apache.kafka.clients.consumer.ConsumerRecord</span> <span class="k">import</span> <span class="nn">org.apache.kafka.common.serialization.StringDeserializer</span> <span class="k">import</span> <span class="nn">org.apache.spark.streaming.kafka010._</span> <span class="k">import</span> <span class="nn">org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent</span> @@ -164,13 +164,13 @@ version = 2.1.0 <span class="nc">Subscribe</span><span class="o">[</span><span class="kt">String</span>, <span class="kt">String</span><span class="o">](</span><span class="n">topics</span><span class="o">,</span> <span class="n">kafkaParams</span><span class="o">)</span> <span 
class="o">)</span> -<span class="n">stream</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="n">record</span> <span class="k">=></span> <span class="o">(</span><span class="n">record</span><span class="o">.</span><span class="n">key</span><span class="o">,</span> <span class="n">record</span><span class="o">.</span><span class="n">value</span><span class="o">))</span></code></pre></div> +<span class="n">stream</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="n">record</span> <span class="k">=></span> <span class="o">(</span><span class="n">record</span><span class="o">.</span><span class="n">key</span><span class="o">,</span> <span class="n">record</span><span class="o">.</span><span class="n">value</span><span class="o">))</span></code></pre></figure> <p>Each item in the stream is a <a href="http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html">ConsumerRecord</a></p> </div> <div data-lang="java"> - <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kn">import</span> <span class="nn">java.util.*</span><span class="o">;</span> + <figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="kn">import</span> <span class="nn">java.util.*</span><span class="o">;</span> <span class="kn">import</span> <span class="nn">org.apache.spark.SparkConf</span><span class="o">;</span> <span class="kn">import</span> <span class="nn">org.apache.spark.TaskContext</span><span class="o">;</span> <span class="kn">import</span> <span class="nn">org.apache.spark.api.java.*</span><span class="o">;</span> @@ -205,7 +205,7 @@ version = 2.1.0 <span class="kd">public</span> <span class="n">Tuple2</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">></span> <span class="nf">call</span><span class="o">(</span><span 
class="n">ConsumerRecord</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">></span> <span class="n">record</span><span class="o">)</span> <span class="o">{</span> <span class="k">return</span> <span class="k">new</span> <span class="n">Tuple2</span><span class="o"><>(</span><span class="n">record</span><span class="o">.</span><span class="na">key</span><span class="o">(),</span> <span class="n">record</span><span class="o">.</span><span class="na">value</span><span class="o">());</span> <span class="o">}</span> - <span class="o">})</span></code></pre></div> + <span class="o">})</span></code></pre></figure> </div> </div> @@ -236,7 +236,7 @@ Note that the example sets enable.auto.commit to false, for discussion see <a hr <div class="codetabs"> <div data-lang="scala"> - <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">// Import dependencies and create kafka params as in Create Direct Stream above</span> + <figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span></span><span class="c1">// Import dependencies and create kafka params as in Create Direct Stream above</span> <span class="k">val</span> <span class="n">offsetRanges</span> <span class="k">=</span> <span class="nc">Array</span><span class="o">(</span> <span class="c1">// topic, partition, inclusive starting offset, exclusive ending offset</span> @@ -244,12 +244,12 @@ Note that the example sets enable.auto.commit to false, for discussion see <a hr <span class="nc">OffsetRange</span><span class="o">(</span><span class="s">"test"</span><span class="o">,</span> <span class="mi">1</span><span class="o">,</span> <span class="mi">0</span><span class="o">,</span> <span class="mi">100</span><span class="o">)</span> <span class="o">)</span> -<span class="k">val</span> <span class="n">rdd</span> <span class="k">=</span> <span class="nc">KafkaUtils</span><span 
class="o">.</span><span class="n">createRDD</span><span class="o">[</span><span class="kt">String</span>, <span class="kt">String</span><span class="o">](</span><span class="n">sparkContext</span><span class="o">,</span> <span class="n">kafkaParams</span><span class="o">,</span> <span class="n">offsetRanges</span><span class="o">,</span> <span class="nc">PreferConsistent</span><span class="o">)</span></code></pre></div> +<span class="k">val</span> <span class="n">rdd</span> <span class="k">=</span> <span class="nc">KafkaUtils</span><span class="o">.</span><span class="n">createRDD</span><span class="o">[</span><span class="kt">String</span>, <span class="kt">String</span><span class="o">](</span><span class="n">sparkContext</span><span class="o">,</span> <span class="n">kafkaParams</span><span class="o">,</span> <span class="n">offsetRanges</span><span class="o">,</span> <span class="nc">PreferConsistent</span><span class="o">)</span></code></pre></figure> </div> <div data-lang="java"> - <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// Import dependencies and create kafka params as in Create Direct Stream above</span> + <figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="c1">// Import dependencies and create kafka params as in Create Direct Stream above</span> <span class="n">OffsetRange</span><span class="o">[]</span> <span class="n">offsetRanges</span> <span class="o">=</span> <span class="o">{</span> <span class="c1">// topic, partition, inclusive starting offset, exclusive ending offset</span> @@ -262,7 +262,7 @@ Note that the example sets enable.auto.commit to false, for discussion see <a hr <span class="n">kafkaParams</span><span class="o">,</span> <span class="n">offsetRanges</span><span class="o">,</span> <span class="n">LocationStrategies</span><span class="o">.</span><span class="na">PreferConsistent</span><span class="o">()</span> -<span 
class="o">);</span></code></pre></div> +<span class="o">);</span></code></pre></figure> </div> </div> @@ -274,18 +274,18 @@ Note that the example sets enable.auto.commit to false, for discussion see <a hr <div class="codetabs"> <div data-lang="scala"> - <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">stream</span><span class="o">.</span><span class="n">foreachRDD</span> <span class="o">{</span> <span class="n">rdd</span> <span class="k">=></span> + <figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span></span><span class="n">stream</span><span class="o">.</span><span class="n">foreachRDD</span> <span class="o">{</span> <span class="n">rdd</span> <span class="k">=></span> <span class="k">val</span> <span class="n">offsetRanges</span> <span class="k">=</span> <span class="n">rdd</span><span class="o">.</span><span class="n">asInstanceOf</span><span class="o">[</span><span class="kt">HasOffsetRanges</span><span class="o">].</span><span class="n">offsetRanges</span> <span class="n">rdd</span><span class="o">.</span><span class="n">foreachPartition</span> <span class="o">{</span> <span class="n">iter</span> <span class="k">=></span> <span class="k">val</span> <span class="n">o</span><span class="k">:</span> <span class="kt">OffsetRange</span> <span class="o">=</span> <span class="n">offsetRanges</span><span class="o">(</span><span class="nc">TaskContext</span><span class="o">.</span><span class="n">get</span><span class="o">.</span><span class="n">partitionId</span><span class="o">)</span> - <span class="n">println</span><span class="o">(</span><span class="n">s</span><span class="s">"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}"</span><span class="o">)</span> + <span class="n">println</span><span class="o">(</span><span class="s">s"</span><span class="si">${</span><span class="n">o</span><span class="o">.</span><span class="n">topic</span><span class="si">}</span><span class="s"> 
</span><span class="si">${</span><span class="n">o</span><span class="o">.</span><span class="n">partition</span><span class="si">}</span><span class="s"> </span><span class="si">${</span><span class="n">o</span><span class="o">.</span><span class="n">fromOffset</span><span class="si">}</span><span class="s"> </span><span class="si">${</span><span class="n">o</span><span class="o">.</span><span class="n">untilOffset</span><span class="si">}</span><span class="s">"</span><span class="o">)</span> <span class="o">}</span> -<span class="o">}</span></code></pre></div> +<span class="o">}</span></code></pre></figure> </div> <div data-lang="java"> - <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">stream</span><span class="o">.</span><span class="na">foreachRDD</span><span class="o">(</span><span class="k">new</span> <span class="n">VoidFunction</span><span class="o"><</span><span class="n">JavaRDD</span><span class="o"><</span><span class="n">ConsumerRecord</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">>>>()</span> <span class="o">{</span> + <figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="n">stream</span><span class="o">.</span><span class="na">foreachRDD</span><span class="o">(</span><span class="k">new</span> <span class="n">VoidFunction</span><span class="o"><</span><span class="n">JavaRDD</span><span class="o"><</span><span class="n">ConsumerRecord</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">>>>()</span> <span class="o">{</span> <span class="nd">@Override</span> <span class="kd">public</span> <span class="kt">void</span> <span class="nf">call</span><span class="o">(</span><span class="n">JavaRDD</span><span class="o"><</span><span class="n">ConsumerRecord</span><span class="o"><</span><span 
class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">>></span> <span class="n">rdd</span><span class="o">)</span> <span class="o">{</span> <span class="kd">final</span> <span class="n">OffsetRange</span><span class="o">[]</span> <span class="n">offsetRanges</span> <span class="o">=</span> <span class="o">((</span><span class="n">HasOffsetRanges</span><span class="o">)</span> <span class="n">rdd</span><span class="o">.</span><span class="na">rdd</span><span class="o">()).</span><span class="na">offsetRanges</span><span class="o">();</span> @@ -298,7 +298,7 @@ Note that the example sets enable.auto.commit to false, for discussion see <a hr <span class="o">}</span> <span class="o">});</span> <span class="o">}</span> -<span class="o">});</span></code></pre></div> +<span class="o">});</span></code></pre></figure> </div> </div> @@ -317,18 +317,18 @@ Note that the example sets enable.auto.commit to false, for discussion see <a hr <div class="codetabs"> <div data-lang="scala"> - <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">stream</span><span class="o">.</span><span class="n">foreachRDD</span> <span class="o">{</span> <span class="n">rdd</span> <span class="k">=></span> + <figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span></span><span class="n">stream</span><span class="o">.</span><span class="n">foreachRDD</span> <span class="o">{</span> <span class="n">rdd</span> <span class="k">=></span> <span class="k">val</span> <span class="n">offsetRanges</span> <span class="k">=</span> <span class="n">rdd</span><span class="o">.</span><span class="n">asInstanceOf</span><span class="o">[</span><span class="kt">HasOffsetRanges</span><span class="o">].</span><span class="n">offsetRanges</span> <span class="c1">// some time later, after outputs have completed</span> <span class="n">stream</span><span class="o">.</span><span class="n">asInstanceOf</span><span 
class="o">[</span><span class="kt">CanCommitOffsets</span><span class="o">].</span><span class="n">commitAsync</span><span class="o">(</span><span class="n">offsetRanges</span><span class="o">)</span> -<span class="o">}</span></code></pre></div> +<span class="o">}</span></code></pre></figure> <p>As with HasOffsetRanges, the cast to CanCommitOffsets will only succeed if called on the result of createDirectStream, not after transformations. The commitAsync call is threadsafe, but must occur after outputs if you want meaningful semantics.</p> </div> <div data-lang="java"> - <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">stream</span><span class="o">.</span><span class="na">foreachRDD</span><span class="o">(</span><span class="k">new</span> <span class="n">VoidFunction</span><span class="o"><</span><span class="n">JavaRDD</span><span class="o"><</span><span class="n">ConsumerRecord</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">>>>()</span> <span class="o">{</span> + <figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="n">stream</span><span class="o">.</span><span class="na">foreachRDD</span><span class="o">(</span><span class="k">new</span> <span class="n">VoidFunction</span><span class="o"><</span><span class="n">JavaRDD</span><span class="o"><</span><span class="n">ConsumerRecord</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">>>>()</span> <span class="o">{</span> <span class="nd">@Override</span> <span class="kd">public</span> <span class="kt">void</span> <span class="nf">call</span><span class="o">(</span><span class="n">JavaRDD</span><span class="o"><</span><span class="n">ConsumerRecord</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span 
class="o">>></span> <span class="n">rdd</span><span class="o">)</span> <span class="o">{</span> <span class="n">OffsetRange</span><span class="o">[]</span> <span class="n">offsetRanges</span> <span class="o">=</span> <span class="o">((</span><span class="n">HasOffsetRanges</span><span class="o">)</span> <span class="n">rdd</span><span class="o">.</span><span class="na">rdd</span><span class="o">()).</span><span class="na">offsetRanges</span><span class="o">();</span> @@ -336,7 +336,7 @@ Note that the example sets enable.auto.commit to false, for discussion see <a hr <span class="c1">// some time later, after outputs have completed</span> <span class="o">((</span><span class="n">CanCommitOffsets</span><span class="o">)</span> <span class="n">stream</span><span class="o">.</span><span class="na">inputDStream</span><span class="o">()).</span><span class="na">commitAsync</span><span class="o">(</span><span class="n">offsetRanges</span><span class="o">);</span> <span class="o">}</span> -<span class="o">});</span></code></pre></div> +<span class="o">});</span></code></pre></figure> </div> </div> @@ -347,7 +347,7 @@ Note that the example sets enable.auto.commit to false, for discussion see <a hr <div class="codetabs"> <div data-lang="scala"> - <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">// The details depend on your data store, but the general idea looks like this</span> + <figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span></span><span class="c1">// The details depend on your data store, but the general idea looks like this</span> <span class="c1">// begin from the the offsets committed to the database</span> <span class="k">val</span> <span class="n">fromOffsets</span> <span class="k">=</span> <span class="n">selectOffsetsFromYourDatabase</span><span class="o">.</span><span class="n">map</span> <span class="o">{</span> <span class="n">resultSet</span> <span class="k">=></span> @@ -372,17 
+372,17 @@ Note that the example sets enable.auto.commit to false, for discussion see <a hr <span class="c1">// assert that offsets were updated correctly</span> <span class="c1">// end your transaction</span> -<span class="o">}</span></code></pre></div> +<span class="o">}</span></code></pre></figure> </div> <div data-lang="java"> - <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// The details depend on your data store, but the general idea looks like this</span> + <figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="c1">// The details depend on your data store, but the general idea looks like this</span> <span class="c1">// begin from the the offsets committed to the database</span> <span class="n">Map</span><span class="o"><</span><span class="n">TopicPartition</span><span class="o">,</span> <span class="n">Long</span><span class="o">></span> <span class="n">fromOffsets</span> <span class="o">=</span> <span class="k">new</span> <span class="n">HashMap</span><span class="o"><>();</span> <span class="k">for</span> <span class="o">(</span><span class="n">resultSet</span> <span class="o">:</span> <span class="n">selectOffsetsFromYourDatabase</span><span class="o">)</span> - <span class="n">fromOffsets</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="k">new</span> <span class="nf">TopicPartition</span><span class="o">(</span><span class="n">resultSet</span><span class="o">.</span><span class="na">string</span><span class="o">(</span><span class="s">"topic"</span><span class="o">),</span> <span class="n">resultSet</span><span class="o">.</span><span class="na">int</span><span class="o">(</span><span class="s">"partition"</span><span class="o">)),</span> <span class="n">resultSet</span><span class="o">.</span><span class="na">long</span><span class="o">(</span><span class="s">"offset"</span><span class="o">));</span> + <span 
class="n">fromOffsets</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="k">new</span> <span class="n">TopicPartition</span><span class="o">(</span><span class="n">resultSet</span><span class="o">.</span><span class="na">string</span><span class="o">(</span><span class="s">"topic"</span><span class="o">),</span> <span class="n">resultSet</span><span class="o">.</span><span class="na">int</span><span class="o">(</span><span class="s">"partition"</span><span class="o">)),</span> <span class="n">resultSet</span><span class="o">.</span><span class="na">long</span><span class="o">(</span><span class="s">"offset"</span><span class="o">));</span> <span class="o">}</span> <span class="n">JavaInputDStream</span><span class="o"><</span><span class="n">ConsumerRecord</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">>></span> <span class="n">stream</span> <span class="o">=</span> <span class="n">KafkaUtils</span><span class="o">.</span><span class="na">createDirectStream</span><span class="o">(</span> @@ -406,7 +406,7 @@ Note that the example sets enable.auto.commit to false, for discussion see <a hr <span class="c1">// end your transaction</span> <span class="o">}</span> -<span class="o">});</span></code></pre></div> +<span class="o">});</span></code></pre></figure> </div> </div> @@ -417,7 +417,7 @@ Note that the example sets enable.auto.commit to false, for discussion see <a hr <div class="codetabs"> <div data-lang="scala"> - <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">kafkaParams</span> <span class="k">=</span> <span class="nc">Map</span><span class="o">[</span><span class="kt">String</span>, <span class="kt">Object</span><span class="o">](</span> + <figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span></span><span class="k">val</span> <span 
class="n">kafkaParams</span> <span class="k">=</span> <span class="nc">Map</span><span class="o">[</span><span class="kt">String</span>, <span class="kt">Object</span><span class="o">](</span> <span class="c1">// the usual params, make sure to change the port in bootstrap.servers if 9092 is not TLS</span> <span class="s">"security.protocol"</span> <span class="o">-></span> <span class="s">"SSL"</span><span class="o">,</span> <span class="s">"ssl.truststore.location"</span> <span class="o">-></span> <span class="s">"/some-directory/kafka.client.truststore.jks"</span><span class="o">,</span> @@ -425,19 +425,19 @@ Note that the example sets enable.auto.commit to false, for discussion see <a hr <span class="s">"ssl.keystore.location"</span> <span class="o">-></span> <span class="s">"/some-directory/kafka.client.keystore.jks"</span><span class="o">,</span> <span class="s">"ssl.keystore.password"</span> <span class="o">-></span> <span class="s">"test1234"</span><span class="o">,</span> <span class="s">"ssl.key.password"</span> <span class="o">-></span> <span class="s">"test1234"</span> -<span class="o">)</span></code></pre></div> +<span class="o">)</span></code></pre></figure> </div> <div data-lang="java"> - <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">Map</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Object</span><span class="o">></span> <span class="n">kafkaParams</span> <span class="o">=</span> <span class="k">new</span> <span class="n">HashMap</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Object</span><span class="o">>();</span> + <figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="n">Map</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Object</span><span class="o">></span> <span class="n">kafkaParams</span> <span 
class="o">=</span> <span class="k">new</span> <span class="n">HashMap</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Object</span><span class="o">>();</span> <span class="c1">// the usual params, make sure to change the port in bootstrap.servers if 9092 is not TLS</span> <span class="n">kafkaParams</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"security.protocol"</span><span class="o">,</span> <span class="s">"SSL"</span><span class="o">);</span> <span class="n">kafkaParams</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"ssl.truststore.location"</span><span class="o">,</span> <span class="s">"/some-directory/kafka.client.truststore.jks"</span><span class="o">);</span> <span class="n">kafkaParams</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"ssl.truststore.password"</span><span class="o">,</span> <span class="s">"test1234"</span><span class="o">);</span> <span class="n">kafkaParams</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"ssl.keystore.location"</span><span class="o">,</span> <span class="s">"/some-directory/kafka.client.keystore.jks"</span><span class="o">);</span> <span class="n">kafkaParams</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"ssl.keystore.password"</span><span class="o">,</span> <span class="s">"test1234"</span><span class="o">);</span> -<span class="n">kafkaParams</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"ssl.key.password"</span><span class="o">,</span> <span class="s">"test1234"</span><span class="o">);</span></code></pre></div> +<span class="n">kafkaParams</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"ssl.key.password"</span><span 
class="o">,</span> <span class="s">"test1234"</span><span class="o">);</span></code></pre></figure> </div> </div>