http://git-wip-us.apache.org/repos/asf/spark-website/blob/d2bcf185/site/docs/2.1.0/streaming-programming-guide.html ---------------------------------------------------------------------- diff --git a/site/docs/2.1.0/streaming-programming-guide.html b/site/docs/2.1.0/streaming-programming-guide.html index 9a87d23..b1ce1e1 100644 --- a/site/docs/2.1.0/streaming-programming-guide.html +++ b/site/docs/2.1.0/streaming-programming-guide.html @@ -129,32 +129,32 @@ <ul id="markdown-toc"> - <li><a href="#overview" id="markdown-toc-overview">Overview</a></li> - <li><a href="#a-quick-example" id="markdown-toc-a-quick-example">A Quick Example</a></li> - <li><a href="#basic-concepts" id="markdown-toc-basic-concepts">Basic Concepts</a> <ul> - <li><a href="#linking" id="markdown-toc-linking">Linking</a></li> - <li><a href="#initializing-streamingcontext" id="markdown-toc-initializing-streamingcontext">Initializing StreamingContext</a></li> - <li><a href="#discretized-streams-dstreams" id="markdown-toc-discretized-streams-dstreams">Discretized Streams (DStreams)</a></li> - <li><a href="#input-dstreams-and-receivers" id="markdown-toc-input-dstreams-and-receivers">Input DStreams and Receivers</a></li> - <li><a href="#transformations-on-dstreams" id="markdown-toc-transformations-on-dstreams">Transformations on DStreams</a></li> - <li><a href="#output-operations-on-dstreams" id="markdown-toc-output-operations-on-dstreams">Output Operations on DStreams</a></li> - <li><a href="#dataframe-and-sql-operations" id="markdown-toc-dataframe-and-sql-operations">DataFrame and SQL Operations</a></li> - <li><a href="#mllib-operations" id="markdown-toc-mllib-operations">MLlib Operations</a></li> - <li><a href="#caching--persistence" id="markdown-toc-caching--persistence">Caching / Persistence</a></li> - <li><a href="#checkpointing" id="markdown-toc-checkpointing">Checkpointing</a></li> - <li><a href="#accumulators-broadcast-variables-and-checkpoints" 
id="markdown-toc-accumulators-broadcast-variables-and-checkpoints">Accumulators, Broadcast Variables, and Checkpoints</a></li> - <li><a href="#deploying-applications" id="markdown-toc-deploying-applications">Deploying Applications</a></li> - <li><a href="#monitoring-applications" id="markdown-toc-monitoring-applications">Monitoring Applications</a></li> + <li><a href="#overview">Overview</a></li> + <li><a href="#a-quick-example">A Quick Example</a></li> + <li><a href="#basic-concepts">Basic Concepts</a> <ul> + <li><a href="#linking">Linking</a></li> + <li><a href="#initializing-streamingcontext">Initializing StreamingContext</a></li> + <li><a href="#discretized-streams-dstreams">Discretized Streams (DStreams)</a></li> + <li><a href="#input-dstreams-and-receivers">Input DStreams and Receivers</a></li> + <li><a href="#transformations-on-dstreams">Transformations on DStreams</a></li> + <li><a href="#output-operations-on-dstreams">Output Operations on DStreams</a></li> + <li><a href="#dataframe-and-sql-operations">DataFrame and SQL Operations</a></li> + <li><a href="#mllib-operations">MLlib Operations</a></li> + <li><a href="#caching--persistence">Caching / Persistence</a></li> + <li><a href="#checkpointing">Checkpointing</a></li> + <li><a href="#accumulators-broadcast-variables-and-checkpoints">Accumulators, Broadcast Variables, and Checkpoints</a></li> + <li><a href="#deploying-applications">Deploying Applications</a></li> + <li><a href="#monitoring-applications">Monitoring Applications</a></li> </ul> </li> - <li><a href="#performance-tuning" id="markdown-toc-performance-tuning">Performance Tuning</a> <ul> - <li><a href="#reducing-the-batch-processing-times" id="markdown-toc-reducing-the-batch-processing-times">Reducing the Batch Processing Times</a></li> - <li><a href="#setting-the-right-batch-interval" id="markdown-toc-setting-the-right-batch-interval">Setting the Right Batch Interval</a></li> - <li><a href="#memory-tuning" id="markdown-toc-memory-tuning">Memory 
Tuning</a></li> + <li><a href="#performance-tuning">Performance Tuning</a> <ul> + <li><a href="#reducing-the-batch-processing-times">Reducing the Batch Processing Times</a></li> + <li><a href="#setting-the-right-batch-interval">Setting the Right Batch Interval</a></li> + <li><a href="#memory-tuning">Memory Tuning</a></li> </ul> </li> - <li><a href="#fault-tolerance-semantics" id="markdown-toc-fault-tolerance-semantics">Fault-tolerance Semantics</a></li> - <li><a href="#where-to-go-from-here" id="markdown-toc-where-to-go-from-here">Where to Go from Here</a></li> + <li><a href="#fault-tolerance-semantics">Fault-tolerance Semantics</a></li> + <li><a href="#where-to-go-from-here">Where to Go from Here</a></li> </ul> <h1 id="overview">Overview</h1> @@ -209,7 +209,7 @@ conversions from StreamingContext into our environment in order to add useful me other classes we need (like DStream). <a href="api/scala/index.html#org.apache.spark.streaming.StreamingContext">StreamingContext</a> is the main entry point for all streaming functionality. We create a local StreamingContext with two execution threads, and a batch interval of 1 second.</p> - <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">import</span> <span class="nn">org.apache.spark._</span> + <figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span></span><span class="k">import</span> <span class="nn">org.apache.spark._</span> <span class="k">import</span> <span class="nn">org.apache.spark.streaming._</span> <span class="k">import</span> <span class="nn">org.apache.spark.streaming.StreamingContext._</span> <span class="c1">// not necessary since Spark 1.3</span> @@ -217,33 +217,33 @@ main entry point for all streaming functionality. 
We create a local StreamingCon <span class="c1">// The master requires 2 cores to prevent from a starvation scenario.</span> <span class="k">val</span> <span class="n">conf</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">SparkConf</span><span class="o">().</span><span class="n">setMaster</span><span class="o">(</span><span class="s">"local[2]"</span><span class="o">).</span><span class="n">setAppName</span><span class="o">(</span><span class="s">"NetworkWordCount"</span><span class="o">)</span> -<span class="k">val</span> <span class="n">ssc</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">StreamingContext</span><span class="o">(</span><span class="n">conf</span><span class="o">,</span> <span class="nc">Seconds</span><span class="o">(</span><span class="mi">1</span><span class="o">))</span></code></pre></div> +<span class="k">val</span> <span class="n">ssc</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">StreamingContext</span><span class="o">(</span><span class="n">conf</span><span class="o">,</span> <span class="nc">Seconds</span><span class="o">(</span><span class="mi">1</span><span class="o">))</span></code></pre></figure> <p>Using this context, we can create a DStream that represents streaming data from a TCP source, specified as hostname (e.g. <code>localhost</code>) and port (e.g. 
<code>9999</code>).</p> - <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">// Create a DStream that will connect to hostname:port, like localhost:9999</span> -<span class="k">val</span> <span class="n">lines</span> <span class="k">=</span> <span class="n">ssc</span><span class="o">.</span><span class="n">socketTextStream</span><span class="o">(</span><span class="s">"localhost"</span><span class="o">,</span> <span class="mi">9999</span><span class="o">)</span></code></pre></div> + <figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span></span><span class="c1">// Create a DStream that will connect to hostname:port, like localhost:9999</span> +<span class="k">val</span> <span class="n">lines</span> <span class="k">=</span> <span class="n">ssc</span><span class="o">.</span><span class="n">socketTextStream</span><span class="o">(</span><span class="s">"localhost"</span><span class="o">,</span> <span class="mi">9999</span><span class="o">)</span></code></pre></figure> <p>This <code>lines</code> DStream represents the stream of data that will be received from the data server. Each record in this DStream is a line of text. 
Next, we want to split the lines by space characters into words.</p> - <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">// Split each line into words</span> -<span class="k">val</span> <span class="n">words</span> <span class="k">=</span> <span class="n">lines</span><span class="o">.</span><span class="n">flatMap</span><span class="o">(</span><span class="k">_</span><span class="o">.</span><span class="n">split</span><span class="o">(</span><span class="s">" "</span><span class="o">))</span></code></pre></div> + <figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span></span><span class="c1">// Split each line into words</span> +<span class="k">val</span> <span class="n">words</span> <span class="k">=</span> <span class="n">lines</span><span class="o">.</span><span class="n">flatMap</span><span class="o">(</span><span class="k">_</span><span class="o">.</span><span class="n">split</span><span class="o">(</span><span class="s">" "</span><span class="o">))</span></code></pre></figure> <p><code>flatMap</code> is a one-to-many DStream operation that creates a new DStream by generating multiple new records from each record in the source DStream. In this case, each line will be split into multiple words and the stream of words is represented as the <code>words</code> DStream. 
Next, we want to count these words.</p> - <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">import</span> <span class="nn">org.apache.spark.streaming.StreamingContext._</span> <span class="c1">// not necessary since Spark 1.3</span> + <figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span></span><span class="k">import</span> <span class="nn">org.apache.spark.streaming.StreamingContext._</span> <span class="c1">// not necessary since Spark 1.3</span> <span class="c1">// Count each word in each batch</span> <span class="k">val</span> <span class="n">pairs</span> <span class="k">=</span> <span class="n">words</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="n">word</span> <span class="k">=></span> <span class="o">(</span><span class="n">word</span><span class="o">,</span> <span class="mi">1</span><span class="o">))</span> <span class="k">val</span> <span class="n">wordCounts</span> <span class="k">=</span> <span class="n">pairs</span><span class="o">.</span><span class="n">reduceByKey</span><span class="o">(</span><span class="k">_</span> <span class="o">+</span> <span class="k">_</span><span class="o">)</span> <span class="c1">// Print the first ten elements of each RDD generated in this DStream to the console</span> -<span class="n">wordCounts</span><span class="o">.</span><span class="n">print</span><span class="o">()</span></code></pre></div> +<span class="n">wordCounts</span><span class="o">.</span><span class="n">print</span><span class="o">()</span></code></pre></figure> <p>The <code>words</code> DStream is further mapped (one-to-one transformation) to a DStream of <code>(word, 1)</code> pairs, which is then reduced to get the frequency of words in each batch of data. @@ -253,8 +253,8 @@ Finally, <code>wordCounts.print()</code> will print a few of the counts generate will perform when it is started, and no real processing has started yet. 
To start the processing after all the transformations have been setup, we finally call</p> - <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">ssc</span><span class="o">.</span><span class="n">start</span><span class="o">()</span> <span class="c1">// Start the computation</span> -<span class="n">ssc</span><span class="o">.</span><span class="n">awaitTermination</span><span class="o">()</span> <span class="c1">// Wait for the computation to terminate</span></code></pre></div> + <figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span></span><span class="n">ssc</span><span class="o">.</span><span class="n">start</span><span class="o">()</span> <span class="c1">// Start the computation</span> +<span class="n">ssc</span><span class="o">.</span><span class="n">awaitTermination</span><span class="o">()</span> <span class="c1">// Wait for the computation to terminate</span></code></pre></figure> <p>The complete code can be found in the Spark Streaming example <a href="https://github.com/apache/spark/blob/v2.1.0/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala">NetworkWordCount</a>. @@ -268,33 +268,33 @@ after all the transformations have been setup, we finally call</p> which is the main entry point for all streaming functionality. 
We create a local StreamingContext with two execution threads, and a batch interval of 1 second.</p> - <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kn">import</span> <span class="nn">org.apache.spark.*</span><span class="o">;</span> + <figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="kn">import</span> <span class="nn">org.apache.spark.*</span><span class="o">;</span> <span class="kn">import</span> <span class="nn">org.apache.spark.api.java.function.*</span><span class="o">;</span> <span class="kn">import</span> <span class="nn">org.apache.spark.streaming.*</span><span class="o">;</span> <span class="kn">import</span> <span class="nn">org.apache.spark.streaming.api.java.*</span><span class="o">;</span> <span class="kn">import</span> <span class="nn">scala.Tuple2</span><span class="o">;</span> <span class="c1">// Create a local StreamingContext with two working thread and batch interval of 1 second</span> -<span class="n">SparkConf</span> <span class="n">conf</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">SparkConf</span><span class="o">().</span><span class="na">setMaster</span><span class="o">(</span><span class="s">"local[2]"</span><span class="o">).</span><span class="na">setAppName</span><span class="o">(</span><span class="s">"NetworkWordCount"</span><span class="o">);</span> -<span class="n">JavaStreamingContext</span> <span class="n">jssc</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">JavaStreamingContext</span><span class="o">(</span><span class="n">conf</span><span class="o">,</span> <span class="n">Durations</span><span class="o">.</span><span class="na">seconds</span><span class="o">(</span><span class="mi">1</span><span class="o">));</span></code></pre></div> +<span class="n">SparkConf</span> <span class="n">conf</span> <span class="o">=</span> <span class="k">new</span> <span 
class="n">SparkConf</span><span class="o">().</span><span class="na">setMaster</span><span class="o">(</span><span class="s">"local[2]"</span><span class="o">).</span><span class="na">setAppName</span><span class="o">(</span><span class="s">"NetworkWordCount"</span><span class="o">);</span> +<span class="n">JavaStreamingContext</span> <span class="n">jssc</span> <span class="o">=</span> <span class="k">new</span> <span class="n">JavaStreamingContext</span><span class="o">(</span><span class="n">conf</span><span class="o">,</span> <span class="n">Durations</span><span class="o">.</span><span class="na">seconds</span><span class="o">(</span><span class="mi">1</span><span class="o">));</span></code></pre></figure> <p>Using this context, we can create a DStream that represents streaming data from a TCP source, specified as hostname (e.g. <code>localhost</code>) and port (e.g. <code>9999</code>).</p> - <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// Create a DStream that will connect to hostname:port, like localhost:9999</span> -<span class="n">JavaReceiverInputDStream</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">lines</span> <span class="o">=</span> <span class="n">jssc</span><span class="o">.</span><span class="na">socketTextStream</span><span class="o">(</span><span class="s">"localhost"</span><span class="o">,</span> <span class="mi">9999</span><span class="o">);</span></code></pre></div> + <figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="c1">// Create a DStream that will connect to hostname:port, like localhost:9999</span> +<span class="n">JavaReceiverInputDStream</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">lines</span> <span class="o">=</span> <span class="n">jssc</span><span class="o">.</span><span class="na">socketTextStream</span><span 
class="o">(</span><span class="s">"localhost"</span><span class="o">,</span> <span class="mi">9999</span><span class="o">);</span></code></pre></figure> <p>This <code>lines</code> DStream represents the stream of data that will be received from the data server. Each record in this stream is a line of text. Then, we want to split the lines by space into words.</p> - <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// Split each line into words</span> + <figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="c1">// Split each line into words</span> <span class="n">JavaDStream</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">words</span> <span class="o">=</span> <span class="n">lines</span><span class="o">.</span><span class="na">flatMap</span><span class="o">(</span> <span class="k">new</span> <span class="n">FlatMapFunction</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">>()</span> <span class="o">{</span> <span class="nd">@Override</span> <span class="kd">public</span> <span class="n">Iterator</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="nf">call</span><span class="o">(</span><span class="n">String</span> <span class="n">x</span><span class="o">)</span> <span class="o">{</span> <span class="k">return</span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="n">x</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">" "</span><span class="o">)).</span><span class="na">iterator</span><span class="o">();</span> <span class="o">}</span> - <span class="o">});</span></code></pre></div> + <span class="o">});</span></code></pre></figure> <p><code>flatMap</code> is a DStream operation that 
creates a new DStream by generating multiple new records from each record in the source DStream. In this case, @@ -306,7 +306,7 @@ that help define DStream transformations.</p> <p>Next, we want to count these words.</p> - <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// Count each word in each batch</span> + <figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="c1">// Count each word in each batch</span> <span class="n">JavaPairDStream</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">></span> <span class="n">pairs</span> <span class="o">=</span> <span class="n">words</span><span class="o">.</span><span class="na">mapToPair</span><span class="o">(</span> <span class="k">new</span> <span class="n">PairFunction</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>()</span> <span class="o">{</span> <span class="nd">@Override</span> <span class="kd">public</span> <span class="n">Tuple2</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">></span> <span class="nf">call</span><span class="o">(</span><span class="n">String</span> <span class="n">s</span><span class="o">)</span> <span class="o">{</span> @@ -321,7 +321,7 @@ that help define DStream transformations.</p> <span class="o">});</span> <span class="c1">// Print the first ten elements of each RDD generated in this DStream to the console</span> -<span class="n">wordCounts</span><span class="o">.</span><span class="na">print</span><span class="o">();</span></code></pre></div> +<span class="n">wordCounts</span><span class="o">.</span><span class="na">print</span><span class="o">();</span></code></pre></figure> <p>The <code>words</code> DStream 
is further mapped (one-to-one transformation) to a DStream of <code>(word, 1)</code> pairs, using a <a href="api/scala/index.html#org.apache.spark.api.java.function.PairFunction">PairFunction</a> @@ -333,8 +333,8 @@ Finally, <code>wordCounts.print()</code> will print a few of the counts generate will perform after it is started, and no real processing has started yet. To start the processing after all the transformations have been setup, we finally call <code>start</code> method.</p> - <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">jssc</span><span class="o">.</span><span class="na">start</span><span class="o">();</span> <span class="c1">// Start the computation</span> -<span class="n">jssc</span><span class="o">.</span><span class="na">awaitTermination</span><span class="o">();</span> <span class="c1">// Wait for the computation to terminate</span></code></pre></div> + <figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="n">jssc</span><span class="o">.</span><span class="na">start</span><span class="o">();</span> <span class="c1">// Start the computation</span> +<span class="n">jssc</span><span class="o">.</span><span class="na">awaitTermination</span><span class="o">();</span> <span class="c1">// Wait for the computation to terminate</span></code></pre></figure> <p>The complete code can be found in the Spark Streaming example <a href="https://github.com/apache/spark/blob/v2.1.0/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java">JavaNetworkWordCount</a>. @@ -344,37 +344,37 @@ after all the transformations have been setup, we finally call <code>start</code <div data-lang="python"> <p>First, we import <a href="api/python/pyspark.streaming.html#pyspark.streaming.StreamingContext">StreamingContext</a>, which is the main entry point for all streaming functionality. 
We create a local StreamingContext with two execution threads, and batch interval of 1 second.</p> - <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="kn">from</span> <span class="nn">pyspark</span> <span class="kn">import</span> <span class="n">SparkContext</span> + <figure class="highlight"><pre><code class="language-python" data-lang="python"><span></span><span class="kn">from</span> <span class="nn">pyspark</span> <span class="kn">import</span> <span class="n">SparkContext</span> <span class="kn">from</span> <span class="nn">pyspark.streaming</span> <span class="kn">import</span> <span class="n">StreamingContext</span> -<span class="c"># Create a local StreamingContext with two working thread and batch interval of 1 second</span> -<span class="n">sc</span> <span class="o">=</span> <span class="n">SparkContext</span><span class="p">(</span><span class="s">"local[2]"</span><span class="p">,</span> <span class="s">"NetworkWordCount"</span><span class="p">)</span> -<span class="n">ssc</span> <span class="o">=</span> <span class="n">StreamingContext</span><span class="p">(</span><span class="n">sc</span><span class="p">,</span> <span class="mi">1</span><span class="p">)</span></code></pre></div> +<span class="c1"># Create a local StreamingContext with two working thread and batch interval of 1 second</span> +<span class="n">sc</span> <span class="o">=</span> <span class="n">SparkContext</span><span class="p">(</span><span class="s2">"local[2]"</span><span class="p">,</span> <span class="s2">"NetworkWordCount"</span><span class="p">)</span> +<span class="n">ssc</span> <span class="o">=</span> <span class="n">StreamingContext</span><span class="p">(</span><span class="n">sc</span><span class="p">,</span> <span class="mi">1</span><span class="p">)</span></code></pre></figure> <p>Using this context, we can create a DStream that represents streaming data from a TCP source, specified as hostname (e.g. 
<code>localhost</code>) and port (e.g. <code>9999</code>).</p> - <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="c"># Create a DStream that will connect to hostname:port, like localhost:9999</span> -<span class="n">lines</span> <span class="o">=</span> <span class="n">ssc</span><span class="o">.</span><span class="n">socketTextStream</span><span class="p">(</span><span class="s">"localhost"</span><span class="p">,</span> <span class="mi">9999</span><span class="p">)</span></code></pre></div> + <figure class="highlight"><pre><code class="language-python" data-lang="python"><span></span><span class="c1"># Create a DStream that will connect to hostname:port, like localhost:9999</span> +<span class="n">lines</span> <span class="o">=</span> <span class="n">ssc</span><span class="o">.</span><span class="n">socketTextStream</span><span class="p">(</span><span class="s2">"localhost"</span><span class="p">,</span> <span class="mi">9999</span><span class="p">)</span></code></pre></figure> <p>This <code>lines</code> DStream represents the stream of data that will be received from the data server. Each record in this DStream is a line of text. 
Next, we want to split the lines by space into words.</p> - <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="c"># Split each line into words</span> -<span class="n">words</span> <span class="o">=</span> <span class="n">lines</span><span class="o">.</span><span class="n">flatMap</span><span class="p">(</span><span class="k">lambda</span> <span class="n">line</span><span class="p">:</span> <span class="n">line</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="s">" "</span><span class="p">))</span></code></pre></div> + <figure class="highlight"><pre><code class="language-python" data-lang="python"><span></span><span class="c1"># Split each line into words</span> +<span class="n">words</span> <span class="o">=</span> <span class="n">lines</span><span class="o">.</span><span class="n">flatMap</span><span class="p">(</span><span class="k">lambda</span> <span class="n">line</span><span class="p">:</span> <span class="n">line</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="s2">" "</span><span class="p">))</span></code></pre></figure> <p><code>flatMap</code> is a one-to-many DStream operation that creates a new DStream by generating multiple new records from each record in the source DStream. In this case, each line will be split into multiple words and the stream of words is represented as the <code>words</code> DStream. 
Next, we want to count these words.</p> - <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="c"># Count each word in each batch</span> + <figure class="highlight"><pre><code class="language-python" data-lang="python"><span></span><span class="c1"># Count each word in each batch</span> <span class="n">pairs</span> <span class="o">=</span> <span class="n">words</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">word</span><span class="p">:</span> <span class="p">(</span><span class="n">word</span><span class="p">,</span> <span class="mi">1</span><span class="p">))</span> <span class="n">wordCounts</span> <span class="o">=</span> <span class="n">pairs</span><span class="o">.</span><span class="n">reduceByKey</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">,</span> <span class="n">y</span><span class="p">:</span> <span class="n">x</span> <span class="o">+</span> <span class="n">y</span><span class="p">)</span> -<span class="c"># Print the first ten elements of each RDD generated in this DStream to the console</span> -<span class="n">wordCounts</span><span class="o">.</span><span class="n">pprint</span><span class="p">()</span></code></pre></div> +<span class="c1"># Print the first ten elements of each RDD generated in this DStream to the console</span> +<span class="n">wordCounts</span><span class="o">.</span><span class="n">pprint</span><span class="p">()</span></code></pre></figure> <p>The <code>words</code> DStream is further mapped (one-to-one transformation) to a DStream of <code>(word, 1)</code> pairs, which is then reduced to get the frequency of words in each batch of data. @@ -384,8 +384,8 @@ Finally, <code>wordCounts.pprint()</code> will print a few of the counts generat will perform when it is started, and no real processing has started yet. 
To start the processing after all the transformations have been setup, we finally call</p> - <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">ssc</span><span class="o">.</span><span class="n">start</span><span class="p">()</span> <span class="c"># Start the computation</span> -<span class="n">ssc</span><span class="o">.</span><span class="n">awaitTermination</span><span class="p">()</span> <span class="c"># Wait for the computation to terminate</span></code></pre></div> + <figure class="highlight"><pre><code class="language-python" data-lang="python"><span></span><span class="n">ssc</span><span class="o">.</span><span class="n">start</span><span class="p">()</span> <span class="c1"># Start the computation</span> +<span class="n">ssc</span><span class="o">.</span><span class="n">awaitTermination</span><span class="p">()</span> <span class="c1"># Wait for the computation to terminate</span></code></pre></figure> <p>The complete code can be found in the Spark Streaming example <a href="https://github.com/apache/spark/blob/v2.1.0/examples/src/main/python/streaming/network_wordcount.py">NetworkWordCount</a>. @@ -398,24 +398,24 @@ after all the transformations have been setup, we finally call</p> you can run this example as follows. 
You will first need to run Netcat (a small utility found in most Unix-like systems) as a data server by using</p> -<div class="highlight"><pre><code class="language-bash" data-lang="bash"><span class="nv">$ </span>nc -lk 9999</code></pre></div> +<figure class="highlight"><pre><code class="language-bash" data-lang="bash"><span></span>$ nc -lk <span class="m">9999</span></code></pre></figure> <p>Then, in a different terminal, you can start the example by using</p> <div class="codetabs"> <div data-lang="scala"> - <div class="highlight"><pre><code class="language-bash" data-lang="bash"><span class="nv">$ </span>./bin/run-example streaming.NetworkWordCount localhost 9999</code></pre></div> + <figure class="highlight"><pre><code class="language-bash" data-lang="bash"><span></span>$ ./bin/run-example streaming.NetworkWordCount localhost <span class="m">9999</span></code></pre></figure> </div> <div data-lang="java"> - <div class="highlight"><pre><code class="language-bash" data-lang="bash"><span class="nv">$ </span>./bin/run-example streaming.JavaNetworkWordCount localhost 9999</code></pre></div> + <figure class="highlight"><pre><code class="language-bash" data-lang="bash"><span></span>$ ./bin/run-example streaming.JavaNetworkWordCount localhost <span class="m">9999</span></code></pre></figure> </div> <div data-lang="python"> - <div class="highlight"><pre><code class="language-bash" data-lang="bash"><span class="nv">$ </span>./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999</code></pre></div> + <figure class="highlight"><pre><code class="language-bash" data-lang="bash"><span></span>$ ./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost <span class="m">9999</span></code></pre></figure> </div> </div> @@ -426,16 +426,16 @@ screen every second. 
It will look something like the following.</p> <table width="100%"> <td> -<div class="highlight"><pre><code class="language-bash" data-lang="bash"><span class="c"># TERMINAL 1:</span> -<span class="c"># Running Netcat</span> +<figure class="highlight"><pre><code class="language-bash" data-lang="bash"><span></span><span class="c1"># TERMINAL 1:</span> +<span class="c1"># Running Netcat</span> -<span class="nv">$ </span>nc -lk 9999 +$ nc -lk <span class="m">9999</span> hello world -...</code></pre></div> +...</code></pre></figure> </td> <td width="2%"></td> @@ -444,45 +444,45 @@ hello world <div data-lang="scala"> - <div class="highlight"><pre><code class="language-bash" data-lang="bash"><span class="c"># TERMINAL 2: RUNNING NetworkWordCount</span> + <figure class="highlight"><pre><code class="language-bash" data-lang="bash"><span></span><span class="c1"># TERMINAL 2: RUNNING NetworkWordCount</span> -<span class="nv">$ </span>./bin/run-example streaming.NetworkWordCount localhost 9999 +$ ./bin/run-example streaming.NetworkWordCount localhost <span class="m">9999</span> ... ------------------------------------------- Time: <span class="m">1357008430000</span> ms ------------------------------------------- <span class="o">(</span>hello,1<span class="o">)</span> <span class="o">(</span>world,1<span class="o">)</span> -...</code></pre></div> +...</code></pre></figure> </div> <div data-lang="java"> - <div class="highlight"><pre><code class="language-bash" data-lang="bash"><span class="c"># TERMINAL 2: RUNNING JavaNetworkWordCount</span> + <figure class="highlight"><pre><code class="language-bash" data-lang="bash"><span></span><span class="c1"># TERMINAL 2: RUNNING JavaNetworkWordCount</span> -<span class="nv">$ </span>./bin/run-example streaming.JavaNetworkWordCount localhost 9999 +$ ./bin/run-example streaming.JavaNetworkWordCount localhost <span class="m">9999</span> ... 
------------------------------------------- Time: <span class="m">1357008430000</span> ms ------------------------------------------- <span class="o">(</span>hello,1<span class="o">)</span> <span class="o">(</span>world,1<span class="o">)</span> -...</code></pre></div> +...</code></pre></figure> </div> <div data-lang="python"> - <div class="highlight"><pre><code class="language-bash" data-lang="bash"><span class="c"># TERMINAL 2: RUNNING network_wordcount.py</span> + <figure class="highlight"><pre><code class="language-bash" data-lang="bash"><span></span><span class="c1"># TERMINAL 2: RUNNING network_wordcount.py</span> -<span class="nv">$ </span>./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999 +$ ./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost <span class="m">9999</span> ... ------------------------------------------- -Time: 2014-10-14 15:25:21 +Time: <span class="m">2014</span>-10-14 <span class="m">15</span>:25:21 ------------------------------------------- <span class="o">(</span>hello,1<span class="o">)</span> <span class="o">(</span>world,1<span class="o">)</span> -...</code></pre></div> +...</code></pre></figure> </div> </div> @@ -546,11 +546,11 @@ for the full list of supported sources and artifacts.</p> <p>A <a href="api/scala/index.html#org.apache.spark.streaming.StreamingContext">StreamingContext</a> object can be created from a <a href="api/scala/index.html#org.apache.spark.SparkConf">SparkConf</a> object.</p> - <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">import</span> <span class="nn">org.apache.spark._</span> + <figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span></span><span class="k">import</span> <span class="nn">org.apache.spark._</span> <span class="k">import</span> <span class="nn">org.apache.spark.streaming._</span> <span class="k">val</span> <span class="n">conf</span> <span 
class="k">=</span> <span class="k">new</span> <span class="nc">SparkConf</span><span class="o">().</span><span class="n">setAppName</span><span class="o">(</span><span class="n">appName</span><span class="o">).</span><span class="n">setMaster</span><span class="o">(</span><span class="n">master</span><span class="o">)</span> -<span class="k">val</span> <span class="n">ssc</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">StreamingContext</span><span class="o">(</span><span class="n">conf</span><span class="o">,</span> <span class="nc">Seconds</span><span class="o">(</span><span class="mi">1</span><span class="o">))</span></code></pre></div> +<span class="k">val</span> <span class="n">ssc</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">StreamingContext</span><span class="o">(</span><span class="n">conf</span><span class="o">,</span> <span class="nc">Seconds</span><span class="o">(</span><span class="mi">1</span><span class="o">))</span></code></pre></figure> <p>The <code>appName</code> parameter is a name for your application to show on the cluster UI. 
<code>master</code> is a <a href="submitting-applications.html#master-urls">Spark, Mesos or YARN cluster URL</a>, @@ -566,21 +566,21 @@ section for more details.</p> <p>A <code>StreamingContext</code> object can also be created from an existing <code>SparkContext</code> object.</p> - <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">import</span> <span class="nn">org.apache.spark.streaming._</span> + <figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span></span><span class="k">import</span> <span class="nn">org.apache.spark.streaming._</span> <span class="k">val</span> <span class="n">sc</span> <span class="k">=</span> <span class="o">...</span> <span class="c1">// existing SparkContext</span> -<span class="k">val</span> <span class="n">ssc</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">StreamingContext</span><span class="o">(</span><span class="n">sc</span><span class="o">,</span> <span class="nc">Seconds</span><span class="o">(</span><span class="mi">1</span><span class="o">))</span></code></pre></div> +<span class="k">val</span> <span class="n">ssc</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">StreamingContext</span><span class="o">(</span><span class="n">sc</span><span class="o">,</span> <span class="nc">Seconds</span><span class="o">(</span><span class="mi">1</span><span class="o">))</span></code></pre></figure> </div> <div data-lang="java"> <p>A <a href="api/java/index.html?org/apache/spark/streaming/api/java/JavaStreamingContext.html">JavaStreamingContext</a> object can be created from a <a href="api/java/index.html?org/apache/spark/SparkConf.html">SparkConf</a> object.</p> - <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kn">import</span> <span class="nn">org.apache.spark.*</span><span class="o">;</span> + <figure class="highlight"><pre><code class="language-java" 
data-lang="java"><span></span><span class="kn">import</span> <span class="nn">org.apache.spark.*</span><span class="o">;</span> <span class="kn">import</span> <span class="nn">org.apache.spark.streaming.api.java.*</span><span class="o">;</span> -<span class="n">SparkConf</span> <span class="n">conf</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">SparkConf</span><span class="o">().</span><span class="na">setAppName</span><span class="o">(</span><span class="n">appName</span><span class="o">).</span><span class="na">setMaster</span><span class="o">(</span><span class="n">master</span><span class="o">);</span> -<span class="n">JavaStreamingContext</span> <span class="n">ssc</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">JavaStreamingContext</span><span class="o">(</span><span class="n">conf</span><span class="o">,</span> <span class="k">new</span> <span class="nf">Duration</span><span class="o">(</span><span class="mi">1000</span><span class="o">));</span></code></pre></div> +<span class="n">SparkConf</span> <span class="n">conf</span> <span class="o">=</span> <span class="k">new</span> <span class="n">SparkConf</span><span class="o">().</span><span class="na">setAppName</span><span class="o">(</span><span class="n">appName</span><span class="o">).</span><span class="na">setMaster</span><span class="o">(</span><span class="n">master</span><span class="o">);</span> +<span class="n">JavaStreamingContext</span> <span class="n">ssc</span> <span class="o">=</span> <span class="k">new</span> <span class="n">JavaStreamingContext</span><span class="o">(</span><span class="n">conf</span><span class="o">,</span> <span class="k">new</span> <span class="n">Duration</span><span class="o">(</span><span class="mi">1000</span><span class="o">));</span></code></pre></figure> <p>The <code>appName</code> parameter is a name for your application to show on the cluster UI. 
<code>master</code> is a <a href="submitting-applications.html#master-urls">Spark, Mesos or YARN cluster URL</a>, @@ -596,21 +596,21 @@ section for more details.</p> <p>A <code>JavaStreamingContext</code> object can also be created from an existing <code>JavaSparkContext</code>.</p> - <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kn">import</span> <span class="nn">org.apache.spark.streaming.api.java.*</span><span class="o">;</span> + <figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="kn">import</span> <span class="nn">org.apache.spark.streaming.api.java.*</span><span class="o">;</span> <span class="n">JavaSparkContext</span> <span class="n">sc</span> <span class="o">=</span> <span class="o">...</span> <span class="c1">//existing JavaSparkContext</span> -<span class="n">JavaStreamingContext</span> <span class="n">ssc</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">JavaStreamingContext</span><span class="o">(</span><span class="n">sc</span><span class="o">,</span> <span class="n">Durations</span><span class="o">.</span><span class="na">seconds</span><span class="o">(</span><span class="mi">1</span><span class="o">));</span></code></pre></div> +<span class="n">JavaStreamingContext</span> <span class="n">ssc</span> <span class="o">=</span> <span class="k">new</span> <span class="n">JavaStreamingContext</span><span class="o">(</span><span class="n">sc</span><span class="o">,</span> <span class="n">Durations</span><span class="o">.</span><span class="na">seconds</span><span class="o">(</span><span class="mi">1</span><span class="o">));</span></code></pre></figure> </div> <div data-lang="python"> <p>A <a href="api/python/pyspark.streaming.html#pyspark.streaming.StreamingContext">StreamingContext</a> object can be created from a <a href="api/python/pyspark.html#pyspark.SparkContext">SparkContext</a> object.</p> - <div class="highlight"><pre><code 
class="language-python" data-lang="python"><span class="kn">from</span> <span class="nn">pyspark</span> <span class="kn">import</span> <span class="n">SparkContext</span> + <figure class="highlight"><pre><code class="language-python" data-lang="python"><span></span><span class="kn">from</span> <span class="nn">pyspark</span> <span class="kn">import</span> <span class="n">SparkContext</span> <span class="kn">from</span> <span class="nn">pyspark.streaming</span> <span class="kn">import</span> <span class="n">StreamingContext</span> <span class="n">sc</span> <span class="o">=</span> <span class="n">SparkContext</span><span class="p">(</span><span class="n">master</span><span class="p">,</span> <span class="n">appName</span><span class="p">)</span> -<span class="n">ssc</span> <span class="o">=</span> <span class="n">StreamingContext</span><span class="p">(</span><span class="n">sc</span><span class="p">,</span> <span class="mi">1</span><span class="p">)</span></code></pre></div> +<span class="n">ssc</span> <span class="o">=</span> <span class="n">StreamingContext</span><span class="p">(</span><span class="n">sc</span><span class="p">,</span> <span class="mi">1</span><span class="p">)</span></code></pre></figure> <p>The <code>appName</code> parameter is a name for your application to show on the cluster UI. 
<code>master</code> is a <a href="submitting-applications.html#master-urls">Spark, Mesos or YARN cluster URL</a>, @@ -931,15 +931,15 @@ define the update function as:</p> <div class="codetabs"> <div data-lang="scala"> - <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">def</span> <span class="n">updateFunction</span><span class="o">(</span><span class="n">newValues</span><span class="k">:</span> <span class="kt">Seq</span><span class="o">[</span><span class="kt">Int</span><span class="o">],</span> <span class="n">runningCount</span><span class="k">:</span> <span class="kt">Option</span><span class="o">[</span><span class="kt">Int</span><span class="o">])</span><span class="k">:</span> <span class="kt">Option</span><span class="o">[</span><span class="kt">Int</span><span class="o">]</span> <span class="k">=</span> <span class="o">{</span> + <figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span></span><span class="k">def</span> <span class="n">updateFunction</span><span class="o">(</span><span class="n">newValues</span><span class="k">:</span> <span class="kt">Seq</span><span class="o">[</span><span class="kt">Int</span><span class="o">],</span> <span class="n">runningCount</span><span class="k">:</span> <span class="kt">Option</span><span class="o">[</span><span class="kt">Int</span><span class="o">])</span><span class="k">:</span> <span class="kt">Option</span><span class="o">[</span><span class="kt">Int</span><span class="o">]</span> <span class="k">=</span> <span class="o">{</span> <span class="k">val</span> <span class="n">newCount</span> <span class="k">=</span> <span class="o">...</span> <span class="c1">// add the new values with the previous running count to get the new count</span> <span class="nc">Some</span><span class="o">(</span><span class="n">newCount</span><span class="o">)</span> -<span class="o">}</span></code></pre></div> +<span class="o">}</span></code></pre></figure> <p>This 
is applied on a DStream containing words (say, the <code>pairs</code> DStream containing <code>(word, 1)</code> pairs in the <a href="#a-quick-example">earlier example</a>).</p> - <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">runningCounts</span> <span class="k">=</span> <span class="n">pairs</span><span class="o">.</span><span class="n">updateStateByKey</span><span class="o">[</span><span class="kt">Int</span><span class="o">](</span><span class="n">updateFunction</span> <span class="k">_</span><span class="o">)</span></code></pre></div> + <figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span></span><span class="k">val</span> <span class="n">runningCounts</span> <span class="k">=</span> <span class="n">pairs</span><span class="o">.</span><span class="n">updateStateByKey</span><span class="o">[</span><span class="kt">Int</span><span class="o">](</span><span class="n">updateFunction</span> <span class="k">_</span><span class="o">)</span></code></pre></figure> <p>The update function will be called for each word, with <code>newValues</code> having a sequence of 1’s (from the <code>(word, 1)</code> pairs) and the <code>runningCount</code> having the previous count.</p> @@ -947,18 +947,18 @@ the <code>(word, 1)</code> pairs) and the <code>runningCount</code> having the p </div> <div data-lang="java"> - <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">Function2</span><span class="o"><</span><span class="n">List</span><span class="o"><</span><span class="n">Integer</span><span class="o">>,</span> <span class="n">Optional</span><span class="o"><</span><span class="n">Integer</span><span class="o">>,</span> <span class="n">Optional</span><span class="o"><</span><span class="n">Integer</span><span class="o">>></span> <span class="n">updateFunction</span> <span class="o">=</span> + <figure class="highlight"><pre><code 
class="language-java" data-lang="java"><span></span><span class="n">Function2</span><span class="o"><</span><span class="n">List</span><span class="o"><</span><span class="n">Integer</span><span class="o">>,</span> <span class="n">Optional</span><span class="o"><</span><span class="n">Integer</span><span class="o">>,</span> <span class="n">Optional</span><span class="o"><</span><span class="n">Integer</span><span class="o">>></span> <span class="n">updateFunction</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Function2</span><span class="o"><</span><span class="n">List</span><span class="o"><</span><span class="n">Integer</span><span class="o">>,</span> <span class="n">Optional</span><span class="o"><</span><span class="n">Integer</span><span class="o">>,</span> <span class="n">Optional</span><span class="o"><</span><span class="n">Integer</span><span class="o">>>()</span> <span class="o">{</span> <span class="nd">@Override</span> <span class="kd">public</span> <span class="n">Optional</span><span class="o"><</span><span class="n">Integer</span><span class="o">></span> <span class="nf">call</span><span class="o">(</span><span class="n">List</span><span class="o"><</span><span class="n">Integer</span><span class="o">></span> <span class="n">values</span><span class="o">,</span> <span class="n">Optional</span><span class="o"><</span><span class="n">Integer</span><span class="o">></span> <span class="n">state</span><span class="o">)</span> <span class="o">{</span> <span class="n">Integer</span> <span class="n">newSum</span> <span class="o">=</span> <span class="o">...</span> <span class="c1">// add the new values with the previous running count to get the new count</span> <span class="k">return</span> <span class="n">Optional</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">newSum</span><span class="o">);</span> <span class="o">}</span> - <span class="o">};</span></code></pre></div> + <span 
class="o">};</span></code></pre></figure> <p>This is applied on a DStream containing words (say, the <code>pairs</code> DStream containing <code>(word, 1)</code> pairs in the <a href="#a-quick-example">quick example</a>).</p> - <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">JavaPairDStream</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">></span> <span class="n">runningCounts</span> <span class="o">=</span> <span class="n">pairs</span><span class="o">.</span><span class="na">updateStateByKey</span><span class="o">(</span><span class="n">updateFunction</span><span class="o">);</span></code></pre></div> + <figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="n">JavaPairDStream</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">></span> <span class="n">runningCounts</span> <span class="o">=</span> <span class="n">pairs</span><span class="o">.</span><span class="na">updateStateByKey</span><span class="o">(</span><span class="n">updateFunction</span><span class="o">);</span></code></pre></figure> <p>The update function will be called for each word, with <code>newValues</code> having a sequence of 1’s (from the <code>(word, 1)</code> pairs) and the <code>runningCount</code> having the previous count. 
For the complete @@ -969,15 +969,15 @@ Java code, take a look at the example </div> <div data-lang="python"> - <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="k">def</span> <span class="nf">updateFunction</span><span class="p">(</span><span class="n">newValues</span><span class="p">,</span> <span class="n">runningCount</span><span class="p">):</span> + <figure class="highlight"><pre><code class="language-python" data-lang="python"><span></span><span class="k">def</span> <span class="nf">updateFunction</span><span class="p">(</span><span class="n">newValues</span><span class="p">,</span> <span class="n">runningCount</span><span class="p">):</span> <span class="k">if</span> <span class="n">runningCount</span> <span class="ow">is</span> <span class="bp">None</span><span class="p">:</span> <span class="n">runningCount</span> <span class="o">=</span> <span class="mi">0</span> - <span class="k">return</span> <span class="nb">sum</span><span class="p">(</span><span class="n">newValues</span><span class="p">,</span> <span class="n">runningCount</span><span class="p">)</span> <span class="c"># add the new values with the previous running count to get the new count</span></code></pre></div> + <span class="k">return</span> <span class="nb">sum</span><span class="p">(</span><span class="n">newValues</span><span class="p">,</span> <span class="n">runningCount</span><span class="p">)</span> <span class="c1"># add the new values with the previous running count to get the new count</span></code></pre></figure> <p>This is applied on a DStream containing words (say, the <code>pairs</code> DStream containing <code>(word, 1)</code> pairs in the <a href="#a-quick-example">earlier example</a>).</p> - <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">runningCounts</span> <span class="o">=</span> <span class="n">pairs</span><span class="o">.</span><span class="n">updateStateByKey</span><span 
class="p">(</span><span class="n">updateFunction</span><span class="p">)</span></code></pre></div> + <figure class="highlight"><pre><code class="language-python" data-lang="python"><span></span><span class="n">runningCounts</span> <span class="o">=</span> <span class="n">pairs</span><span class="o">.</span><span class="n">updateStateByKey</span><span class="p">(</span><span class="n">updateFunction</span><span class="p">)</span></code></pre></figure> <p>The update function will be called for each word, with <code>newValues</code> having a sequence of 1’s (from the <code>(word, 1)</code> pairs) and the <code>runningCount</code> having the previous count. For the complete @@ -1003,17 +1003,17 @@ spam information (maybe generated with Spark as well) and then filtering based o <div class="codetabs"> <div data-lang="scala"> - <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">spamInfoRDD</span> <span class="k">=</span> <span class="n">ssc</span><span class="o">.</span><span class="n">sparkContext</span><span class="o">.</span><span class="n">newAPIHadoopRDD</span><span class="o">(...)</span> <span class="c1">// RDD containing spam information</span> + <figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span></span><span class="k">val</span> <span class="n">spamInfoRDD</span> <span class="k">=</span> <span class="n">ssc</span><span class="o">.</span><span class="n">sparkContext</span><span class="o">.</span><span class="n">newAPIHadoopRDD</span><span class="o">(...)</span> <span class="c1">// RDD containing spam information</span> <span class="k">val</span> <span class="n">cleanedDStream</span> <span class="k">=</span> <span class="n">wordCounts</span><span class="o">.</span><span class="n">transform</span> <span class="o">{</span> <span class="n">rdd</span> <span class="k">=></span> <span class="n">rdd</span><span class="o">.</span><span class="n">join</span><span 
class="o">(</span><span class="n">spamInfoRDD</span><span class="o">).</span><span class="n">filter</span><span class="o">(...)</span> <span class="c1">// join data stream with spam information to do data cleaning</span> <span class="o">...</span> -<span class="o">}</span></code></pre></div> +<span class="o">}</span></code></pre></figure> </div> <div data-lang="java"> - <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kn">import</span> <span class="nn">org.apache.spark.streaming.api.java.*</span><span class="o">;</span> + <figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="kn">import</span> <span class="nn">org.apache.spark.streaming.api.java.*</span><span class="o">;</span> <span class="c1">// RDD containing spam information</span> <span class="kd">final</span> <span class="n">JavaPairRDD</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Double</span><span class="o">></span> <span class="n">spamInfoRDD</span> <span class="o">=</span> <span class="n">jssc</span><span class="o">.</span><span class="na">sparkContext</span><span class="o">().</span><span class="na">newAPIHadoopRDD</span><span class="o">(...);</span> @@ -1023,15 +1023,15 @@ spam information (maybe generated with Spark as well) and then filtering based o <span class="n">rdd</span><span class="o">.</span><span class="na">join</span><span class="o">(</span><span class="n">spamInfoRDD</span><span class="o">).</span><span class="na">filter</span><span class="o">(...);</span> <span class="c1">// join data stream with spam information to do data cleaning</span> <span class="o">...</span> <span class="o">}</span> - <span class="o">});</span></code></pre></div> + <span class="o">});</span></code></pre></figure> </div> <div data-lang="python"> - <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">spamInfoRDD</span> <span 
class="o">=</span> <span class="n">sc</span><span class="o">.</span><span class="n">pickleFile</span><span class="p">(</span><span class="o">...</span><span class="p">)</span> <span class="c"># RDD containing spam information</span> + <figure class="highlight"><pre><code class="language-python" data-lang="python"><span></span><span class="n">spamInfoRDD</span> <span class="o">=</span> <span class="n">sc</span><span class="o">.</span><span class="n">pickleFile</span><span class="p">(</span><span class="o">...</span><span class="p">)</span> <span class="c1"># RDD containing spam information</span> -<span class="c"># join data stream with spam information to do data cleaning</span> -<span class="n">cleanedDStream</span> <span class="o">=</span> <span class="n">wordCounts</span><span class="o">.</span><span class="n">transform</span><span class="p">(</span><span class="k">lambda</span> <span class="n">rdd</span><span class="p">:</span> <span class="n">rdd</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">spamInfoRDD</span><span class="p">)</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="o">...</span><span class="p">))</span></code></pre></div> +<span class="c1"># join data stream with spam information to do data cleaning</span> +<span class="n">cleanedDStream</span> <span class="o">=</span> <span class="n">wordCounts</span><span class="o">.</span><span class="n">transform</span><span class="p">(</span><span class="k">lambda</span> <span class="n">rdd</span><span class="p">:</span> <span class="n">rdd</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">spamInfoRDD</span><span class="p">)</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="o">...</span><span class="p">))</span></code></pre></figure> </div> </div> @@ -1073,13 +1073,13 @@ operation <code>reduceByKeyAndWindow</code>.</p> <div 
class="codetabs"> <div data-lang="scala"> - <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">// Reduce last 30 seconds of data, every 10 seconds</span> -<span class="k">val</span> <span class="n">windowedWordCounts</span> <span class="k">=</span> <span class="n">pairs</span><span class="o">.</span><span class="n">reduceByKeyAndWindow</span><span class="o">((</span><span class="n">a</span><span class="k">:</span><span class="kt">Int</span><span class="o">,</span><span class="n">b</span><span class="k">:</span><span class="kt">Int</span><span class="o">)</span> <span class="k">=></span> <span class="o">(</span><span class="n">a</span> <span class="o">+</span> <span class="n">b</span><span class="o">),</span> <span class="nc">Seconds</span><span class="o">(</span><span class="mi">30</span><span class="o">),</span> <span class="nc">Seconds</span><span class="o">(</span><span class="mi">10</span><span class="o">))</span></code></pre></div> + <figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span></span><span class="c1">// Reduce last 30 seconds of data, every 10 seconds</span> +<span class="k">val</span> <span class="n">windowedWordCounts</span> <span class="k">=</span> <span class="n">pairs</span><span class="o">.</span><span class="n">reduceByKeyAndWindow</span><span class="o">((</span><span class="n">a</span><span class="k">:</span><span class="kt">Int</span><span class="o">,</span><span class="n">b</span><span class="k">:</span><span class="kt">Int</span><span class="o">)</span> <span class="k">=></span> <span class="o">(</span><span class="n">a</span> <span class="o">+</span> <span class="n">b</span><span class="o">),</span> <span class="nc">Seconds</span><span class="o">(</span><span class="mi">30</span><span class="o">),</span> <span class="nc">Seconds</span><span class="o">(</span><span class="mi">10</span><span class="o">))</span></code></pre></figure> </div> <div data-lang="java"> - <div 
class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// Reduce function adding two integers, defined separately for clarity</span> + <figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="c1">// Reduce function adding two integers, defined separately for clarity</span> <span class="n">Function2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">></span> <span class="n">reduceFunc</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Function2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>()</span> <span class="o">{</span> <span class="nd">@Override</span> <span class="kd">public</span> <span class="n">Integer</span> <span class="nf">call</span><span class="o">(</span><span class="n">Integer</span> <span class="n">i1</span><span class="o">,</span> <span class="n">Integer</span> <span class="n">i2</span><span class="o">)</span> <span class="o">{</span> <span class="k">return</span> <span class="n">i1</span> <span class="o">+</span> <span class="n">i2</span><span class="o">;</span> @@ -1087,13 +1087,13 @@ operation <code>reduceByKeyAndWindow</code>.</p> <span class="o">};</span> <span class="c1">// Reduce last 30 seconds of data, every 10 seconds</span> -<span class="n">JavaPairDStream</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">></span> <span class="n">windowedWordCounts</span> <span class="o">=</span> <span class="n">pairs</span><span class="o">.</span><span class="na">reduceByKeyAndWindow</span><span class="o">(</span><span class="n">reduceFunc</span><span class="o">,</span> <span class="n">Durations</span><span 
class="o">.</span><span class="na">seconds</span><span class="o">(</span><span class="mi">30</span><span class="o">),</span> <span class="n">Durations</span><span class="o">.</span><span class="na">seconds</span><span class="o">(</span><span class="mi">10</span><span class="o">));</span></code></pre></div> +<span class="n">JavaPairDStream</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">></span> <span class="n">windowedWordCounts</span> <span class="o">=</span> <span class="n">pairs</span><span class="o">.</span><span class="na">reduceByKeyAndWindow</span><span class="o">(</span><span class="n">reduceFunc</span><span class="o">,</span> <span class="n">Durations</span><span class="o">.</span><span class="na">seconds</span><span class="o">(</span><span class="mi">30</span><span class="o">),</span> <span class="n">Durations</span><span class="o">.</span><span class="na">seconds</span><span class="o">(</span><span class="mi">10</span><span class="o">));</span></code></pre></figure> </div> <div data-lang="python"> - <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="c"># Reduce last 30 seconds of data, every 10 seconds</span> -<span class="n">windowedWordCounts</span> <span class="o">=</span> <span class="n">pairs</span><span class="o">.</span><span class="n">reduceByKeyAndWindow</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">,</span> <span class="n">y</span><span class="p">:</span> <span class="n">x</span> <span class="o">+</span> <span class="n">y</span><span class="p">,</span> <span class="k">lambda</span> <span class="n">x</span><span class="p">,</span> <span class="n">y</span><span class="p">:</span> <span class="n">x</span> <span class="o">-</span> <span class="n">y</span><span class="p">,</span> <span class="mi">30</span><span class="p">,</span> <span class="mi">10</span><span 
class="p">)</span></code></pre></div> + <figure class="highlight"><pre><code class="language-python" data-lang="python"><span></span><span class="c1"># Reduce last 30 seconds of data, every 10 seconds</span> +<span class="n">windowedWordCounts</span> <span class="o">=</span> <span class="n">pairs</span><span class="o">.</span><span class="n">reduceByKeyAndWindow</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">,</span> <span class="n">y</span><span class="p">:</span> <span class="n">x</span> <span class="o">+</span> <span class="n">y</span><span class="p">,</span> <span class="k">lambda</span> <span class="n">x</span><span class="p">,</span> <span class="n">y</span><span class="p">:</span> <span class="n">x</span> <span class="o">-</span> <span class="n">y</span><span class="p">,</span> <span class="mi">30</span><span class="p">,</span> <span class="mi">10</span><span class="p">)</span></code></pre></figure> </div> </div> @@ -1167,48 +1167,48 @@ said two parameters - <i>windowLength</i> and <i>slideInterval</i>.</p> <div class="codetabs"> <div data-lang="scala"> - <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">stream1</span><span class="k">:</span> <span class="kt">DStream</span><span class="o">[</span><span class="kt">String</span>, <span class="kt">String</span><span class="o">]</span> <span class="k">=</span> <span class="o">...</span> + <figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span></span><span class="k">val</span> <span class="n">stream1</span><span class="k">:</span> <span class="kt">DStream</span><span class="o">[</span><span class="kt">String</span>, <span class="kt">String</span><span class="o">]</span> <span class="k">=</span> <span class="o">...</span> <span class="k">val</span> <span class="n">stream2</span><span class="k">:</span> <span class="kt">DStream</span><span class="o">[</span><span 
class="kt">String</span>, <span class="kt">String</span><span class="o">]</span> <span class="k">=</span> <span class="o">...</span> -<span class="k">val</span> <span class="n">joinedStream</span> <span class="k">=</span> <span class="n">stream1</span><span class="o">.</span><span class="n">join</span><span class="o">(</span><span class="n">stream2</span><span class="o">)</span></code></pre></div> +<span class="k">val</span> <span class="n">joinedStream</span> <span class="k">=</span> <span class="n">stream1</span><span class="o">.</span><span class="n">join</span><span class="o">(</span><span class="n">stream2</span><span class="o">)</span></code></pre></figure> </div> <div data-lang="java"> - <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">JavaPairDStream</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">></span> <span class="n">stream1</span> <span class="o">=</span> <span class="o">...</span> + <figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="n">JavaPairDStream</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">></span> <span class="n">stream1</span> <span class="o">=</span> <span class="o">...</span> <span class="n">JavaPairDStream</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">></span> <span class="n">stream2</span> <span class="o">=</span> <span class="o">...</span> -<span class="n">JavaPairDStream</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Tuple2</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">>></span> <span class="n">joinedStream</span> <span class="o">=</span> <span 
class="n">stream1</span><span class="o">.</span><span class="na">join</span><span class="o">(</span><span class="n">stream2</span><span class="o">);</span></code></pre></div> +<span class="n">JavaPairDStream</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Tuple2</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">>></span> <span class="n">joinedStream</span> <span class="o">=</span> <span class="n">stream1</span><span class="o">.</span><span class="na">join</span><span class="o">(</span><span class="n">stream2</span><span class="o">);</span></code></pre></figure> </div> <div data-lang="python"> - <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">stream1</span> <span class="o">=</span> <span class="o">...</span> + <figure class="highlight"><pre><code class="language-python" data-lang="python"><span></span><span class="n">stream1</span> <span class="o">=</span> <span class="o">...</span> <span class="n">stream2</span> <span class="o">=</span> <span class="o">...</span> -<span class="n">joinedStream</span> <span class="o">=</span> <span class="n">stream1</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">stream2</span><span class="p">)</span></code></pre></div> +<span class="n">joinedStream</span> <span class="o">=</span> <span class="n">stream1</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">stream2</span><span class="p">)</span></code></pre></figure> </div> </div> -<p>Here, in each batch interval, the RDD generated by <code>stream1</code> will be joined with the RDD generated by <code>stream2</code>. You can also do <code>leftOuterJoin</code>, <code>rightOuterJoin</code>, <code>fullOuterJoin</code>. Furthermore, it is often very useful to do joins over windows of the streams. 
That is pretty easy as well.</p> +<p>Here, in each batch interval, the RDD generated by <code>stream1</code> will be joined with the RDD generated by <code>stream2</code>. You can also do <code>leftOuterJoin</code>, <code>rightOuterJoin</code>, <code>fullOuterJoin</code>. Furthermore, it is often very useful to do joins over windows of the streams. That is pretty easy as well. </p> <div class="codetabs"> <div data-lang="scala"> - <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">windowedStream1</span> <span class="k">=</span> <span class="n">stream1</span><span class="o">.</span><span class="n">window</span><span class="o">(</span><span class="nc">Seconds</span><span class="o">(</span><span class="mi">20</span><span class="o">))</span> + <figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span></span><span class="k">val</span> <span class="n">windowedStream1</span> <span class="k">=</span> <span class="n">stream1</span><span class="o">.</span><span class="n">window</span><span class="o">(</span><span class="nc">Seconds</span><span class="o">(</span><span class="mi">20</span><span class="o">))</span> <span class="k">val</span> <span class="n">windowedStream2</span> <span class="k">=</span> <span class="n">stream2</span><span class="o">.</span><span class="n">window</span><span class="o">(</span><span class="nc">Minutes</span><span class="o">(</span><span class="mi">1</span><span class="o">))</span> -<span class="k">val</span> <span class="n">joinedStream</span> <span class="k">=</span> <span class="n">windowedStream1</span><span class="o">.</span><span class="n">join</span><span class="o">(</span><span class="n">windowedStream2</span><span class="o">)</span></code></pre></div> +<span class="k">val</span> <span class="n">joinedStream</span> <span class="k">=</span> <span class="n">windowedStream1</span><span class="o">.</span><span class="n">join</span><span 
class="o">(</span><span class="n">windowedStream2</span><span class="o">)</span></code></pre></figure> </div> <div data-lang="java"> - <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">JavaPairDStream</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">></span> <span class="n">windowedStream1</span> <span class="o">=</span> <span class="n">stream1</span><span class="o">.</span><span class="na">window</span><span class="o">(</span><span class="n">Durations</span><span class="o">.</span><span class="na">seconds</span><span class="o">(</span><span class="mi">20</span><span class="o">));</span> + <figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="n">JavaPairDStream</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">></span> <span class="n">windowedStream1</span> <span class="o">=</span> <span class="n">stream1</span><span class="o">.</span><span class="na">window</span><span class="o">(</span><span class="n">Durations</span><span class="o">.</span><span class="na">seconds</span><span class="o">(</span><span class="mi">20</span><span class="o">));</span> <span class="n">JavaPairDStream</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">></span> <span class="n">windowedStream2</span> <span class="o">=</span> <span class="n">stream2</span><span class="o">.</span><span class="na">window</span><span class="o">(</span><span class="n">Durations</span><span class="o">.</span><span class="na">minutes</span><span class="o">(</span><span class="mi">1</span><span class="o">));</span> -<span class="n">JavaPairDStream</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Tuple2</span><span class="o"><</span><span 
class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">>></span> <span class="n">joinedStream</span> <span class="o">=</span> <span class="n">windowedStream1</span><span class="o">.</span><span class="na">join</span><span class="o">(</span><span class="n">windowedStream2</span><span class="o">);</span></code></pre></div> +<span class="n">JavaPairDStream</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Tuple2</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">>></span> <span class="n">joinedStream</span> <span class="o">=</span> <span class="n">windowedStream1</span><span class="o">.</span><span class="na">join</span><span class="o">(</span><span class="n">windowedStream2</span><span class="o">);</span></code></pre></figure> </div> <div data-lang="python"> - <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">windowedStream1</span> <span class="o">=</span> <span class="n">stream1</span><span class="o">.</span><span class="n">window</span><span class="p">(</span><span class="mi">20</span><span class="p">)</span> + <figure class="highlight"><pre><code class="language-python" data-lang="python"><span></span><span class="n">windowedStream1</span> <span class="o">=</span> <span class="n">stream1</span><span class="o">.</span><span class="n">window</span><span class="p">(</span><span class="mi">20</span><span class="p">)</span> <span class="n">windowedStream2</span> <span class="o">=</span> <span class="n">stream2</span><span class="o">.</span><span class="n">window</span><span class="p">(</span><span class="mi">60</span><span class="p">)</span> -<span class="n">joinedStream</span> <span class="o">=</span> <span class="n">windowedStream1</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">windowedStream2</span><span 
class="p">)</span></code></pre></div> +<span class="n">joinedStream</span> <span class="o">=</span> <span class="n">windowedStream1</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">windowedStream2</span><span class="p">)</span></code></pre></figure> </div> </div> @@ -1219,14 +1219,14 @@ said two parameters - <i>windowLength</i> and <i>slideInterval</i>.</p> <div class="codetabs"> <div data-lang="scala"> - <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">dataset</span><span class="k">:</span> <span class="kt">RDD</span><span class="o">[</span><span class="kt">String</span>, <span class="kt">String</span><span class="o">]</span> <span class="k">=</span> <span class="o">...</span> + <figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span></span><span class="k">val</span> <span class="n">dataset</span><span class="k">:</span> <span class="kt">RDD</span><span class="o">[</span><span class="kt">String</span>, <span class="kt">String</span><span class="o">]</span> <span class="k">=</span> <span class="o">...</span> <span class="k">val</span> <span class="n">windowedStream</span> <span class="k">=</span> <span class="n">stream</span><span class="o">.</span><span class="n">window</span><span class="o">(</span><span class="nc">Seconds</span><span class="o">(</span><span class="mi">20</span><span class="o">))...</span> -<span class="k">val</span> <span class="n">joinedStream</span> <span class="k">=</span> <span class="n">windowedStream</span><span class="o">.</span><span class="n">transform</span> <span class="o">{</span> <span class="n">rdd</span> <span class="k">=></span> <span class="n">rdd</span><span class="o">.</span><span class="n">join</span><span class="o">(</span><span class="n">dataset</span><span class="o">)</span> <span class="o">}</span></code></pre></div> +<span class="k">val</span> <span class="n">joinedStream</span> 
<span class="k">=</span> <span class="n">windowedStream</span><span class="o">.</span><span class="n">transform</span> <span class="o">{</span> <span class="n">rdd</span> <span class="k">=></span> <span class="n">rdd</span><span class="o">.</span><span class="n">join</span><span class="o">(</span><span class="n">dataset</span><span class="o">)</span> <span class="o">}</span></code></pre></figure> </div> <div data-lang="java"> - <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">JavaPairRDD</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">></span> <span class="n">dataset</span> <span class="o">=</span> <span class="o">...</span> + <figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="n">JavaPairRDD</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">></span> <span class="n">dataset</span> <span class="o">=</span> <span class="o">...</span> <span class="n">JavaPairDStream</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">></span> <span class="n">windowedStream</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="na">window</span><span class="o">(</span><span class="n">Durations</span><span class="o">.</span><span class="na">seconds</span><span class="o">(</span><span class="mi">20</span><span class="o">));</span> <span class="n">JavaPairDStream</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">></span> <span class="n">joinedStream</span> <span class="o">=</span> <span class="n">windowedStream</span><span class="o">.</span><span class="na">transform</span><span class="o">(</span> <span class="k">new</span> <span 
class="n">Function</span><span class="o"><</span><span class="n">JavaRDD</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">>>,</span> <span class="n">JavaRDD</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">>>>()</span> <span class="o">{</span> @@ -1235,14 +1235,14 @@ said two parameters - <i>windowLength</i> and <i>slideInterval</i>.</p> <span class="k">return</span> <span class="n">rdd</span><span class="o">.</span><span class="na">join</span><span class="o">(</span><span class="n">dataset</span><span class="o">);</span> <span class="o">}</span> <span class="o">}</span> -<span class="o">);</span></code></pre></div> +<span class="o">);</span></code></pre></figure> </div> <div data-lang="python"> - <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">dataset</span> <span class="o">=</span> <span class="o">...</span> <span class="c"># some RDD</span> + <figure class="highlight"><pre><code class="language-python" data-lang="python"><span></span><span class="n">dataset</span> <span class="o">=</span> <span class="o">...</span> <span class="c1"># some RDD</span> <span class="n">windowedStream</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="n">window</span><span class="p">(</span><span class="mi">20</span><span class="p">)</span> -<span class="n">joinedStream</span> <span class="o">=</span> <span class="n">windowedStream</span><span class="o">.</span><span class="n">transform</span><span class="p">(</span><span class="k">lambda</span> <span class="n">rdd</span><span class="p">:</span> <span class="n">rdd</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">dataset</span><span 
class="p">))</span></code></pre></div> +<span class="n">joinedStream</span> <span class="o">=</span> <span class="n">windowedStream</span><span class="o">.</span><span class="n">transform</span><span class="p">(</span><span class="k">lambda</span> <span class="n">rdd</span><span class="p">:</span> <span class="n">rdd</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">dataset</span><span class="p">))</span></code></pre></figure> </div> </div> @@ -1324,22 +1324,22 @@ For example (in Scala),</p> <div class="codetabs"> <div data-lang="scala"> - <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">dstream</span><span class="o">.</span><span class="n">foreachRDD</span> <span class="o">{</span> <span class="n">rdd</span> <span class="k">=></span> + <figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span></span><span class="n">dstream</span><span class="o">.</span><span class="n">foreachRDD</span> <span class="o">{</span> <span class="n">rdd</span> <span class="k">=></span> <span class="k">val</span> <span class="n">connection</span> <span class="k">=</span> <span class="n">createNewConnection</span><span class="o">()</span> <span class="c1">// executed at the driver</span> <span class="n">rdd</span><span class="o">.</span><span class="n">foreach</span> <span class="o">{</span> <span class="n">record</span> <span class="k">=></span> <span class="n">connection</span><span class="o">.</span><span class="n">send</span><span class="o">(</span><span class="n">record</span><span class="o">)</span> <span class="c1">// executed at the worker</span> <span class="o">}</span> -<span class="o">}</span></code></pre></div> +<span class="o">}</span></code></pre></figure> </div> <div data-lang="python"> - <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="k">def</span> <span class="nf">sendRecord</span><span class="p">(</span><span 
class="n">rdd</span><span class="p">):</span> - <span class="n">connection</span> <span class="o">=</span> <span class="n">createNewConnection</span><span class="p">()</span> <span class="c"># executed at the driver</span> + <figure class="highlight"><pre><code class="language-python" data-lang="python"><span></span><span class="k">def</span> <span class="nf">sendRecord</span><span class="p">(</span><span class="n">rdd</span><span class="p">):</span> + <span class="n">connection</span> <span class="o">=</span> <span class="n">createNewConnection</span><span class="p">()</span> <span class="c1"># executed at the driver</span> <span class="n">rdd</span><span class="o">.</span><span class="n">foreach</span><span class="p">(</span><span class="k">lambda</span> <span class="n">record</span><span class="p">:</span> <span class="n">connection</span><span class="o">.</span><span class="n">send</span><span class="p">(</span><span class="n">record</span><span class="p">))</span> <span class="n">connection</span><span class="o">.</span><span class="n">close</span><span class="p">()</span> -<span class="n">dstream</span><span class="o">.</span><span class="n">foreachRDD</span><span class="p">(</span><span class="n">sendRecord</span><span class="p">)</span></code></pre></div> +<span class="n">dstream</span><span class="
<TRUNCATED>