http://git-wip-us.apache.org/repos/asf/flink-web/blob/a16dddeb/content/news/2015/02/09/streaming-example.html ---------------------------------------------------------------------- diff --git a/content/news/2015/02/09/streaming-example.html b/content/news/2015/02/09/streaming-example.html new file mode 100644 index 0000000..fae6720 --- /dev/null +++ b/content/news/2015/02/09/streaming-example.html @@ -0,0 +1,946 @@ +<!DOCTYPE html> +<html lang="en"> + <head> + <meta charset="utf-8"> + <meta http-equiv="X-UA-Compatible" content="IE=edge"> + <meta name="viewport" content="width=device-width, initial-scale=1"> + + <title>Apache Flink: Introducing Flink Streaming</title> + <link rel="shortcut icon" href="favicon.ico" type="image/x-icon"> + <link rel="icon" href="favicon.ico" type="image/x-icon"> + <link rel="stylesheet" href="/css/bootstrap.css"> + <link rel="stylesheet" href="/css/bootstrap-lumen-custom.css"> + <link rel="stylesheet" href="/css/syntax.css"> + <link rel="stylesheet" href="/css/custom.css"> + <link href="/css/main/main.css" rel="stylesheet"> + <link href="/blog/feed.xml" rel="alternate" type="application/rss+xml" title="Flink Blog RSS feed" /> + <!-- <link href="//maxcdn.bootstrapcdn.com/font-awesome/4.1.0/css/font-awesome.min.css" rel="stylesheet"> --> + <script src="https://ajax.googleapis.com/ajax/libs/jquery/1.11.0/jquery.min.js"></script> + <script src="/js/bootstrap.min.js"></script> + <script src="/js/codetabs.js"></script> + </head> + <body> + <div class="af-header-container af-inner-pages-navigation"> + <header> + <div class="container"> + <div class="row"> + <div class="col-md-1 af-mobile-nav-bar"> + <a href="/" title="Home"> + <img class="hidden-xs hidden-sm img-responsive" + src="/img/main/logo.png" alt="Apache Flink Logo"> + </a> + <div class="row visible-xs"> + <div class="col-xs-3"> + <a href="/" title="Home"> + <img class="hidden-x hidden-sm img-responsive" + src="/img/main/logo.png" alt="Apache Flink Logo"> + </a> + </div> + <div 
class="col-xs-5"></div> + <div class="col-xs-4"> + <div class="af-mobile-btn"> + <span class="glyphicon glyphicon-plus"></span> + </div> + </div> + </div> + </div> + <!-- Navigation --> + <div class="col-md-11"> + <nav class="af-main-nav" role="navigation"> + <ul> + <li><a href="#" class="af-nav-links">Quickstart + <b class="caret"></b> + </a> + <ul class="af-dropdown-menu"> + <li><a href="http://ci.apache.org/projects/flink/flink-docs-release-0.8/setup_quickstart.html">Setup + Flink</a></li> + <li><a + href="http://ci.apache.org/projects/flink/flink-docs-release-0.8/java_api_quickstart.html">Java + API</a></li> + <li><a + href="http://ci.apache.org/projects/flink/flink-docs-release-0.8/scala_api_quickstart.html">Scala + API</a></li> + </ul></li> + <li><a href="/downloads.html">Download</a></li> + <li><a href="http://ci.apache.org/projects/flink/flink-docs-release-0.8/faq.html">FAQ</a></li> + <li><a href="#" class="af-nav-links">Documentation <b + class="caret"></b></a> + <ul class="af-dropdown-menu"> + <li class="af-separator">Current Snapshot:</li> + <li></li> + <li><a href="http://ci.apache.org/projects/flink/flink-docs-master/">0.9</a></li> + <li><a href="http://ci.apache.org/projects/flink/flink-docs-master/api/java">0.9 Javadocs</a></li> + <li><a href="http://ci.apache.org/projects/flink/flink-docs-master/api/scala/index.html#org.apache.flink.api.scala.package">0.9 Scaladocs</a></li> + <li class="divider"></li> + <li class="af-separator">Current Stable:</li> + <li></li> + <li><a href="http://ci.apache.org/projects/flink/flink-docs-release-0.8/">0.8.1</a></li> + <li><a href="http://ci.apache.org/projects/flink/flink-docs-release-0.8/api/java">0.8.1 Javadocs</a></li> + <li><a href="http://ci.apache.org/projects/flink/flink-docs-release-0.8/api/scala/index.html#org.apache.flink.api.scala.package">0.8.1 Scaladocs</a></li> + <li class="divider"></li> + <li></li> + <li><a href="/archive.html">Archive</a></li> + <li></li> + </ul></li> + <li><a href="#" 
class="af-nav-links">Community <b + class="caret"></b></a> + <ul class="af-dropdown-menu"> + <li><a href="/community.html#getting-help">Getting Help</a></li> + <li><a href="/community.html#mailing-lists">Mailing Lists</a></li> + <li><a href="/community.html#issues">Issues</a></li> + <li><a href="/community.html#team">Team</a></li> + <li class="divider"></li> + <li><a href="/how-to-contribute.html">How To + Contribute</a></li> + <li><a href="/coding_guidelines.html">Coding + Guidelines</a></li> + </ul></li> + <li><a href="#" class="af-nav-links">Project <b + class="caret"></b></a> + <ul class="af-dropdown-menu"> + <li><a href="/material.html">Material</a></li> + <li><a href="http://www.apache.org/">Apache Software + Foundation <span class="glyphicon glyphicon-new-window"></span> + </a></li> + <li><a + href="https://cwiki.apache.org/confluence/display/FLINK">Wiki + <span class="glyphicon glyphicon-new-window"></span> + </a></li> + <li><a + href="https://wiki.apache.org/incubator/StratosphereProposal">Incubator + Proposal <span class="glyphicon glyphicon-new-window"></span> + </a></li> + <li><a href="http://www.apache.org/licenses/LICENSE-2.0">License + <span class="glyphicon glyphicon-new-window"></span> + </a></li> + <li><a href="https://github.com/apache/incubator-flink">Source + Code <span class="glyphicon glyphicon-new-window"></span> + </a></li> + </ul></li> + <li><a href="/blog/index.html" class="">Blog</a></li> + </ul> + </nav> + </div> + </div> + </div> + </header> +</div> + + + <div style="padding-top:50px" class="container"> + <div class="container"> + <div class="row"> + <div class="col-md-2"></div> + <div class="col-md-8"> + <article> + <h2>Introducing Flink Streaming</h2> + <p class="meta">09 Feb 2015</p> + <div> + <p>This post is the first of a series of blog posts on Flink Streaming, +the recent addition to Apache Flink that makes it possible to analyze +continuous data sources in addition to static files. 
Flink Streaming +uses the pipelined Flink engine to process data streams in real time +and offers a new API including the definition of flexible windows.</p> + +<p>In this post, we go through an example that uses the Flink Streaming +API to compute statistics on stock market data that arrive +continuously and combine the stock market data with Twitter streams. +See the <a href="http://flink.apache.org/docs/latest/streaming_guide.html">Streaming Programming +Guide</a> for a +detailed presentation of the Streaming API.</p> + +<p>First, we read several stock price streams and combine them into +one stream of market data. We apply several transformations on this +market data stream, like rolling aggregations per stock. Then we emit +price warning alerts when the prices are rapidly changing. Moving +towards more advanced features, we compute rolling correlations +between the market data streams and a Twitter stream with stock mentions.</p> + +<p>To run the example implementation, please use the <em>0.9-SNAPSHOT</em> +version of Flink as a dependency.
The full example code base can be +found <a href="https://github.com/apache/flink/blob/master/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala">here</a> in Scala and <a href="https://github.com/apache/flink/blob/master/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java">here</a> in Java 7.</p> + +<p><a href="#top"></a></p> + +<p><a href="#top">Back to top</a></p> + +<h2 id="reading-from-multiple-inputs">Reading from multiple inputs</h2> + +<p>First, let us create the stream of stock prices:</p> + +<ol> +<li>Read a socket stream of stock prices.</li> +<li>Parse the text in the stream to create a stream of <code>StockPrice</code> objects.</li> +<li>Add four other sources tagged with the stock symbol.</li> +<li>Finally, merge the streams to create a unified stream. </li> +</ol> + +<p><img alt="Reading from multiple inputs" src="/img/blog/blog_multi_input.png" width="70%" class="img-responsive center-block"></p> + +<div class="codetabs" markdown="1"> +<div data-lang="scala" markdown="1"> + +<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">def</span> <span class="n">main</span><span class="o">(</span><span class="n">args</span><span class="k">:</span> <span class="kt">Array</span><span class="o">[</span><span class="kt">String</span><span class="o">])</span> <span class="o">{</span> + + <span class="k">val</span> <span class="n">env</span> <span class="k">=</span> <span class="nc">StreamExecutionEnvironment</span><span class="o">.</span><span class="n">getExecutionEnvironment</span> + + <span class="c1">//Read from a socket stream and map it to StockPrice objects</span> + <span class="k">val</span> <span class="n">socketStockStream</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">socketTextStream</span><span
class="o">(</span><span class="s">"localhost"</span><span class="o">,</span> <span class="mi">9999</span><span class="o">).</span><span class="n">map</span><span class="o">(</span><span class="n">x</span> <span class="k">=></span> <span class="o">{</span> + <span class="k">val</span> <span class="n">split</span> <span class="k">=</span> <span class="n">x</span><span class="o">.</span><span class="n">split</span><span class="o">(</span><span class="s">","</span><span class="o">)</span> + <span class="nc">StockPrice</span><span class="o">(</span><span class="n">split</span><span class="o">(</span><span class="mi">0</span><span class="o">),</span> <span class="n">split</span><span class="o">(</span><span class="mi">1</span><span class="o">).</span><span class="n">toDouble</span><span class="o">)</span> + <span class="o">})</span> + + <span class="c1">//Generate other stock streams</span> + <span class="k">val</span> <span class="nc">SPX_Stream</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">addSource</span><span class="o">(</span><span class="n">generateStock</span><span class="o">(</span><span class="s">"SPX"</span><span class="o">)(</span><span class="mi">10</span><span class="o">)</span> <span class="k">_</span><span class="o">)</span> + <span class="k">val</span> <span class="nc">FTSE_Stream</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">addSource</span><span class="o">(</span><span class="n">generateStock</span><span class="o">(</span><span class="s">"FTSE"</span><span class="o">)(</span><span class="mi">20</span><span class="o">)</span> <span class="k">_</span><span class="o">)</span> + <span class="k">val</span> <span class="nc">DJI_Stream</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">addSource</span><span class="o">(</span><span class="n">generateStock</span><span class="o">(</span><span 
class="s">"DJI"</span><span class="o">)(</span><span class="mi">30</span><span class="o">)</span> <span class="k">_</span><span class="o">)</span> + <span class="k">val</span> <span class="nc">BUX_Stream</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">addSource</span><span class="o">(</span><span class="n">generateStock</span><span class="o">(</span><span class="s">"BUX"</span><span class="o">)(</span><span class="mi">40</span><span class="o">)</span> <span class="k">_</span><span class="o">)</span> + + <span class="c1">//Merge all stock streams together</span> + <span class="k">val</span> <span class="n">stockStream</span> <span class="k">=</span> <span class="n">socketStockStream</span><span class="o">.</span><span class="n">merge</span><span class="o">(</span><span class="nc">SPX_Stream</span><span class="o">,</span> <span class="nc">FTSE_Stream</span><span class="o">,</span> + <span class="nc">DJI_Stream</span><span class="o">,</span> <span class="nc">BUX_Stream</span><span class="o">)</span> + + <span class="n">stockStream</span><span class="o">.</span><span class="n">print</span><span class="o">()</span> + + <span class="n">env</span><span class="o">.</span><span class="n">execute</span><span class="o">(</span><span class="s">"Stock stream"</span><span class="o">)</span> +<span class="o">}</span></code></pre></div> + +</div> + +<div data-lang="java7" markdown="1"> + +<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">static</span> <span class="kt">void</span> <span class="nf">main</span><span class="o">(</span><span class="n">String</span><span class="o">[]</span> <span class="n">args</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span> + + <span class="kd">final</span> <span class="n">StreamExecutionEnvironment</span> <span class="n">env</span> <span class="o">=</span> + 
<span class="n">StreamExecutionEnvironment</span><span class="o">.</span><span class="na">getExecutionEnvironment</span><span class="o">();</span> + + <span class="c1">//Read from a socket stream and map it to StockPrice objects</span> + <span class="n">DataStream</span><span class="o"><</span><span class="n">StockPrice</span><span class="o">></span> <span class="n">socketStockStream</span> <span class="o">=</span> <span class="n">env</span> + <span class="o">.</span><span class="na">socketTextStream</span><span class="o">(</span><span class="s">"localhost"</span><span class="o">,</span> <span class="mi">9999</span><span class="o">)</span> + <span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">new</span> <span class="n">MapFunction</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">StockPrice</span><span class="o">>()</span> <span class="o">{</span> + <span class="kd">private</span> <span class="n">String</span><span class="o">[]</span> <span class="n">tokens</span><span class="o">;</span> + + <span class="nd">@Override</span> + <span class="kd">public</span> <span class="n">StockPrice</span> <span class="nf">map</span><span class="o">(</span><span class="n">String</span> <span class="n">value</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span> + <span class="n">tokens</span> <span class="o">=</span> <span class="n">value</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">","</span><span class="o">);</span> + <span class="k">return</span> <span class="k">new</span> <span class="nf">StockPrice</span><span class="o">(</span><span class="n">tokens</span><span class="o">[</span><span class="mi">0</span><span class="o">],</span> + <span class="n">Double</span><span class="o">.</span><span class="na">parseDouble</span><span class="o">(</span><span
class="n">tokens</span><span class="o">[</span><span class="mi">1</span><span class="o">]));</span> + <span class="o">}</span> + <span class="o">});</span> + + <span class="c1">//Generate other stock streams</span> + <span class="n">DataStream</span><span class="o"><</span><span class="n">StockPrice</span><span class="o">></span> <span class="n">SPX_stream</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">addSource</span><span class="o">(</span><span class="k">new</span> <span class="nf">StockSource</span><span class="o">(</span><span class="s">"SPX"</span><span class="o">,</span> <span class="mi">10</span><span class="o">));</span> + <span class="n">DataStream</span><span class="o"><</span><span class="n">StockPrice</span><span class="o">></span> <span class="n">FTSE_stream</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">addSource</span><span class="o">(</span><span class="k">new</span> <span class="nf">StockSource</span><span class="o">(</span><span class="s">"FTSE"</span><span class="o">,</span> <span class="mi">20</span><span class="o">));</span> + <span class="n">DataStream</span><span class="o"><</span><span class="n">StockPrice</span><span class="o">></span> <span class="n">DJI_stream</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">addSource</span><span class="o">(</span><span class="k">new</span> <span class="nf">StockSource</span><span class="o">(</span><span class="s">"DJI"</span><span class="o">,</span> <span class="mi">30</span><span class="o">));</span> + <span class="n">DataStream</span><span class="o"><</span><span class="n">StockPrice</span><span class="o">></span> <span class="n">BUX_stream</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">addSource</span><span class="o">(</span><span class="k">new</span> <span class="nf">StockSource</span><span 
class="o">(</span><span class="s">"BUX"</span><span class="o">,</span> <span class="mi">40</span><span class="o">));</span> + + <span class="c1">//Merge all stock streams together</span> + <span class="n">DataStream</span><span class="o"><</span><span class="n">StockPrice</span><span class="o">></span> <span class="n">stockStream</span> <span class="o">=</span> <span class="n">socketStockStream</span> + <span class="o">.</span><span class="na">merge</span><span class="o">(</span><span class="n">SPX_stream</span><span class="o">,</span> <span class="n">FTSE_stream</span><span class="o">,</span> <span class="n">DJI_stream</span><span class="o">,</span> <span class="n">BUX_stream</span><span class="o">);</span> + + <span class="n">stockStream</span><span class="o">.</span><span class="na">print</span><span class="o">();</span> + + <span class="n">env</span><span class="o">.</span><span class="na">execute</span><span class="o">(</span><span class="s">"Stock stream"</span><span class="o">);</span></code></pre></div> + +</div> + +</div> + +<p>See the +<a href="http://flink.apache.org/docs/latest/streaming_guide.html#sources">sources section of the Streaming Guide</a> +for how to create streaming sources for Flink Streaming +programs. Flink, of course, supports reading streams from +<a href="http://flink.apache.org/docs/latest/streaming_guide.html#stream-connectors">external +sources</a> +such as Apache Kafka, Apache Flume, RabbitMQ, and others.
For the sake +of this example, the data streams are simply generated using the +<code>generateStock</code> method:</p> + +<div class="codetabs" markdown="1"> +<div data-lang="scala" markdown="1"> + +<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">symbols</span> <span class="k">=</span> <span class="nc">List</span><span class="o">(</span><span class="s">"SPX"</span><span class="o">,</span> <span class="s">"FTSE"</span><span class="o">,</span> <span class="s">"DJI"</span><span class="o">,</span> <span class="s">"DJT"</span><span class="o">,</span> <span class="s">"BUX"</span><span class="o">,</span> <span class="s">"DAX"</span><span class="o">,</span> <span class="s">"GOOG"</span><span class="o">)</span> + +<span class="k">case</span> <span class="k">class</span> <span class="nc">StockPrice</span><span class="o">(</span><span class="n">symbol</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span> <span class="n">price</span><span class="k">:</span> <span class="kt">Double</span><span class="o">)</span> + +<span class="k">def</span> <span class="n">generateStock</span><span class="o">(</span><span class="n">symbol</span><span class="k">:</span> <span class="kt">String</span><span class="o">)(</span><span class="n">sigma</span><span class="k">:</span> <span class="kt">Int</span><span class="o">)(</span><span class="n">out</span><span class="k">:</span> <span class="kt">Collector</span><span class="o">[</span><span class="kt">StockPrice</span><span class="o">])</span> <span class="k">=</span> <span class="o">{</span> + <span class="k">var</span> <span class="n">price</span> <span class="k">=</span> <span class="mf">1000.</span> + <span class="k">while</span> <span class="o">(</span><span class="kc">true</span><span class="o">)</span> <span class="o">{</span> + <span class="n">price</span> <span class="k">=</span> <span class="n">price</span> <span class="o">+</span> 
<span class="nc">Random</span><span class="o">.</span><span class="n">nextGaussian</span> <span class="o">*</span> <span class="n">sigma</span> + <span class="n">out</span><span class="o">.</span><span class="n">collect</span><span class="o">(</span><span class="nc">StockPrice</span><span class="o">(</span><span class="n">symbol</span><span class="o">,</span> <span class="n">price</span><span class="o">))</span> + <span class="nc">Thread</span><span class="o">.</span><span class="n">sleep</span><span class="o">(</span><span class="nc">Random</span><span class="o">.</span><span class="n">nextInt</span><span class="o">(</span><span class="mi">200</span><span class="o">))</span> + <span class="o">}</span> +<span class="o">}</span></code></pre></div> + +</div> + +<div data-lang="java7" markdown="1"> + +<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">private</span> <span class="kd">static</span> <span class="kd">final</span> <span class="n">ArrayList</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">SYMBOLS</span> <span class="o">=</span> <span class="k">new</span> <span class="n">ArrayList</span><span class="o"><</span><span class="n">String</span><span class="o">>(</span> + <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="s">"SPX"</span><span class="o">,</span> <span class="s">"FTSE"</span><span class="o">,</span> <span class="s">"DJI"</span><span class="o">,</span> <span class="s">"DJT"</span><span class="o">,</span> <span class="s">"BUX"</span><span class="o">,</span> <span class="s">"DAX"</span><span class="o">,</span> <span class="s">"GOOG"</span><span class="o">));</span> + +<span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">StockPrice</span> <span class="kd">implements</span> <span class="n">Serializable</span> <span class="o">{</span> + + <span 
class="kd">public</span> <span class="n">String</span> <span class="n">symbol</span><span class="o">;</span> + <span class="kd">public</span> <span class="n">Double</span> <span class="n">price</span><span class="o">;</span> + + <span class="kd">public</span> <span class="nf">StockPrice</span><span class="o">()</span> <span class="o">{</span> + <span class="o">}</span> + + <span class="kd">public</span> <span class="nf">StockPrice</span><span class="o">(</span><span class="n">String</span> <span class="n">symbol</span><span class="o">,</span> <span class="n">Double</span> <span class="n">price</span><span class="o">)</span> <span class="o">{</span> + <span class="k">this</span><span class="o">.</span><span class="na">symbol</span> <span class="o">=</span> <span class="n">symbol</span><span class="o">;</span> + <span class="k">this</span><span class="o">.</span><span class="na">price</span> <span class="o">=</span> <span class="n">price</span><span class="o">;</span> + <span class="o">}</span> + + <span class="nd">@Override</span> + <span class="kd">public</span> <span class="n">String</span> <span class="nf">toString</span><span class="o">()</span> <span class="o">{</span> + <span class="k">return</span> <span class="s">"StockPrice{"</span> <span class="o">+</span> + <span class="s">"symbol='"</span> <span class="o">+</span> <span class="n">symbol</span> <span class="o">+</span> <span class="sc">'\''</span> <span class="o">+</span> + <span class="s">", count="</span> <span class="o">+</span> <span class="n">price</span> <span class="o">+</span> + <span class="sc">'}'</span><span class="o">;</span> + <span class="o">}</span> +<span class="o">}</span> + +<span class="kd">public</span> <span class="kd">final</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">StockSource</span> <span class="kd">implements</span> <span class="n">SourceFunction</span><span class="o"><</span><span class="n">StockPrice</span><span class="o">></span> <span 
class="o">{</span> + + <span class="kd">private</span> <span class="n">Double</span> <span class="n">price</span><span class="o">;</span> + <span class="kd">private</span> <span class="n">String</span> <span class="n">symbol</span><span class="o">;</span> + <span class="kd">private</span> <span class="n">Integer</span> <span class="n">sigma</span><span class="o">;</span> + + <span class="kd">public</span> <span class="nf">StockSource</span><span class="o">(</span><span class="n">String</span> <span class="n">symbol</span><span class="o">,</span> <span class="n">Integer</span> <span class="n">sigma</span><span class="o">)</span> <span class="o">{</span> + <span class="k">this</span><span class="o">.</span><span class="na">symbol</span> <span class="o">=</span> <span class="n">symbol</span><span class="o">;</span> + <span class="k">this</span><span class="o">.</span><span class="na">sigma</span> <span class="o">=</span> <span class="n">sigma</span><span class="o">;</span> + <span class="o">}</span> + + <span class="nd">@Override</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">invoke</span><span class="o">(</span><span class="n">Collector</span><span class="o"><</span><span class="n">StockPrice</span><span class="o">></span> <span class="n">collector</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span> + <span class="n">price</span> <span class="o">=</span> <span class="n">DEFAULT_PRICE</span><span class="o">;</span> + <span class="n">Random</span> <span class="n">random</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">Random</span><span class="o">();</span> + + <span class="k">while</span> <span class="o">(</span><span class="kc">true</span><span class="o">)</span> <span class="o">{</span> + <span class="n">price</span> <span class="o">=</span> <span class="n">price</span> <span class="o">+</span> <span class="n">random</span><span 
class="o">.</span><span class="na">nextGaussian</span><span class="o">()</span> <span class="o">*</span> <span class="n">sigma</span><span class="o">;</span> + <span class="n">collector</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="k">new</span> <span class="nf">StockPrice</span><span class="o">(</span><span class="n">symbol</span><span class="o">,</span> <span class="n">price</span><span class="o">));</span> + <span class="n">Thread</span><span class="o">.</span><span class="na">sleep</span><span class="o">(</span><span class="n">random</span><span class="o">.</span><span class="na">nextInt</span><span class="o">(</span><span class="mi">200</span><span class="o">));</span> + <span class="o">}</span> + <span class="o">}</span> +<span class="o">}</span></code></pre></div> + +</div> + +</div> + +<p>To read from the text socket stream, please make sure that you have a +socket server running. For the sake of the example, executing the following +command in a terminal does the job. You can get +<a href="http://netcat.sourceforge.net/">netcat</a> from its project page if it is not available +on your machine.</p> +<div class="highlight"><pre><code class="language-text" data-lang="text">nc -lk 9999 +</code></pre></div> +<p>If we execute the program from our IDE, we see the +stock prices being generated:</p> +<div class="highlight"><pre><code class="language-text" data-lang="text">INFO Job execution switched to status RUNNING.
+INFO Socket Stream(1/1) switched to SCHEDULED +INFO Socket Stream(1/1) switched to DEPLOYING +INFO Custom Source(1/1) switched to SCHEDULED +INFO Custom Source(1/1) switched to DEPLOYING +… +1> StockPrice{symbol='SPX', count=1011.3405732645239} +2> StockPrice{symbol='SPX', count=1018.3381290039248} +1> StockPrice{symbol='DJI', count=1036.7454894073978} +3> StockPrice{symbol='DJI', count=1135.1170217478427} +3> StockPrice{symbol='BUX', count=1053.667523187687} +4> StockPrice{symbol='BUX', count=1036.552601487263} +</code></pre></div> +<p><a href="#top">Back to top</a></p> + +<h2 id="window-aggregations">Window aggregations</h2> + +<p>We first compute aggregations on time-based windows of the +data. Flink provides <a href="http://flink.apache.org/docs/latest/streaming_guide.html#window-operators">flexible windowing semantics</a> where windows can +also be defined based on the count of records or on any custom user-defined +logic.</p> + +<p>We partition our stream into windows of 10 seconds and slide the +window every 5 seconds. We compute three statistics every 5 seconds. +The first is the minimum price of all stocks, the second is the +maximum price per stock, and the third is the mean stock price +(using a map window function).
Aggregations and groupings can be +performed on named fields of POJOs, making the code more readable.</p> + +<p><img alt="Basic windowing aggregations" src="/img/blog/blog_basic_window.png" width="70%" class="img-responsive center-block"></p> + +<div class="codetabs" markdown="1"> + +<div data-lang="scala" markdown="1"> + + +<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">//Define the desired time window</span> +<span class="k">val</span> <span class="n">windowedStream</span> <span class="k">=</span> <span class="n">stockStream</span> + <span class="o">.</span><span class="n">window</span><span class="o">(</span><span class="nc">Time</span><span class="o">.</span><span class="n">of</span><span class="o">(</span><span class="mi">10</span><span class="o">,</span> <span class="nc">SECONDS</span><span class="o">)).</span><span class="n">every</span><span class="o">(</span><span class="nc">Time</span><span class="o">.</span><span class="n">of</span><span class="o">(</span><span class="mi">5</span><span class="o">,</span> <span class="nc">SECONDS</span><span class="o">))</span> + +<span class="c1">//Compute some simple statistics on a rolling window</span> +<span class="k">val</span> <span class="n">lowest</span> <span class="k">=</span> <span class="n">windowedStream</span><span class="o">.</span><span class="n">minBy</span><span class="o">(</span><span class="s">"price"</span><span class="o">)</span> +<span class="k">val</span> <span class="n">maxByStock</span> <span class="k">=</span> <span class="n">windowedStream</span><span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="s">"symbol"</span><span class="o">).</span><span class="n">maxBy</span><span class="o">(</span><span class="s">"price"</span><span class="o">)</span> +<span class="k">val</span> <span class="n">rollingMean</span> <span class="k">=</span> <span class="n">windowedStream</span><span class="o">.</span><span 
class="n">groupBy</span><span class="o">(</span><span class="s">"symbol"</span><span class="o">).</span><span class="n">mapWindow</span><span class="o">(</span><span class="n">mean</span> <span class="k">_</span><span class="o">)</span> + +<span class="c1">//Compute the mean of a window</span> +<span class="k">def</span> <span class="n">mean</span><span class="o">(</span><span class="n">ts</span><span class="k">:</span> <span class="kt">Iterable</span><span class="o">[</span><span class="kt">StockPrice</span><span class="o">],</span> <span class="n">out</span><span class="k">:</span> <span class="kt">Collector</span><span class="o">[</span><span class="kt">StockPrice</span><span class="o">])</span> <span class="k">=</span> <span class="o">{</span> + <span class="k">if</span> <span class="o">(</span><span class="n">ts</span><span class="o">.</span><span class="n">nonEmpty</span><span class="o">)</span> <span class="o">{</span> + <span class="n">out</span><span class="o">.</span><span class="n">collect</span><span class="o">(</span><span class="nc">StockPrice</span><span class="o">(</span><span class="n">ts</span><span class="o">.</span><span class="n">head</span><span class="o">.</span><span class="n">symbol</span><span class="o">,</span> <span class="n">ts</span><span class="o">.</span><span class="n">foldLeft</span><span class="o">(</span><span class="mi">0</span><span class="k">:</span> <span class="kt">Double</span><span class="o">)(</span><span class="k">_</span> <span class="o">+</span> <span class="k">_</span><span class="o">.</span><span class="n">price</span><span class="o">)</span> <span class="o">/</span> <span class="n">ts</span><span class="o">.</span><span class="n">size</span><span class="o">))</span> + <span class="o">}</span> +<span class="o">}</span></code></pre></div> + + +</div> + +<div data-lang="java7" markdown="1"> + + +<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">//Define the desired time 
window</span> +<span class="n">WindowedDataStream</span><span class="o"><</span><span class="n">StockPrice</span><span class="o">></span> <span class="n">windowedStream</span> <span class="o">=</span> <span class="n">stockStream</span> + <span class="o">.</span><span class="na">window</span><span class="o">(</span><span class="n">Time</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="mi">10</span><span class="o">,</span> <span class="n">TimeUnit</span><span class="o">.</span><span class="na">SECONDS</span><span class="o">))</span> + <span class="o">.</span><span class="na">every</span><span class="o">(</span><span class="n">Time</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="mi">5</span><span class="o">,</span> <span class="n">TimeUnit</span><span class="o">.</span><span class="na">SECONDS</span><span class="o">));</span> + +<span class="c1">//Compute some simple statistics on a rolling window</span> +<span class="n">DataStream</span><span class="o"><</span><span class="n">StockPrice</span><span class="o">></span> <span class="n">lowest</span> <span class="o">=</span> <span class="n">windowedStream</span><span class="o">.</span><span class="na">minBy</span><span class="o">(</span><span class="s">"price"</span><span class="o">).</span><span class="na">flatten</span><span class="o">();</span> +<span class="n">DataStream</span><span class="o"><</span><span class="n">StockPrice</span><span class="o">></span> <span class="n">maxByStock</span> <span class="o">=</span> <span class="n">windowedStream</span><span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="s">"symbol"</span><span class="o">)</span> + <span class="o">.</span><span class="na">maxBy</span><span class="o">(</span><span class="s">"price"</span><span class="o">).</span><span class="na">flatten</span><span class="o">();</span> +<span class="n">DataStream</span><span 
class="o"><</span><span class="n">StockPrice</span><span class="o">></span> <span class="n">rollingMean</span> <span class="o">=</span> <span class="n">windowedStream</span><span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="s">"symbol"</span><span class="o">)</span> + <span class="o">.</span><span class="na">mapWindow</span><span class="o">(</span><span class="k">new</span> <span class="nf">WindowMean</span><span class="o">()).</span><span class="na">flatten</span><span class="o">();</span> + +<span class="c1">//Compute the mean of a window</span> +<span class="kd">public</span> <span class="kd">final</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">WindowMean</span> <span class="kd">implements</span> + <span class="n">WindowMapFunction</span><span class="o"><</span><span class="n">StockPrice</span><span class="o">,</span> <span class="n">StockPrice</span><span class="o">></span> <span class="o">{</span> + + <span class="kd">private</span> <span class="n">Double</span> <span class="n">sum</span> <span class="o">=</span> <span class="mf">0.0</span><span class="o">;</span> + <span class="kd">private</span> <span class="n">Integer</span> <span class="n">count</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span> + <span class="kd">private</span> <span class="n">String</span> <span class="n">symbol</span> <span class="o">=</span> <span class="s">""</span><span class="o">;</span> + + <span class="nd">@Override</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">mapWindow</span><span class="o">(</span><span class="n">Iterable</span><span class="o"><</span><span class="n">StockPrice</span><span class="o">></span> <span class="n">values</span><span class="o">,</span> <span class="n">Collector</span><span class="o"><</span><span class="n">StockPrice</span><span class="o">></span> <span class="n">out</span><span class="o">)</span> + <span 
class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span> + + <span class="k">if</span> <span class="o">(</span><span class="n">values</span><span class="o">.</span><span class="na">iterator</span><span class="o">().</span><span class="na">hasNext</span><span class="o">())</span> <span class="o">{</span><span class="n">s</span> + <span class="nf">for</span> <span class="o">(</span><span class="n">StockPrice</span> <span class="n">sp</span> <span class="o">:</span> <span class="n">values</span><span class="o">)</span> <span class="o">{</span> + <span class="n">sum</span> <span class="o">+=</span> <span class="n">sp</span><span class="o">.</span><span class="na">price</span><span class="o">;</span> + <span class="n">symbol</span> <span class="o">=</span> <span class="n">sp</span><span class="o">.</span><span class="na">symbol</span><span class="o">;</span> + <span class="n">count</span><span class="o">++;</span> + <span class="o">}</span> + <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="k">new</span> <span class="nf">StockPrice</span><span class="o">(</span><span class="n">symbol</span><span class="o">,</span> <span class="n">sum</span> <span class="o">/</span> <span class="n">count</span><span class="o">));</span> + <span class="o">}</span> + <span class="o">}</span> +<span class="o">}</span></code></pre></div> + + +</div> + +<p></div></p> + +<p>Let us note that to print a windowed stream one has to flatten it first, +thus getting rid of the windowing logic. For example execute +<code>maxByStock.flatten().print()</code> to print the stream of maximum prices of + the time windows by stock. For Scala <code>flatten()</code> is called implicitly +when needed.</p> + +<p><a href="#top">Back to top</a></p> + +<h2 id="data-driven-windows">Data-driven windows</h2> + +<p>The most interesting event in the stream is when the price of a stock +is changing rapidly. 
We can send a warning when a stock price changes +more than 5% since the last warning. To do that, we use a delta-based window providing a +threshold on when the computation will be triggered, a function to +compute the difference and a default value with which the first record +is compared. We also create a <code>Count</code> data type to count the warnings +every 30 seconds.</p> + +<p><img alt="Data-driven windowing semantics" src="/img/blog/blog_data_driven.png" width="100%" class="img-responsive center-block"></p> + +<div class="codetabs" markdown="1"> + +<div data-lang="scala" markdown="1"> + + +<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">case</span> <span class="k">class</span> <span class="nc">Count</span><span class="o">(</span><span class="n">symbol</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span> <span class="n">count</span><span class="k">:</span> <span class="kt">Int</span><span class="o">)</span> +<span class="k">val</span> <span class="n">defaultPrice</span> <span class="k">=</span> <span class="nc">StockPrice</span><span class="o">(</span><span class="s">""</span><span class="o">,</span> <span class="mi">1000</span><span class="o">)</span> + +<span class="c1">//Use delta policy to create price change warnings</span> +<span class="k">val</span> <span class="n">priceWarnings</span> <span class="k">=</span> <span class="n">stockStream</span><span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="s">"symbol"</span><span class="o">)</span> + <span class="o">.</span><span class="n">window</span><span class="o">(</span><span class="nc">Delta</span><span class="o">.</span><span class="n">of</span><span class="o">(</span><span class="mf">0.05</span><span class="o">,</span> <span class="n">priceChange</span><span class="o">,</span> <span class="n">defaultPrice</span><span class="o">))</span> + <span class="o">.</span><span 
class="n">mapWindow</span><span class="o">(</span><span class="n">sendWarning</span> <span class="k">_</span><span class="o">)</span> + +<span class="c1">//Count the number of warnings every half a minute</span> +<span class="k">val</span> <span class="n">warningsPerStock</span> <span class="k">=</span> <span class="n">priceWarnings</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="nc">Count</span><span class="o">(</span><span class="k">_</span><span class="o">,</span> <span class="mi">1</span><span class="o">))</span> + <span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="s">"symbol"</span><span class="o">)</span> + <span class="o">.</span><span class="n">window</span><span class="o">(</span><span class="nc">Time</span><span class="o">.</span><span class="n">of</span><span class="o">(</span><span class="mi">30</span><span class="o">,</span> <span class="nc">SECONDS</span><span class="o">))</span> + <span class="o">.</span><span class="n">sum</span><span class="o">(</span><span class="s">"count"</span><span class="o">)</span> + +<span class="k">def</span> <span class="n">priceChange</span><span class="o">(</span><span class="n">p1</span><span class="k">:</span> <span class="kt">StockPrice</span><span class="o">,</span> <span class="n">p2</span><span class="k">:</span> <span class="kt">StockPrice</span><span class="o">)</span><span class="k">:</span> <span class="kt">Double</span> <span class="o">=</span> <span class="o">{</span> + <span class="nc">Math</span><span class="o">.</span><span class="n">abs</span><span class="o">(</span><span class="n">p1</span><span class="o">.</span><span class="n">price</span> <span class="o">/</span> <span class="n">p2</span><span class="o">.</span><span class="n">price</span> <span class="o">-</span> <span class="mi">1</span><span class="o">)</span> +<span class="o">}</span> + +<span class="k">def</span> <span class="n">sendWarning</span><span 
class="o">(</span><span class="n">ts</span><span class="k">:</span> <span class="kt">Iterable</span><span class="o">[</span><span class="kt">StockPrice</span><span class="o">],</span> <span class="n">out</span><span class="k">:</span> <span class="kt">Collector</span><span class="o">[</span><span class="kt">String</span><span class="o">])</span> <span class="k">=</span> <span class="o">{</span> + <span class="k">if</span> <span class="o">(</span><span class="n">ts</span><span class="o">.</span><span class="n">nonEmpty</span><span class="o">)</span> <span class="n">out</span><span class="o">.</span><span class="n">collect</span><span class="o">(</span><span class="n">ts</span><span class="o">.</span><span class="n">head</span><span class="o">.</span><span class="n">symbol</span><span class="o">)</span> +<span class="o">}</span></code></pre></div> + + +</div> + +<div data-lang="java7" markdown="1"> + + +<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">private</span> <span class="kd">static</span> <span class="kd">final</span> <span class="n">Double</span> <span class="n">DEFAULT_PRICE</span> <span class="o">=</span> <span class="mi">1000</span><span class="o">.;</span> +<span class="kd">private</span> <span class="kd">static</span> <span class="kd">final</span> <span class="n">StockPrice</span> <span class="n">DEFAULT_STOCK_PRICE</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">StockPrice</span><span class="o">(</span><span class="s">""</span><span class="o">,</span> <span class="n">DEFAULT_PRICE</span><span class="o">);</span> + +<span class="c1">//Use delta policy to create price change warnings</span> +<span class="n">DataStream</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">priceWarnings</span> <span class="o">=</span> <span class="n">stockStream</span><span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span 
class="s">"symbol"</span><span class="o">)</span> + <span class="o">.</span><span class="na">window</span><span class="o">(</span><span class="n">Delta</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="mf">0.05</span><span class="o">,</span> <span class="k">new</span> <span class="n">DeltaFunction</span><span class="o"><</span><span class="n">StockPrice</span><span class="o">>()</span> <span class="o">{</span> + <span class="nd">@Override</span> + <span class="kd">public</span> <span class="kt">double</span> <span class="nf">getDelta</span><span class="o">(</span><span class="n">StockPrice</span> <span class="n">oldDataPoint</span><span class="o">,</span> <span class="n">StockPrice</span> <span class="n">newDataPoint</span><span class="o">)</span> <span class="o">{</span> + <span class="k">return</span> <span class="n">Math</span><span class="o">.</span><span class="na">abs</span><span class="o">(</span><span class="n">oldDataPoint</span><span class="o">.</span><span class="na">price</span> <span class="o">-</span> <span class="n">newDataPoint</span><span class="o">.</span><span class="na">price</span><span class="o">);</span> + <span class="o">}</span> + <span class="o">},</span> <span class="n">DEFAULT_STOCK_PRICE</span><span class="o">))</span> +<span class="o">.</span><span class="na">mapWindow</span><span class="o">(</span><span class="k">new</span> <span class="nf">SendWarning</span><span class="o">()).</span><span class="na">flatten</span><span class="o">();</span> + +<span class="c1">//Count the number of warnings every half a minute</span> +<span class="n">DataStream</span><span class="o"><</span><span class="n">Count</span><span class="o">></span> <span class="n">warningsPerStock</span> <span class="o">=</span> <span class="n">priceWarnings</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">new</span> <span class="n">MapFunction</span><span class="o"><</span><span 
class="n">String</span><span class="o">,</span> <span class="n">Count</span><span class="o">>()</span> <span class="o">{</span> + <span class="nd">@Override</span> + <span class="kd">public</span> <span class="n">Count</span> <span class="nf">map</span><span class="o">(</span><span class="n">String</span> <span class="n">value</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span> + <span class="k">return</span> <span class="k">new</span> <span class="nf">Count</span><span class="o">(</span><span class="n">value</span><span class="o">,</span> <span class="mi">1</span><span class="o">);</span> + <span class="o">}</span> +<span class="o">}).</span><span class="na">groupBy</span><span class="o">(</span><span class="s">"symbol"</span><span class="o">).</span><span class="na">window</span><span class="o">(</span><span class="n">Time</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="mi">30</span><span class="o">,</span> <span class="n">TimeUnit</span><span class="o">.</span><span class="na">SECONDS</span><span class="o">)).</span><span class="na">sum</span><span class="o">(</span><span class="s">"count"</span><span class="o">).</span><span class="na">flatten</span><span class="o">();</span> + +<span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">Count</span> <span class="kd">implements</span> <span class="n">Serializable</span> <span class="o">{</span> + <span class="kd">public</span> <span class="n">String</span> <span class="n">symbol</span><span class="o">;</span> + <span class="kd">public</span> <span class="n">Integer</span> <span class="n">count</span><span class="o">;</span> + + <span class="kd">public</span> <span class="nf">Count</span><span class="o">()</span> <span class="o">{</span> + <span class="o">}</span> + + <span class="kd">public</span> <span class="nf">Count</span><span 
class="o">(</span><span class="n">String</span> <span class="n">symbol</span><span class="o">,</span> <span class="n">Integer</span> <span class="n">count</span><span class="o">)</span> <span class="o">{</span> + <span class="k">this</span><span class="o">.</span><span class="na">symbol</span> <span class="o">=</span> <span class="n">symbol</span><span class="o">;</span> + <span class="k">this</span><span class="o">.</span><span class="na">count</span> <span class="o">=</span> <span class="n">count</span><span class="o">;</span> + <span class="o">}</span> + + <span class="nd">@Override</span> + <span class="kd">public</span> <span class="n">String</span> <span class="nf">toString</span><span class="o">()</span> <span class="o">{</span> + <span class="k">return</span> <span class="s">"Count{"</span> <span class="o">+</span> + <span class="s">"symbol='"</span> <span class="o">+</span> <span class="n">symbol</span> <span class="o">+</span> <span class="sc">'\''</span> <span class="o">+</span> + <span class="s">", count="</span> <span class="o">+</span> <span class="n">count</span> <span class="o">+</span> + <span class="sc">'}'</span><span class="o">;</span> + <span class="o">}</span> +<span class="o">}</span> + +<span class="kd">public</span> <span class="kd">static</span> <span class="kd">final</span> <span class="kd">class</span> <span class="nc">SendWarning</span> <span class="kd">implements</span> <span class="n">MapWindowFunction</span><span class="o"><</span><span class="n">StockPrice</span><span class="o">,</span> <span class="n">String</span><span class="o">></span> <span class="o">{</span> + <span class="nd">@Override</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">mapWindow</span><span class="o">(</span><span class="n">Iterable</span><span class="o"><</span><span class="n">StockPrice</span><span class="o">></span> <span class="n">values</span><span class="o">,</span> <span class="n">Collector</span><span 
class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">out</span><span class="o">)</span> + <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span> + + <span class="k">if</span> <span class="o">(</span><span class="n">values</span><span class="o">.</span><span class="na">iterator</span><span class="o">().</span><span class="na">hasNext</span><span class="o">())</span> <span class="o">{</span> + <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="n">values</span><span class="o">.</span><span class="na">iterator</span><span class="o">().</span><span class="na">next</span><span class="o">().</span><span class="na">symbol</span><span class="o">);</span> + <span class="o">}</span> + <span class="o">}</span> +<span class="o">}</span></code></pre></div> + + +</div> + +<p></div></p> + +<p><a href="#top">Back to top</a></p> + +<h2 id="combining-with-a-twitter-stream">Combining with a Twitter stream</h2> + +<p>Next, we will read a Twitter stream and correlate it with our stock +price stream. 
Flink has support for connecting to <a href="http://flink.apache.org/docs/latest/streaming_guide.html#twitter-streaming-api">Twitter's +API</a>, +but for the sake of this example we generate dummy tweet data.</p> + +<p><img alt="Social media analytics" src="/img/blog/blog_social_media.png" width="100%" class="img-responsive center-block"></p> + +<div class="codetabs" markdown="1"> + +<div data-lang="scala" markdown="1"> + + + +<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">//Read a stream of tweets</span> +<span class="k">val</span> <span class="n">tweetStream</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">addSource</span><span class="o">(</span><span class="n">generateTweets</span> <span class="k">_</span><span class="o">)</span> + +<span class="c1">//Extract the stock symbols</span> +<span class="k">val</span> <span class="n">mentionedSymbols</span> <span class="k">=</span> <span class="n">tweetStream</span><span class="o">.</span><span class="n">flatMap</span><span class="o">(</span><span class="n">tweet</span> <span class="k">=></span> <span class="n">tweet</span><span class="o">.</span><span class="n">split</span><span class="o">(</span><span class="s">" "</span><span class="o">))</span> + <span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="k">_</span><span class="o">.</span><span class="n">toUpperCase</span><span class="o">())</span> + <span class="o">.</span><span class="n">filter</span><span class="o">(</span><span class="n">symbols</span><span class="o">.</span><span class="n">contains</span><span class="o">(</span><span class="k">_</span><span class="o">))</span> + +<span class="c1">//Count the extracted symbols</span> +<span class="k">val</span> <span class="n">tweetsPerStock</span> <span class="k">=</span> <span class="n">mentionedSymbols</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span 
class="nc">Count</span><span class="o">(</span><span class="k">_</span><span class="o">,</span> <span class="mi">1</span><span class="o">))</span> + <span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="s">"symbol"</span><span class="o">)</span> + <span class="o">.</span><span class="n">window</span><span class="o">(</span><span class="nc">Time</span><span class="o">.</span><span class="n">of</span><span class="o">(</span><span class="mi">30</span><span class="o">,</span> <span class="nc">SECONDS</span><span class="o">))</span> + <span class="o">.</span><span class="n">sum</span><span class="o">(</span><span class="s">"count"</span><span class="o">)</span> + +<span class="k">def</span> <span class="n">generateTweets</span><span class="o">(</span><span class="n">out</span><span class="k">:</span> <span class="kt">Collector</span><span class="o">[</span><span class="kt">String</span><span class="o">])</span> <span class="k">=</span> <span class="o">{</span> + <span class="k">while</span> <span class="o">(</span><span class="kc">true</span><span class="o">)</span> <span class="o">{</span> + <span class="k">val</span> <span class="n">s</span> <span class="k">=</span> <span class="k">for</span> <span class="o">(</span><span class="n">i</span> <span class="k"><-</span> <span class="mi">1</span> <span class="n">to</span> <span class="mi">3</span><span class="o">)</span> <span class="k">yield</span> <span class="o">(</span><span class="n">symbols</span><span class="o">(</span><span class="nc">Random</span><span class="o">.</span><span class="n">nextInt</span><span class="o">(</span><span class="n">symbols</span><span class="o">.</span><span class="n">size</span><span class="o">)))</span> + <span class="n">out</span><span class="o">.</span><span class="n">collect</span><span class="o">(</span><span class="n">s</span><span class="o">.</span><span class="n">mkString</span><span class="o">(</span><span class="s">" "</span><span 
class="o">))</span> + <span class="nc">Thread</span><span class="o">.</span><span class="n">sleep</span><span class="o">(</span><span class="nc">Random</span><span class="o">.</span><span class="n">nextInt</span><span class="o">(</span><span class="mi">500</span><span class="o">))</span> + <span class="o">}</span> +<span class="o">}</span></code></pre></div> + + +</div> + +<div data-lang="java7" markdown="1"> + + +<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">//Read a stream of tweets</span> +<span class="n">DataStream</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">tweetStream</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">addSource</span><span class="o">(</span><span class="k">new</span> <span class="nf">TweetSource</span><span class="o">());</span> + +<span class="c1">//Extract the stock symbols</span> +<span class="n">DataStream</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">mentionedSymbols</span> <span class="o">=</span> <span class="n">tweetStream</span><span class="o">.</span><span class="na">flatMap</span><span class="o">(</span> + <span class="k">new</span> <span class="n">FlatMapFunction</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">>()</span> <span class="o">{</span> + <span class="nd">@Override</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">flatMap</span><span class="o">(</span><span class="n">String</span> <span class="n">value</span><span class="o">,</span> <span class="n">Collector</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">out</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span> + <span 
class="n">String</span><span class="o">[]</span> <span class="n">words</span> <span class="o">=</span> <span class="n">value</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">" "</span><span class="o">);</span> + <span class="k">for</span> <span class="o">(</span><span class="n">String</span> <span class="n">word</span> <span class="o">:</span> <span class="n">words</span><span class="o">)</span> <span class="o">{</span> + <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="n">word</span><span class="o">.</span><span class="na">toUpperCase</span><span class="o">());</span> + <span class="o">}</span> + <span class="o">}</span> +<span class="o">}).</span><span class="na">filter</span><span class="o">(</span><span class="k">new</span> <span class="n">FilterFunction</span><span class="o"><</span><span class="n">String</span><span class="o">>()</span> <span class="o">{</span> + <span class="nd">@Override</span> + <span class="kd">public</span> <span class="kt">boolean</span> <span class="nf">filter</span><span class="o">(</span><span class="n">String</span> <span class="n">value</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span> + <span class="k">return</span> <span class="n">SYMBOLS</span><span class="o">.</span><span class="na">contains</span><span class="o">(</span><span class="n">value</span><span class="o">);</span> + <span class="o">}</span> +<span class="o">});</span> + +<span class="c1">//Count the extracted symbols</span> +<span class="n">DataStream</span><span class="o"><</span><span class="n">Count</span><span class="o">></span> <span class="n">tweetsPerStock</span> <span class="o">=</span> <span class="n">mentionedSymbols</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">new</span> <span class="n">MapFunction</span><span 
class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Count</span><span class="o">>()</span> <span class="o">{</span> + <span class="nd">@Override</span> + <span class="kd">public</span> <span class="n">Count</span> <span class="nf">map</span><span class="o">(</span><span class="n">String</span> <span class="n">value</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span> + <span class="k">return</span> <span class="k">new</span> <span class="nf">Count</span><span class="o">(</span><span class="n">value</span><span class="o">,</span> <span class="mi">1</span><span class="o">);</span> + <span class="o">}</span> +<span class="o">}).</span><span class="na">groupBy</span><span class="o">(</span><span class="s">"symbol"</span><span class="o">).</span><span class="na">window</span><span class="o">(</span><span class="n">Time</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="mi">30</span><span class="o">,</span> <span class="n">TimeUnit</span><span class="o">.</span><span class="na">SECONDS</span><span class="o">)).</span><span class="na">sum</span><span class="o">(</span><span class="s">"count"</span><span class="o">).</span><span class="na">flatten</span><span class="o">();</span> + +<span class="kd">public</span> <span class="kd">static</span> <span class="kd">final</span> <span class="kd">class</span> <span class="nc">TweetSource</span> <span class="kd">implements</span> <span class="n">SourceFunction</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="o">{</span> + <span class="n">Random</span> <span class="n">random</span><span class="o">;</span> + <span class="n">StringBuilder</span> <span class="n">stringBuilder</span><span class="o">;</span> + + <span class="nd">@Override</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">invoke</span><span 
class="o">(</span><span class="n">Collector</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">collector</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span> + <span class="n">random</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">Random</span><span class="o">();</span> + <span class="n">stringBuilder</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">StringBuilder</span><span class="o">();</span> + + <span class="k">while</span> <span class="o">(</span><span class="kc">true</span><span class="o">)</span> <span class="o">{</span> + <span class="n">stringBuilder</span><span class="o">.</span><span class="na">setLength</span><span class="o">(</span><span class="mi">0</span><span class="o">);</span> + <span class="k">for</span> <span class="o">(</span><span class="kt">int</span> <span class="n">i</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span> <span class="n">i</span> <span class="o"><</span> <span class="mi">3</span><span class="o">;</span> <span class="n">i</span><span class="o">++)</span> <span class="o">{</span> + <span class="n">stringBuilder</span><span class="o">.</span><span class="na">append</span><span class="o">(</span><span class="s">" "</span><span class="o">);</span> + <span class="n">stringBuilder</span><span class="o">.</span><span class="na">append</span><span class="o">(</span><span class="n">SYMBOLS</span><span class="o">.</span><span class="na">get</span><span class="o">(</span><span class="n">random</span><span class="o">.</span><span class="na">nextInt</span><span class="o">(</span><span class="n">SYMBOLS</span><span class="o">.</span><span class="na">size</span><span class="o">())));</span> + <span class="o">}</span> + <span class="n">collector</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span 
class="n">stringBuilder</span><span class="o">.</span><span class="na">toString</span><span class="o">());</span> + <span class="n">Thread</span><span class="o">.</span><span class="na">sleep</span><span class="o">(</span><span class="mi">500</span><span class="o">);</span> + <span class="o">}</span> + + <span class="o">}</span> +<span class="o">}</span></code></pre></div> + + +</div> + +<p></div></p> + +<p><a href="#top">Back to top</a></p> + +<h2 id="streaming-joins">Streaming joins</h2> + +<p>Finally, we join real-time tweets and stock prices and compute a +rolling correlation between the number of price warnings and the +number of mentions of a given stock in the Twitter stream. As both of +these data streams are potentially infinite, we apply the join on a +30-second window.</p> + +<p><img alt="Streaming joins" src="/img/blog/blog_stream_join.png" width="60%" class="img-responsive center-block"> </p> + +<div class="codetabs" markdown="1"> + +<div data-lang="scala" markdown="1"> + + + +<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">//Join warnings and parsed tweets</span> +<span class="k">val</span> <span class="n">tweetsAndWarning</span> <span class="k">=</span> <span class="n">warningsPerStock</span><span class="o">.</span><span class="n">join</span><span class="o">(</span><span class="n">tweetsPerStock</span><span class="o">)</span> + <span class="o">.</span><span class="n">onWindow</span><span class="o">(</span><span class="mi">30</span><span class="o">,</span> <span class="nc">SECONDS</span><span class="o">)</span> + <span class="o">.</span><span class="n">where</span><span class="o">(</span><span class="s">"symbol"</span><span class="o">)</span> + <span class="o">.</span><span class="n">equalTo</span><span class="o">(</span><span class="s">"symbol"</span><span class="o">)</span> <span class="o">{</span> <span class="o">(</span><span class="n">c1</span><span class="o">,</span> <span class="n">c2</span><span 
class="o">)</span> <span class="k">=></span> <span class="o">(</span><span class="n">c1</span><span class="o">.</span><span class="n">count</span><span class="o">,</span> <span class="n">c2</span><span class="o">.</span><span class="n">count</span><span class="o">)</span> <span class="o">}</span> + +<span class="k">val</span> <span class="n">rollingCorrelation</span> <span class="k">=</span> <span class="n">tweetsAndWarning</span><span class="o">.</span><span class="n">window</span><span class="o">(</span><span class="nc">Time</span><span class="o">.</span><span class="n">of</span><span class="o">(</span><span class="mi">30</span><span class="o">,</span> <span class="nc">SECONDS</span><span class="o">))</span> + <span class="o">.</span><span class="n">mapWindow</span><span class="o">(</span><span class="n">computeCorrelation</span> <span class="k">_</span><span class="o">)</span> + +<span class="n">rollingCorrelation</span> <span class="n">print</span> + +<span class="c1">//Compute rolling correlation</span> +<span class="k">def</span> <span class="n">computeCorrelation</span><span class="o">(</span><span class="n">input</span><span class="k">:</span> <span class="kt">Iterable</span><span class="o">[(</span><span class="kt">Int</span>, <span class="kt">Int</span><span class="o">)],</span> <span class="n">out</span><span class="k">:</span> <span class="kt">Collector</span><span class="o">[</span><span class="kt">Double</span><span class="o">])</span> <span class="k">=</span> <span class="o">{</span> + <span class="k">if</span> <span class="o">(</span><span class="n">input</span><span class="o">.</span><span class="n">nonEmpty</span><span class="o">)</span> <span class="o">{</span> + <span class="k">val</span> <span class="n">var1</span> <span class="k">=</span> <span class="n">input</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="k">_</span><span class="o">.</span><span class="n">_1</span><span class="o">)</span> + <span 
class="k">val</span> <span class="n">mean1</span> <span class="k">=</span> <span class="n">average</span><span class="o">(</span><span class="n">var1</span><span class="o">)</span> + <span class="k">val</span> <span class="n">var2</span> <span class="k">=</span> <span class="n">input</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="k">_</span><span class="o">.</span><span class="n">_2</span><span class="o">)</span> + <span class="k">val</span> <span class="n">mean2</span> <span class="k">=</span> <span class="n">average</span><span class="o">(</span><span class="n">var2</span><span class="o">)</span> + + <span class="k">val</span> <span class="n">cov</span> <span class="k">=</span> <span class="n">average</span><span class="o">(</span><span class="n">var1</span><span class="o">.</span><span class="n">zip</span><span class="o">(</span><span class="n">var2</span><span class="o">).</span><span class="n">map</span><span class="o">(</span><span class="n">xy</span> <span class="k">=></span> <span class="o">(</span><span class="n">xy</span><span class="o">.</span><span class="n">_1</span> <span class="o">-</span> <span class="n">mean1</span><span class="o">)</span> <span class="o">*</span> <span class="o">(</span><span class="n">xy</span><span class="o">.</span><span class="n">_2</span> <span class="o">-</span> <span class="n">mean2</span><span class="o">)))</span> + <span class="k">val</span> <span class="n">d1</span> <span class="k">=</span> <span class="nc">Math</span><span class="o">.</span><span class="n">sqrt</span><span class="o">(</span><span class="n">average</span><span class="o">(</span><span class="n">var1</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="n">x</span> <span class="k">=></span> <span class="nc">Math</span><span class="o">.</span><span class="n">pow</span><span class="o">((</span><span class="n">x</span> <span class="o">-</span> <span 
class="n">mean1</span><span class="o">),</span> <span class="mi">2</span><span class="o">))))</span> + <span class="k">val</span> <span class="n">d2</span> <span class="k">=</span> <span class="nc">Math</span><span class="o">.</span><span class="n">sqrt</span><span class="o">(</span><span class="n">average</span><span class="o">(</span><span class="n">var2</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="n">x</span> <span class="k">=></span> <span class="nc">Math</span><span class="o">.</span><span class="n">pow</span><span class="o">((</span><span class="n">x</span> <span class="o">-</span> <span class="n">mean2</span><span class="o">),</span> <span class="mi">2</span><span class="o">))))</span> + + <span class="n">out</span><span class="o">.</span><span class="n">collect</span><span class="o">(</span><span class="n">cov</span> <span class="o">/</span> <span class="o">(</span><span class="n">d1</span> <span class="o">*</span> <span class="n">d2</span><span class="o">))</span> + <span class="o">}</span> +<span class="o">}</span></code></pre></div> + + +</div> + +<div data-lang="java7" markdown="1"> + + +<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">//Join warnings and parsed tweets</span> +<span class="n">DataStream</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>></span> <span class="n">tweetsAndWarning</span> <span class="o">=</span> <span class="n">warningsPerStock</span> + <span class="o">.</span><span class="na">join</span><span class="o">(</span><span class="n">tweetsPerStock</span><span class="o">)</span> + <span class="o">.</span><span class="na">onWindow</span><span class="o">(</span><span class="mi">30</span><span class="o">,</span> <span class="n">TimeUnit</span><span class="o">.</span><span class="na">SECONDS</span><span 
class="o">)</span> + <span class="o">.</span><span class="na">where</span><span class="o">(</span><span class="s">"symbol"</span><span class="o">)</span> + <span class="o">.</span><span class="na">equalTo</span><span class="o">(</span><span class="s">"symbol"</span><span class="o">)</span> + <span class="o">.</span><span class="na">with</span><span class="o">(</span><span class="k">new</span> <span class="n">JoinFunction</span><span class="o"><</span><span class="n">Count</span><span class="o">,</span> <span class="n">Count</span><span class="o">,</span> <span class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>>()</span> <span class="o">{</span> + <span class="nd">@Override</span> + <span class="kd">public</span> <span class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">></span> <span class="nf">join</span><span class="o">(</span><span class="n">Count</span> <span class="n">first</span><span class="o">,</span> <span class="n">Count</span> <span class="n">second</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span> + <span class="k">return</span> <span class="k">new</span> <span class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>(</span><span class="n">first</span><span class="o">.</span><span class="na">count</span><span class="o">,</span> <span class="n">second</span><span class="o">.</span><span class="na">count</span><span class="o">);</span> + <span class="o">}</span> + <span class="o">});</span> + +<span class="c1">//Compute rolling correlation</span> +<span class="n">DataStream</span><span class="o"><</span><span class="n">Double</span><span class="o">></span> <span class="n">rollingCorrelation</span> <span 
class="o">=</span> <span class="n">tweetsAndWarning</span> + <span class="o">.</span><span class="na">window</span><span class="o">(</span><span class="n">Time</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="mi">30</span><span class="o">,</span> <span class="n">TimeUnit</span><span class="o">.</span><span class="na">SECONDS</span><span class="o">))</span> + <span class="o">.</span><span class="na">mapWindow</span><span class="o">(</span><span class="k">new</span> <span class="nf">WindowCorrelation</span><span class="o">());</span> + +<span class="n">rollingCorrelation</span><span class="o">.</span><span class="na">print</span><span class="o">();</span> + +<span class="kd">public</span> <span class="kd">static</span> <span class="kd">final</span> <span class="kd">class</span> <span class="nc">WindowCorrelation</span> + <span class="kd">implements</span> <span class="n">WindowMapFunction</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>,</span> <span class="n">Double</span><span class="o">></span> <span class="o">{</span> + + <span class="kd">private</span> <span class="n">Integer</span> <span class="n">leftSum</span><span class="o">;</span> + <span class="kd">private</span> <span class="n">Integer</span> <span class="n">rightSum</span><span class="o">;</span> + <span class="kd">private</span> <span class="n">Integer</span> <span class="n">count</span><span class="o">;</span> + + <span class="kd">private</span> <span class="n">Double</span> <span class="n">leftMean</span><span class="o">;</span> + <span class="kd">private</span> <span class="n">Double</span> <span class="n">rightMean</span><span class="o">;</span> + + <span class="kd">private</span> <span class="n">Double</span> <span class="n">cov</span><span class="o">;</span> + <span class="kd">private</span> <span 
class="n">Double</span> <span class="n">leftSd</span><span class="o">;</span> + <span class="kd">private</span> <span class="n">Double</span> <span class="n">rightSd</span><span class="o">;</span> + + <span class="nd">@Override</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">mapWindow</span><span class="o">(</span><span class="n">Iterable</span><span class="o"><</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>></span> <span class="n">values</span><span class="o">,</span> <span class="n">Collector</span><span class="o"><</span><span class="n">Double</span><span class="o">></span> <span class="n">out</span><span class="o">)</span> + <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span> + + <span class="n">leftSum</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span> + <span class="n">rightSum</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span> + <span class="n">count</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span> + + <span class="n">cov</span> <span class="o">=</span> <span class="mi">0</span><span class="o">.;</span> + <span class="n">leftSd</span> <span class="o">=</span> <span class="mi">0</span><span class="o">.;</span> + <span class="n">rightSd</span> <span class="o">=</span> <span class="mi">0</span><span class="o">.;</span> + + <span class="c1">//compute mean for both sides, save count</span> + <span class="k">for</span> <span class="o">(</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">></span> <span class="n">pair</span> <span class="o">:</span> <span class="n">values</span><span class="o">)</span> <span class="o">{</span> + <span class="n">leftSum</span> <span 
class="o">+=</span> <span class="n">pair</span><span class="o">.</span><span class="na">f0</span><span class="o">;</span> + <span class="n">rightSum</span> <span class="o">+=</span> <span class="n">pair</span><span class="o">.</span><span class="na">f1</span><span class="o">;</span> + <span class="n">count</span><span class="o">++;</span> + <span class="o">}</span> + + <span class="n">leftMean</span> <span class="o">=</span> <span class="n">leftSum</span><span class="o">.</span><span class="na">doubleValue</span><span class="o">()</span> <span class="o">/</span> <span class="n">count</span><span class="o">;</span> + <span class="n">rightMean</span> <span class="o">=</span> <span class="n">rightSum</span><span class="o">.</span><span class="na">doubleValue</span><span class="o">()</span> <span class="o">/</span> <span class="n">count</span><span class="o">;</span> + + <span class="c1">//compute covariance & std. deviations</span> + <span class="k">for</span> <span class="o">(</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">></span> <span class="n">pair</span> <span class="o">:</span> <span class="n">values</span><span class="o">)</span> <span class="o">{</span> + <span class="n">cov</span> <span class="o">+=</span> <span class="o">(</span><span class="n">pair</span><span class="o">.</span><span class="na">f0</span> <span class="o">-</span> <span class="n">leftMean</span><span class="o">)</span> <span class="o">*</span> <span class="o">(</span><span class="n">pair</span><span class="o">.</span><span class="na">f1</span> <span class="o">-</span> <span class="n">rightMean</span><span class="o">)</span> <span class="o">/</span> <span class="n">count</span><span class="o">;</span> + <span class="o">}</span> + + <span class="k">for</span> <span class="o">(</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span 
class="o">,</span> <span class="n">Integer</span><span class="o">></span> <span class="n">pair</span> <span class="o">:</span> <span class="n">values</span><span class="o">)</span> <span class="o">{</span> + <span class="n">leftSd</span> <span class="o">+=</span> <span class="n">Math</span><span class="o">.</span><span class="na">pow</span><span class="o">(</span><span class="n">pair</span><span class="o">.</span><span class="na">f0</span> <span class="o">-</span> <span class="n">leftMean</span><span class="o">,</span> <span class="mi">2</span><span class="o">)</span> <span class="o">/</span> <span class="n">count</span><span class="o">;</span> + <span class="n">rightSd</span> <span class="o">+=</span> <span class="n">Math</span><span class="o">.</span><span class="na">pow</span><span class="o">(</span><span class="n">pair</span><span class="o">.</span><span class="na">f1</span> <span class="o">-</span> <span class="n">rightMean</span><span class="o">,</span> <span class="mi">2</span><span class="o">)</span> <span class="o">/</span> <span class="n">count</span><span class="o">;</span> + <span class="o">}</span> + <span class="n">leftSd</span> <span class="o">=</span> <span class="n">Math</span><span class="o">.</span><span class="na">sqrt</span><span class="o">(</span><span class="n">leftSd</span><span class="o">);</span> + <span class="n">rightSd</span> <span class="o">=</span> <span class="n">Math</span><span class="o">.</span><span class="na">sqrt</span><span class="o">(</span><span class="n">rightSd</span><span class="o">);</span> + + <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="n">cov</span> <span class="o">/</span> <span class="o">(</span><span class="n">leftSd</span> <span class="o">*</span> <span class="n">rightSd</span><span class="o">));</span> + <span class="o">}</span> +<span class="o">}</span></code></pre></div> + + +</div> + +<p></div></p> + +<p><a href="#top">Back to top</a></p> 
+ +<h2 id="other-things-to-try">Other things to try</h2> + +<p>For a full feature overview, please check the <a href="http://flink.apache.org/docs/latest/streaming_guide.html">Streaming Guide</a>, which describes all the available API features. +You are very welcome to try out our features for different use cases; we are looking forward to hearing about your experiences. Feel free to <a href="http://flink.apache.org/community.html#mailing-lists">contact us</a>.</p> + +<h2 id="upcoming-for-streaming">Upcoming for streaming</h2> + +<p>There are some aspects of Flink Streaming that are subject to +change by the next release, making this application look even nicer.</p> + +<p>Stay tuned for later blog posts on how Flink Streaming works +internally, on fault tolerance, and on performance measurements!</p> + +<p><a href="#top">Back to top</a></p> + + </div> + </article> + </div> + <div class="col-md-2"></div> + </div> + <div class="row" style="padding-top:30px"> + <div class="col-md-2"></div> + <div class="col-md-8"> + <div id="disqus_thread"></div> + <script type="text/javascript"> + /* * * CONFIGURATION VARIABLES: EDIT BEFORE PASTING INTO YOUR WEBPAGE * * */ + var disqus_shortname = 'stratosphere-eu'; // required: replace example with your forum shortname + + /* * * DON'T EDIT BELOW THIS LINE * * */ + (function() { + var dsq = document.createElement('script'); dsq.type = 'text/javascript'; dsq.async = true; + dsq.src = '//' + disqus_shortname + '.disqus.com/embed.js'; + (document.getElementsByTagName('head')[0] || document.getElementsByTagName('body')[0]).appendChild(dsq); + })(); + </script> + <noscript>Please enable JavaScript to view the <a href="http://disqus.com/?ref_noscript">comments powered by Disqus.</a></noscript> + <a href="http://disqus.com" class="dsq-brlink">comments powered by <span class="logo-disqus">Disqus</span></a> + </div> + <div class="col-md-2"></div> + </div> +</div> + + </div> + <!--<section id="af-upfooter" class="af-section"> + <div class="container"> + <p>Apache 
Flink is an effort undergoing incubation at The Apache + Software Foundation (ASF), sponsored by the Apache Incubator PMC. + Incubation is required of all newly accepted projects until a further + review indicates that the infrastructure, communications, and + decision making process have stabilized in a manner consistent with + other successful ASF projects. While incubation status is not + necessarily a reflection of the completeness or stability of the + code, it does indicate that the project has yet to be fully endorsed + by the ASF.</p> + <a href="http://incubator.apache.org"> <img class="img-responsive" + src="/img/main/apache-incubator-logo.png" alt="Apache Flink" /> + </a> + <p class="text-center"> + <a href="/privacy-policy.html" title="Privacy Policy" + class="af-privacy-policy">Privacy Policy</a> + </p> + </div> +</section>--> + +<footer id="af-footer"> + <div class="container"> + <div class="row"> + <div class="col-md-3"> + <h3>Documentation</h3> + <ul class="af-footer-menu"> + + <li><a href="http://ci.apache.org/projects/flink/flink-docs-release-0.8/">0.8.1</a></li> + <li><a href="http://ci.apache.org/projects/flink/flink-docs-release-0.8/api/java/">0.8.1 Javadocs</a></li> + <li><a href="http://ci.apache.org/projects/flink/flink-docs-release-0.8/api/scala/index.html#org.apache.flink.api.scala.package">0.8.1 Scaladocs</a></li> + </ul> + </div> + <div class="col-md-3"> + <h3>Community</h3> + <ul class="af-footer-menu"> + <li><a href="/community.html#mailing-lists">Mailing Lists</a></li> + <li><a href="https://issues.apache.org/jira/browse/FLINK" + target="blank">Issues <span + class="glyphicon glyphicon-new-window"></span></a></li> + <li><a href="/community.html#team">Team</a></li> + <li><a href="/how-to-contribute.html">How to contribute</a></li> + <li><a href="/coding_guidelines.html">Coding Guidelines</a></li> + </ul> + </div> + <div class="col-md-3"> + <h3>ASF</h3> + <ul class="af-footer-menu"> + <li><a href="http://www.apache.org/" 
target="blank">Apache + Software foundation <span class="glyph
<TRUNCATED>