Author: mbalassi
Date: Mon Feb  9 12:34:25 2015
New Revision: 1658386

URL: http://svn.apache.org/r1658386
Log:
Streaming blogpost added

Modified:
    flink/site/blog/feed.xml
    flink/site/blog/index.html
    flink/site/blog/page2/index.html
    flink/site/blog/page3/index.html
    flink/site/news/2012/08/21/release02.html
    flink/site/news/2012/10/15/icde2013.html
    flink/site/news/2012/11/12/btw2013demo.html
    flink/site/news/2012/11/21/previewICDE2013.html
    flink/site/news/2013/03/27/www-demo-paper.html
    flink/site/news/2013/10/21/cikm2013-paper.html
    flink/site/news/2013/12/13/humboldt-innovation-award.html
    flink/site/news/2014/01/10/stratosphere-hadoop-summit.html
    flink/site/news/2014/01/12/0.4-migration-guide.html
    flink/site/news/2014/01/13/stratosphere-release-0.4.html
    flink/site/news/2014/01/26/optimizer_plan_visualization_tool.html
    flink/site/news/2014/01/28/querying_mongodb.html
    flink/site/news/2014/02/18/amazon-elastic-mapreduce-cloud-yarn.html
    flink/site/news/2014/02/24/stratosphere-google-summer-of-code-2014.html
    flink/site/news/2014/04/16/stratosphere-goes-apache-incubator.html
    flink/site/news/2014/05/31/release-0.5.html
    flink/site/news/2014/08/26/release-0.6.html
    flink/site/news/2014/09/26/release-0.6.1.html
    flink/site/news/2014/10/03/upcoming_events.html
    flink/site/news/2014/11/04/release-0.7.0.html
    flink/site/news/2014/11/18/hadoop-compatibility.html
    flink/site/news/2015/01/06/december-in-flink.html
    flink/site/news/2015/01/21/release-0.8.html
    flink/site/news/2015/02/04/january-in-flink.html

Modified: flink/site/blog/feed.xml
URL: 
http://svn.apache.org/viewvc/flink/site/blog/feed.xml?rev=1658386&r1=1658385&r2=1658386&view=diff
==============================================================================
Binary files - no diff available.

Modified: flink/site/blog/index.html
URL: 
http://svn.apache.org/viewvc/flink/site/blog/index.html?rev=1658386&r1=1658385&r2=1658386&view=diff
==============================================================================
--- flink/site/blog/index.html (original)
+++ flink/site/blog/index.html Mon Feb  9 12:34:25 2015
@@ -17,6 +17,7 @@
            <!-- <link 
href="//maxcdn.bootstrapcdn.com/font-awesome/4.1.0/css/font-awesome.min.css" 
rel="stylesheet"> -->
            <script src="https://ajax.googleapis.com/ajax/libs/jquery/1.11.0/jquery.min.js"></script>
            <script src="/js/bootstrap.min.js"></script>
+           <script src="/js/codetabs.js"></script>
     </head>
     <body>
     <div class="af-header-container af-inner-pages-navigation">
@@ -139,6 +140,668 @@
                <div class="col-md-8">
                        
                        <article>
+                               <h2><a href="/news/2015/02/09/streaming-example.html">Introducing Flink Streaming</a></h2>
+                               <p class="meta">09 Feb 2015</p>
+
+                               <div><p>This post is the first of a series of blog posts on Flink Streaming,
+the recent addition to Apache Flink that makes it possible to analyze
+continuous data sources in addition to static files. Flink Streaming
+uses the pipelined Flink engine to process data streams in real time
+and offers a new API that includes the definition of flexible windows.</p>
+
+<p>In this post, we go through an example that uses the Flink Streaming
+API to compute statistics on stock market data that arrive
+continuously, and combine the stock market data with Twitter streams.
+See the <a href="http://flink.apache.org/docs/latest/streaming_guide.html">Streaming Programming
+Guide</a> for a detailed presentation of the Streaming API.</p>
+
+<p>First, we read several stock price streams and combine them into
+one stream of market data. We apply a series of transformations to this
+market data stream, such as rolling aggregations per stock. Then we emit
+price warning alerts when prices change rapidly. Moving
+on to more advanced features, we compute rolling correlations
+between the market data streams and a Twitter stream with stock mentions.</p>
+
+<p>To run the example, please use the <em>0.9-SNAPSHOT</em>
+version of Flink as a dependency. The full example code base can be
+found <a href="https://github.com/mbalassi/flink/blob/b93015b9af968b80d8ab13be928ab0e447a4fcb5/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala">here</a> in Scala and <a href="https://github.com/mbalassi/flink/blob/b93015b9af968b80d8ab13be928ab0e447a4fcb5/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java">here</a> in Java 7.</p>
+
+<p><a href="#top"></a></p>
+
+<p><a href="#top">Back to top</a></p>
+
+<h2 id="reading-from-multiple-inputs">Reading from multiple inputs</h2>
+
+<p>First, let us create the stream of stock prices:</p>
+
+<ol>
+<li>Read a socket stream of stock prices</li>
+<li>Parse the text in the stream to create a stream of <code>StockPrice</code> objects</li>
+<li>Add four other sources tagged with the stock symbol.</li>
+<li>Finally, merge the streams to create a unified stream. </li>
+</ol>
+
+<p><img alt="Reading from multiple inputs" 
src="/img/blog/blog_multi_input.png" width="70%" class="img-responsive 
center-block"></p>
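+<p>The parsing step expects each line arriving on the socket to have the form <code>SYMBOL,price</code>, for example <code>SPX,1011.34</code>. The following plain-Java sketch isolates that parsing logic from Flink so it can be tried on its own. The class name <code>StockLineParser</code> is hypothetical, and unlike the <code>map</code> function below it trims whitespace and rejects lines that do not have exactly two fields; both behaviors are assumptions added for illustration, not part of the original example.</p>

```java
/**
 * Hypothetical helper (not part of the original example) that isolates the
 * "SYMBOL,price" parsing done by the map function, without any Flink dependency.
 */
final class StockLineParser {

    /** Minimal stand-in for the example's StockPrice POJO. */
    static final class StockPrice {
        final String symbol;
        final double price;

        StockPrice(String symbol, double price) {
            this.symbol = symbol;
            this.price = price;
        }

        @Override
        public String toString() {
            return "StockPrice{symbol='" + symbol + "', price=" + price + "}";
        }
    }

    private StockLineParser() {
    }

    /** Parses a line such as "SPX,1011.34"; rejects lines without exactly two fields. */
    static StockPrice parse(String line) {
        String[] tokens = line.split(",");
        if (tokens.length != 2) {
            throw new IllegalArgumentException("Expected SYMBOL,price but got: " + line);
        }
        // Trimming is an assumption added here: it tolerates input such as "DJI, 1036.75".
        String symbol = tokens[0].trim();
        double price = Double.parseDouble(tokens[1].trim());
        return new StockPrice(symbol, price);
    }

    public static void main(String[] args) {
        // Each line typed into the socket would be turned into a StockPrice like this.
        System.out.println(parse("SPX,1011.34"));
        System.out.println(parse("DJI, 1036.75"));
    }
}
```

+<p>Once <code>nc -lk 9999</code> is running, typing lines in this format into the terminal feeds them into the socket stream alongside the generated sources.</p>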
+
+<div class="codetabs" markdown="1">
+<div data-lang="scala" markdown="1">
+
+<div class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span class="k">def</span> <span class="n">main</span><span 
class="o">(</span><span class="n">args</span><span class="k">:</span> <span 
class="kt">Array</span><span class="o">[</span><span 
class="kt">String</span><span class="o">])</span> <span class="o">{</span>
+
+  <span class="k">val</span> <span class="n">env</span> <span 
class="k">=</span> <span class="nc">StreamExecutionEnvironment</span><span 
class="o">.</span><span class="n">getExecutionEnvironment</span>
+
+  <span class="c1">//Read from a socket stream and map it to StockPrice objects</span>
+  <span class="k">val</span> <span class="n">socketStockStream</span> <span 
class="k">=</span> <span class="n">env</span><span class="o">.</span><span 
class="n">socketTextStream</span><span class="o">(</span><span 
class="s">&quot;localhost&quot;</span><span class="o">,</span> <span 
class="mi">9999</span><span class="o">).</span><span class="n">map</span><span 
class="o">(</span><span class="n">x</span> <span class="k">=&gt;</span> <span 
class="o">{</span>
+    <span class="k">val</span> <span class="n">split</span> <span 
class="k">=</span> <span class="n">x</span><span class="o">.</span><span 
class="n">split</span><span class="o">(</span><span 
class="s">&quot;,&quot;</span><span class="o">)</span>
+    <span class="nc">StockPrice</span><span class="o">(</span><span 
class="n">split</span><span class="o">(</span><span class="mi">0</span><span 
class="o">),</span> <span class="n">split</span><span class="o">(</span><span 
class="mi">1</span><span class="o">).</span><span 
class="n">toDouble</span><span class="o">)</span>
+  <span class="o">})</span>
+
+  <span class="c1">//Generate other stock streams</span>
+  <span class="k">val</span> <span class="nc">SPX_Stream</span> <span 
class="k">=</span> <span class="n">env</span><span class="o">.</span><span 
class="n">addSource</span><span class="o">(</span><span 
class="n">generateStock</span><span class="o">(</span><span 
class="s">&quot;SPX&quot;</span><span class="o">)(</span><span 
class="mi">10</span><span class="o">)</span> <span class="k">_</span><span 
class="o">)</span>
+  <span class="k">val</span> <span class="nc">FTSE_Stream</span> <span 
class="k">=</span> <span class="n">env</span><span class="o">.</span><span 
class="n">addSource</span><span class="o">(</span><span 
class="n">generateStock</span><span class="o">(</span><span 
class="s">&quot;FTSE&quot;</span><span class="o">)(</span><span 
class="mi">20</span><span class="o">)</span> <span class="k">_</span><span 
class="o">)</span>
+  <span class="k">val</span> <span class="nc">DJI_Stream</span> <span 
class="k">=</span> <span class="n">env</span><span class="o">.</span><span 
class="n">addSource</span><span class="o">(</span><span 
class="n">generateStock</span><span class="o">(</span><span 
class="s">&quot;DJI&quot;</span><span class="o">)(</span><span 
class="mi">30</span><span class="o">)</span> <span class="k">_</span><span 
class="o">)</span>
+  <span class="k">val</span> <span class="nc">BUX_Stream</span> <span 
class="k">=</span> <span class="n">env</span><span class="o">.</span><span 
class="n">addSource</span><span class="o">(</span><span 
class="n">generateStock</span><span class="o">(</span><span 
class="s">&quot;BUX&quot;</span><span class="o">)(</span><span 
class="mi">40</span><span class="o">)</span> <span class="k">_</span><span 
class="o">)</span>
+
+  <span class="c1">//Merge all stock streams together</span>
+  <span class="k">val</span> <span class="n">stockStream</span> <span 
class="k">=</span> <span class="n">socketStockStream</span><span 
class="o">.</span><span class="n">merge</span><span class="o">(</span><span 
class="nc">SPX_Stream</span><span class="o">,</span> <span 
class="nc">FTSE_Stream</span><span class="o">,</span> 
+    <span class="nc">DJI_Stream</span><span class="o">,</span> <span 
class="nc">BUX_Stream</span><span class="o">)</span>
+
+  <span class="n">stockStream</span><span class="o">.</span><span 
class="n">print</span><span class="o">()</span>
+
+  <span class="n">env</span><span class="o">.</span><span 
class="n">execute</span><span class="o">(</span><span class="s">&quot;Stock 
stream&quot;</span><span class="o">)</span>
+<span class="o">}</span></code></pre></div>
+
+</div>
+
+<div data-lang="java7" markdown="1">
+
+<div class="highlight"><pre><code class="language-java" data-lang="java"><span 
class="kd">public</span> <span class="kd">static</span> <span 
class="kt">void</span> <span class="nf">main</span><span 
class="o">(</span><span class="n">String</span><span class="o">[]</span> <span 
class="n">args</span><span class="o">)</span> <span class="kd">throws</span> 
<span class="n">Exception</span> <span class="o">{</span>
+
+    <span class="kd">final</span> <span 
class="n">StreamExecutionEnvironment</span> <span class="n">env</span> <span 
class="o">=</span>
+        <span class="n">StreamExecutionEnvironment</span><span 
class="o">.</span><span class="na">getExecutionEnvironment</span><span 
class="o">();</span>
+
+    <span class="c1">//Read from a socket stream and map it to StockPrice objects</span>
+    <span class="n">DataStream</span><span class="o">&lt;</span><span 
class="n">StockPrice</span><span class="o">&gt;</span> <span 
class="n">socketStockStream</span> <span class="o">=</span> <span 
class="n">env</span>
+            <span class="o">.</span><span 
class="na">socketTextStream</span><span class="o">(</span><span 
class="s">&quot;localhost&quot;</span><span class="o">,</span> <span 
class="mi">9999</span><span class="o">)</span>
+            <span class="o">.</span><span class="na">map</span><span 
class="o">(</span><span class="k">new</span> <span 
class="n">MapFunction</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">StockPrice</span><span class="o">&gt;()</span> <span 
class="o">{</span>
+                <span class="kd">private</span> <span 
class="n">String</span><span class="o">[]</span> <span 
class="n">tokens</span><span class="o">;</span>
+
+                <span class="nd">@Override</span>
+                <span class="kd">public</span> <span 
class="n">StockPrice</span> <span class="nf">map</span><span 
class="o">(</span><span class="n">String</span> <span 
class="n">value</span><span class="o">)</span> <span class="kd">throws</span> 
<span class="n">Exception</span> <span class="o">{</span>
+                    <span class="n">tokens</span> <span class="o">=</span> 
<span class="n">value</span><span class="o">.</span><span 
class="na">split</span><span class="o">(</span><span 
class="s">&quot;,&quot;</span><span class="o">);</span>
+                    <span class="k">return</span> <span class="k">new</span> 
<span class="nf">StockPrice</span><span class="o">(</span><span 
class="n">tokens</span><span class="o">[</span><span class="mi">0</span><span 
class="o">],</span>
+                        <span class="n">Double</span><span 
class="o">.</span><span class="na">parseDouble</span><span 
class="o">(</span><span class="n">tokens</span><span class="o">[</span><span 
class="mi">1</span><span class="o">]));</span>
+                <span class="o">}</span>
+            <span class="o">});</span>
+
+    <span class="c1">//Generate other stock streams</span>
+    <span class="n">DataStream</span><span class="o">&lt;</span><span 
class="n">StockPrice</span><span class="o">&gt;</span> <span 
class="n">SPX_stream</span> <span class="o">=</span> <span 
class="n">env</span><span class="o">.</span><span 
class="na">addSource</span><span class="o">(</span><span class="k">new</span> 
<span class="nf">StockSource</span><span class="o">(</span><span 
class="s">&quot;SPX&quot;</span><span class="o">,</span> <span 
class="mi">10</span><span class="o">));</span>
+    <span class="n">DataStream</span><span class="o">&lt;</span><span 
class="n">StockPrice</span><span class="o">&gt;</span> <span 
class="n">FTSE_stream</span> <span class="o">=</span> <span 
class="n">env</span><span class="o">.</span><span 
class="na">addSource</span><span class="o">(</span><span class="k">new</span> 
<span class="nf">StockSource</span><span class="o">(</span><span 
class="s">&quot;FTSE&quot;</span><span class="o">,</span> <span 
class="mi">20</span><span class="o">));</span>
+    <span class="n">DataStream</span><span class="o">&lt;</span><span 
class="n">StockPrice</span><span class="o">&gt;</span> <span 
class="n">DJI_stream</span> <span class="o">=</span> <span 
class="n">env</span><span class="o">.</span><span 
class="na">addSource</span><span class="o">(</span><span class="k">new</span> 
<span class="nf">StockSource</span><span class="o">(</span><span 
class="s">&quot;DJI&quot;</span><span class="o">,</span> <span 
class="mi">30</span><span class="o">));</span>
+    <span class="n">DataStream</span><span class="o">&lt;</span><span 
class="n">StockPrice</span><span class="o">&gt;</span> <span 
class="n">BUX_stream</span> <span class="o">=</span> <span 
class="n">env</span><span class="o">.</span><span 
class="na">addSource</span><span class="o">(</span><span class="k">new</span> 
<span class="nf">StockSource</span><span class="o">(</span><span 
class="s">&quot;BUX&quot;</span><span class="o">,</span> <span 
class="mi">40</span><span class="o">));</span>
+
+    <span class="c1">//Merge all stock streams together</span>
+    <span class="n">DataStream</span><span class="o">&lt;</span><span 
class="n">StockPrice</span><span class="o">&gt;</span> <span 
class="n">stockStream</span> <span class="o">=</span> <span 
class="n">socketStockStream</span>
+        <span class="o">.</span><span class="na">merge</span><span 
class="o">(</span><span class="n">SPX_stream</span><span class="o">,</span> 
<span class="n">FTSE_stream</span><span class="o">,</span> <span 
class="n">DJI_stream</span><span class="o">,</span> <span 
class="n">BUX_stream</span><span class="o">);</span>
+
+    <span class="n">stockStream</span><span class="o">.</span><span 
class="na">print</span><span class="o">();</span>
+
+    <span class="n">env</span><span class="o">.</span><span 
class="na">execute</span><span class="o">(</span><span class="s">&quot;Stock 
stream&quot;</span><span class="o">);</span></code></pre></div>
+ 
+</div>
+
+</div>
+
+<p>See
+<a href="http://flink.apache.org/docs/latest/streaming_guide.html#sources">here</a>
+for how to create streaming sources for Flink Streaming
+programs. Flink, of course, supports reading streams from
+<a href="http://flink.apache.org/docs/latest/streaming_guide.html#stream-connectors">external
+sources</a>
+such as Apache Kafka, Apache Flume, RabbitMQ, and others. For the sake
+of this example, the data streams are simply generated using the
+<code>generateStock</code> method in Scala and the <code>StockSource</code> class in Java:</p>
+
+<div class="codetabs" markdown="1">
+<div data-lang="scala" markdown="1">
+
+<div class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span class="k">val</span> <span class="n">symbols</span> 
<span class="k">=</span> <span class="nc">List</span><span 
class="o">(</span><span class="s">&quot;SPX&quot;</span><span 
class="o">,</span> <span class="s">&quot;FTSE&quot;</span><span 
class="o">,</span> <span class="s">&quot;DJI&quot;</span><span 
class="o">,</span> <span class="s">&quot;DJT&quot;</span><span 
class="o">,</span> <span class="s">&quot;BUX&quot;</span><span 
class="o">,</span> <span class="s">&quot;DAX&quot;</span><span 
class="o">,</span> <span class="s">&quot;GOOG&quot;</span><span 
class="o">)</span>
+
+<span class="k">case</span> <span class="k">class</span> <span 
class="nc">StockPrice</span><span class="o">(</span><span 
class="n">symbol</span><span class="k">:</span> <span 
class="kt">String</span><span class="o">,</span> <span 
class="n">price</span><span class="k">:</span> <span 
class="kt">Double</span><span class="o">)</span>
+
+<span class="k">def</span> <span class="n">generateStock</span><span 
class="o">(</span><span class="n">symbol</span><span class="k">:</span> <span 
class="kt">String</span><span class="o">)(</span><span 
class="n">sigma</span><span class="k">:</span> <span class="kt">Int</span><span 
class="o">)(</span><span class="n">out</span><span class="k">:</span> <span 
class="kt">Collector</span><span class="o">[</span><span 
class="kt">StockPrice</span><span class="o">])</span> <span class="k">=</span> 
<span class="o">{</span>
+  <span class="k">var</span> <span class="n">price</span> <span 
class="k">=</span> <span class="mf">1000.</span>
+  <span class="k">while</span> <span class="o">(</span><span 
class="kc">true</span><span class="o">)</span> <span class="o">{</span>
+    <span class="n">price</span> <span class="k">=</span> <span 
class="n">price</span> <span class="o">+</span> <span 
class="nc">Random</span><span class="o">.</span><span 
class="n">nextGaussian</span> <span class="o">*</span> <span 
class="n">sigma</span>
+    <span class="n">out</span><span class="o">.</span><span 
class="n">collect</span><span class="o">(</span><span 
class="nc">StockPrice</span><span class="o">(</span><span 
class="n">symbol</span><span class="o">,</span> <span 
class="n">price</span><span class="o">))</span>
+    <span class="nc">Thread</span><span class="o">.</span><span 
class="n">sleep</span><span class="o">(</span><span 
class="nc">Random</span><span class="o">.</span><span 
class="n">nextInt</span><span class="o">(</span><span 
class="mi">200</span><span class="o">))</span>
+  <span class="o">}</span>
+<span class="o">}</span></code></pre></div>
+
+</div>
+
+<div data-lang="java7" markdown="1">
+
+<div class="highlight"><pre><code class="language-java" data-lang="java"><span 
class="kd">private</span> <span class="kd">static</span> <span 
class="kd">final</span> <span class="n">ArrayList</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> 
<span class="n">SYMBOLS</span> <span class="o">=</span> <span 
class="k">new</span> <span class="n">ArrayList</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">&gt;(</span>
+    <span class="n">Arrays</span><span class="o">.</span><span 
class="na">asList</span><span class="o">(</span><span 
class="s">&quot;SPX&quot;</span><span class="o">,</span> <span 
class="s">&quot;FTSE&quot;</span><span class="o">,</span> <span 
class="s">&quot;DJI&quot;</span><span class="o">,</span> <span 
class="s">&quot;DJT&quot;</span><span class="o">,</span> <span 
class="s">&quot;BUX&quot;</span><span class="o">,</span> <span 
class="s">&quot;DAX&quot;</span><span class="o">,</span> <span 
class="s">&quot;GOOG&quot;</span><span class="o">));</span>
+
+<span class="kd">public</span> <span class="kd">static</span> <span 
class="kd">class</span> <span class="nc">StockPrice</span> <span 
class="kd">implements</span> <span class="n">Serializable</span> <span 
class="o">{</span>
+
+    <span class="kd">public</span> <span class="n">String</span> <span 
class="n">symbol</span><span class="o">;</span>
+    <span class="kd">public</span> <span class="n">Double</span> <span 
class="n">price</span><span class="o">;</span>
+
+    <span class="kd">public</span> <span class="nf">StockPrice</span><span 
class="o">()</span> <span class="o">{</span>
+    <span class="o">}</span>
+
+    <span class="kd">public</span> <span class="nf">StockPrice</span><span 
class="o">(</span><span class="n">String</span> <span 
class="n">symbol</span><span class="o">,</span> <span class="n">Double</span> 
<span class="n">price</span><span class="o">)</span> <span class="o">{</span>
+        <span class="k">this</span><span class="o">.</span><span 
class="na">symbol</span> <span class="o">=</span> <span 
class="n">symbol</span><span class="o">;</span>
+        <span class="k">this</span><span class="o">.</span><span 
class="na">price</span> <span class="o">=</span> <span 
class="n">price</span><span class="o">;</span>
+    <span class="o">}</span>
+
+    <span class="nd">@Override</span>
+    <span class="kd">public</span> <span class="n">String</span> <span 
class="nf">toString</span><span class="o">()</span> <span class="o">{</span>
+        <span class="k">return</span> <span 
class="s">&quot;StockPrice{&quot;</span> <span class="o">+</span>
+                <span class="s">&quot;symbol=&#39;&quot;</span> <span 
class="o">+</span> <span class="n">symbol</span> <span class="o">+</span> <span 
class="sc">&#39;\&#39;&#39;</span> <span class="o">+</span>
+                <span class="s">&quot;, count=&quot;</span> <span 
class="o">+</span> <span class="n">price</span> <span class="o">+</span>
+                <span class="sc">&#39;}&#39;</span><span class="o">;</span>
+    <span class="o">}</span>
+<span class="o">}</span>
+
+<span class="kd">public</span> <span class="kd">final</span> <span 
class="kd">static</span> <span class="kd">class</span> <span 
class="nc">StockSource</span> <span class="kd">implements</span> <span 
class="n">SourceFunction</span><span class="o">&lt;</span><span 
class="n">StockPrice</span><span class="o">&gt;</span> <span class="o">{</span>
+
+    <span class="kd">private</span> <span class="n">Double</span> <span 
class="n">price</span><span class="o">;</span>
+    <span class="kd">private</span> <span class="n">String</span> <span 
class="n">symbol</span><span class="o">;</span>
+    <span class="kd">private</span> <span class="n">Integer</span> <span 
class="n">sigma</span><span class="o">;</span>
+
+    <span class="kd">public</span> <span class="nf">StockSource</span><span 
class="o">(</span><span class="n">String</span> <span 
class="n">symbol</span><span class="o">,</span> <span class="n">Integer</span> 
<span class="n">sigma</span><span class="o">)</span> <span class="o">{</span>
+        <span class="k">this</span><span class="o">.</span><span 
class="na">symbol</span> <span class="o">=</span> <span 
class="n">symbol</span><span class="o">;</span>
+        <span class="k">this</span><span class="o">.</span><span 
class="na">sigma</span> <span class="o">=</span> <span 
class="n">sigma</span><span class="o">;</span>
+    <span class="o">}</span>
+
+    <span class="nd">@Override</span>
+    <span class="kd">public</span> <span class="kt">void</span> <span 
class="nf">invoke</span><span class="o">(</span><span 
class="n">Collector</span><span class="o">&lt;</span><span 
class="n">StockPrice</span><span class="o">&gt;</span> <span 
class="n">collector</span><span class="o">)</span> <span 
class="kd">throws</span> <span class="n">Exception</span> <span 
class="o">{</span>
+        <span class="n">price</span> <span class="o">=</span> <span 
class="n">DEFAULT_PRICE</span><span class="o">;</span>
+        <span class="n">Random</span> <span class="n">random</span> <span 
class="o">=</span> <span class="k">new</span> <span 
class="nf">Random</span><span class="o">();</span>
+
+        <span class="k">while</span> <span class="o">(</span><span 
class="kc">true</span><span class="o">)</span> <span class="o">{</span>
+            <span class="n">price</span> <span class="o">=</span> <span 
class="n">price</span> <span class="o">+</span> <span 
class="n">random</span><span class="o">.</span><span 
class="na">nextGaussian</span><span class="o">()</span> <span 
class="o">*</span> <span class="n">sigma</span><span class="o">;</span>
+            <span class="n">collector</span><span class="o">.</span><span 
class="na">collect</span><span class="o">(</span><span class="k">new</span> 
<span class="nf">StockPrice</span><span class="o">(</span><span 
class="n">symbol</span><span class="o">,</span> <span 
class="n">price</span><span class="o">));</span>
+            <span class="n">Thread</span><span class="o">.</span><span 
class="na">sleep</span><span class="o">(</span><span 
class="n">random</span><span class="o">.</span><span 
class="na">nextInt</span><span class="o">(</span><span 
class="mi">200</span><span class="o">));</span>
+        <span class="o">}</span>
+    <span class="o">}</span>
+<span class="o">}</span></code></pre></div>
+ 
+</div>
+
+</div>
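+<p>Both generators implement a Gaussian random walk: starting from an initial price (1000 in the Scala version), each step adds a normally distributed increment with standard deviation <code>sigma</code>, so a larger <code>sigma</code> (SPX uses 10, BUX uses 40) yields a more volatile series. The sketch below reproduces only that update rule in plain Java, without Flink and without the <code>Thread.sleep</code> pacing. The class name <code>RandomWalkSketch</code> and the fixed seed are assumptions made so the output is reproducible.</p>

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/** Hypothetical, Flink-free sketch of the price update used by the stock generators. */
final class RandomWalkSketch {

    private RandomWalkSketch() {
    }

    /**
     * Returns the first {@code steps} prices of a Gaussian random walk that
     * starts at {@code start} and moves by N(0, sigma^2) on every step.
     */
    static List<Double> walk(double start, int sigma, int steps, long seed) {
        // A fixed seed makes the sequence reproducible (the original uses an unseeded Random).
        Random random = new Random(seed);
        List<Double> prices = new ArrayList<>(steps);
        double price = start;
        for (int i = 0; i < steps; i++) {
            price = price + random.nextGaussian() * sigma;
            prices.add(price);
        }
        return prices;
    }

    public static void main(String[] args) {
        // Same seed, different sigma: BUX moves four times as far as SPX on every step.
        System.out.println("SPX: " + walk(1000.0, 10, 5, 42L));
        System.out.println("BUX: " + walk(1000.0, 40, 5, 42L));
    }
}
```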
+
+<p>To read from the text socket stream, make sure that a socket server is
+listening on port 9999 of localhost. For this example, running the following
+command in a terminal does the job. If netcat is not available on
+your machine, you can get it <a href="http://netcat.sourceforge.net/">here</a>.</p>
+<div class="highlight"><pre><code class="language-text" data-lang="text">nc -lk 9999
+</code></pre></div>
+<p>If we execute the program from our IDE, we see the stock prices being generated:</p>
+<div class="highlight"><pre><code class="language-text" data-lang="text">INFO    Job execution switched to status RUNNING.
+INFO    Socket Stream(1/1) switched to SCHEDULED 
+INFO    Socket Stream(1/1) switched to DEPLOYING
+INFO    Custom Source(1/1) switched to SCHEDULED 
+INFO    Custom Source(1/1) switched to DEPLOYING
+…
+1&gt; StockPrice{symbol=&#39;SPX&#39;, count=1011.3405732645239}
+2&gt; StockPrice{symbol=&#39;SPX&#39;, count=1018.3381290039248}
+1&gt; StockPrice{symbol=&#39;DJI&#39;, count=1036.7454894073978}
+3&gt; StockPrice{symbol=&#39;DJI&#39;, count=1135.1170217478427}
+3&gt; StockPrice{symbol=&#39;BUX&#39;, count=1053.667523187687}
+4&gt; StockPrice{symbol=&#39;BUX&#39;, count=1036.552601487263}
+</code></pre></div>
+<p><a href="#top">Back to top</a></p>
+
+<h2 id="window-aggregations">Window aggregations</h2>
+
+<p>We first compute aggregations on time-based windows of the
+data. Flink provides <a href="http://flink.apache.org/docs/latest/streaming_guide.html#window-operators">flexible windowing semantics</a>, where windows can
+also be defined based on the number of records or on any custom
+user-defined logic.</p>
+
+<p>We define windows of 10 seconds over our stream and slide the
+window every 5 seconds. We compute three statistics every 5 seconds.
+The first is the minimum price of all stocks (note that we set the
+parallelism to 1 to create a global minimum). The second is the maximum
+price per stock, and the third is the mean price per stock (computed
+with a group reduce function). Aggregations and groupings can be
+performed on named fields of POJOs, making the code more readable.</p>
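+<p>With a window size of 10 seconds and a slide of 5 seconds, consecutive windows overlap by 5 seconds, so every record contributes to two windows. The plain-Java sketch below lists, for a record timestamp in milliseconds, the start times of all windows containing it. It assumes windows are aligned to multiples of the slide starting at time 0; the names <code>SlidingWindows</code> and <code>windowStartsFor</code> are hypothetical, and this is a conceptual model rather than Flink's internal windowing implementation.</p>

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of aligned sliding time windows (not Flink's implementation). */
final class SlidingWindows {

    private SlidingWindows() {
    }

    /**
     * Returns the start times of all windows [start, start + size) that contain
     * the given timestamp, assuming window starts are multiples of slide and
     * timestamps are non-negative.
     */
    static List<Long> windowStartsFor(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is not after the timestamp.
        long lastStart = timestamp - (timestamp % slide);
        // Walk back one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size && start >= 0; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = 10000L;  // 10 seconds
        long slide = 5000L;  // 5 seconds
        // A record at t = 12.3 s falls into the windows starting at 10 s and 5 s: prints [10000, 5000]
        System.out.println(windowStartsFor(12300L, size, slide));
    }
}
```

+<p>Setting the slide equal to the size turns this into a tumbling window, where each record belongs to exactly one window.</p>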
+
+<p><img alt="Basic windowing aggregations" 
src="/img/blog/blog_basic_window.png" width="70%" class="img-responsive 
center-block"></p>
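+<p>To make the three statistics concrete, the plain-Java sketch below computes them for the contents of a single window: the record with the lowest price overall (what <code>minBy("price")</code> with parallelism 1 produces), the record with the highest price per symbol (<code>groupBy("symbol").maxBy("price")</code>), and the mean price per symbol (the group reduce). It assumes the window's records are already collected into a list; the class and method names are hypothetical and only illustrate what each operator computes.</p>

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, Flink-free illustration of the per-window statistics. */
final class WindowStats {

    /** Minimal stand-in for the example's StockPrice POJO. */
    static final class StockPrice {
        final String symbol;
        final double price;

        StockPrice(String symbol, double price) {
            this.symbol = symbol;
            this.price = price;
        }

        @Override
        public String toString() {
            return symbol + "=" + price;
        }
    }

    private WindowStats() {
    }

    /** Record with the lowest price across all symbols, or null for an empty window. */
    static StockPrice lowest(List<StockPrice> window) {
        StockPrice min = null;
        for (StockPrice p : window) {
            if (min == null || p.price < min.price) {
                min = p;
            }
        }
        return min;
    }

    /** Record with the highest price for each symbol. */
    static Map<String, StockPrice> maxBySymbol(List<StockPrice> window) {
        Map<String, StockPrice> max = new LinkedHashMap<>();
        for (StockPrice p : window) {
            StockPrice current = max.get(p.symbol);
            if (current == null || p.price > current.price) {
                max.put(p.symbol, p);
            }
        }
        return max;
    }

    /** Mean price for each symbol: sum and count per group, then divide. */
    static Map<String, Double> meanBySymbol(List<StockPrice> window) {
        Map<String, double[]> sumAndCount = new LinkedHashMap<>();
        for (StockPrice p : window) {
            double[] acc = sumAndCount.get(p.symbol);
            if (acc == null) {
                acc = new double[2];
                sumAndCount.put(p.symbol, acc);
            }
            acc[0] += p.price;
            acc[1] += 1;
        }
        Map<String, Double> means = new LinkedHashMap<>();
        for (Map.Entry<String, double[]> e : sumAndCount.entrySet()) {
            means.put(e.getKey(), e.getValue()[0] / e.getValue()[1]);
        }
        return means;
    }

    public static void main(String[] args) {
        List<StockPrice> window = Arrays.asList(
                new StockPrice("SPX", 1010.0), new StockPrice("DJI", 1040.0),
                new StockPrice("SPX", 1020.0), new StockPrice("DJI", 990.0));
        System.out.println("lowest: " + lowest(window));
        System.out.println("max:    " + maxBySymbol(window));
        System.out.println("mean:   " + meanBySymbol(window));
    }
}
```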
+
+<div class="codetabs" markdown="1">
+
+<div data-lang="scala" markdown="1">
+
+
+<div class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span class="c1">//Define the desired time window</span>
+<span class="k">val</span> <span class="n">windowedStream</span> <span 
class="k">=</span> <span class="n">stockStream</span>
+  <span class="o">.</span><span class="n">window</span><span 
class="o">(</span><span class="nc">Time</span><span class="o">.</span><span 
class="n">of</span><span class="o">(</span><span class="mi">10</span><span 
class="o">,</span> <span class="nc">SECONDS</span><span 
class="o">)).</span><span class="n">every</span><span class="o">(</span><span 
class="nc">Time</span><span class="o">.</span><span class="n">of</span><span 
class="o">(</span><span class="mi">5</span><span class="o">,</span> <span 
class="nc">SECONDS</span><span class="o">))</span>
+
+<span class="c1">//Compute some simple statistics on a rolling window</span>
+<span class="k">val</span> <span class="n">lowest</span> <span 
class="k">=</span> <span class="n">windowedStream</span><span 
class="o">.</span><span class="n">minBy</span><span class="o">(</span><span 
class="s">&quot;price&quot;</span><span class="o">).</span><span 
class="n">setParallelism</span><span class="o">(</span><span 
class="mi">1</span><span class="o">)</span>
+<span class="k">val</span> <span class="n">maxByStock</span> <span 
class="k">=</span> <span class="n">windowedStream</span><span 
class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span 
class="s">&quot;symbol&quot;</span><span class="o">).</span><span 
class="n">maxBy</span><span class="o">(</span><span 
class="s">&quot;price&quot;</span><span class="o">)</span>
+<span class="k">val</span> <span class="n">rollingMean</span> <span 
class="k">=</span> <span class="n">windowedStream</span><span 
class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span 
class="s">&quot;symbol&quot;</span><span class="o">).</span><span 
class="n">reduceGroup</span><span class="o">(</span><span class="n">mean</span> 
<span class="k">_</span><span class="o">)</span>
+
+<span class="c1">//Compute the mean of a window</span>
+<span class="k">def</span> <span class="n">mean</span><span 
class="o">(</span><span class="n">ts</span><span class="k">:</span> <span 
class="kt">Iterable</span><span class="o">[</span><span 
class="kt">StockPrice</span><span class="o">],</span> <span 
class="n">out</span><span class="k">:</span> <span 
class="kt">Collector</span><span class="o">[</span><span 
class="kt">StockPrice</span><span class="o">])</span> <span class="k">=</span> 
<span class="o">{</span>
+  <span class="k">if</span> <span class="o">(</span><span 
class="n">ts</span><span class="o">.</span><span class="n">nonEmpty</span><span 
class="o">)</span> <span class="o">{</span>
+    <span class="n">out</span><span class="o">.</span><span 
class="n">collect</span><span class="o">(</span><span 
class="nc">StockPrice</span><span class="o">(</span><span 
class="n">ts</span><span class="o">.</span><span class="n">head</span><span 
class="o">.</span><span class="n">symbol</span><span class="o">,</span> <span 
class="n">ts</span><span class="o">.</span><span class="n">foldLeft</span><span 
class="o">(</span><span class="mi">0</span><span class="k">:</span> <span 
class="kt">Double</span><span class="o">)(</span><span class="k">_</span> <span 
class="o">+</span> <span class="k">_</span><span class="o">.</span><span 
class="n">price</span><span class="o">)</span> <span class="o">/</span> <span 
class="n">ts</span><span class="o">.</span><span class="n">size</span><span 
class="o">))</span>
+  <span class="o">}</span>
+<span class="o">}</span></code></pre></div>
+
+
+</div>
+
+<div data-lang="java7" markdown="1">
+
+
+<div class="highlight"><pre><code class="language-java" data-lang="java"><span 
class="c1">//Define the desired time window</span>
+<span class="n">WindowedDataStream</span><span class="o">&lt;</span><span 
class="n">StockPrice</span><span class="o">&gt;</span> <span 
class="n">windowedStream</span> <span class="o">=</span> <span 
class="n">stockStream</span>
+    <span class="o">.</span><span class="na">window</span><span 
class="o">(</span><span class="n">Time</span><span class="o">.</span><span 
class="na">of</span><span class="o">(</span><span class="mi">10</span><span 
class="o">,</span> <span class="n">TimeUnit</span><span class="o">.</span><span 
class="na">SECONDS</span><span class="o">))</span>
+    <span class="o">.</span><span class="na">every</span><span 
class="o">(</span><span class="n">Time</span><span class="o">.</span><span 
class="na">of</span><span class="o">(</span><span class="mi">5</span><span 
class="o">,</span> <span class="n">TimeUnit</span><span class="o">.</span><span 
class="na">SECONDS</span><span class="o">));</span>
+
+<span class="c1">//Compute some simple statistics on a rolling window</span>
+<span class="n">DataStream</span><span class="o">&lt;</span><span 
class="n">StockPrice</span><span class="o">&gt;</span> <span 
class="n">lowest</span> <span class="o">=</span> <span 
class="n">windowedStream</span><span class="o">.</span><span 
class="na">minBy</span><span class="o">(</span><span 
class="s">&quot;price&quot;</span><span class="o">).</span><span 
class="na">setParallelism</span><span class="o">(</span><span 
class="mi">1</span><span class="o">);</span>
+<span class="n">DataStream</span><span class="o">&lt;</span><span 
class="n">StockPrice</span><span class="o">&gt;</span> <span 
class="n">maxByStock</span> <span class="o">=</span> <span 
class="n">windowedStream</span><span class="o">.</span><span 
class="na">groupBy</span><span class="o">(</span><span 
class="s">&quot;symbol&quot;</span><span class="o">).</span><span 
class="na">maxBy</span><span class="o">(</span><span 
class="s">&quot;price&quot;</span><span class="o">);</span>
+<span class="n">DataStream</span><span class="o">&lt;</span><span 
class="n">StockPrice</span><span class="o">&gt;</span> <span 
class="n">rollingMean</span> <span class="o">=</span> <span 
class="n">windowedStream</span><span class="o">.</span><span 
class="na">groupBy</span><span class="o">(</span><span 
class="s">&quot;symbol&quot;</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">reduceGroup</span><span 
class="o">(</span><span class="k">new</span> <span 
class="nf">MeanReduce</span><span class="o">());</span>
+
+<span class="c1">//Compute the mean of a window</span>
+<span class="kd">public</span> <span class="kd">final</span> <span 
class="kd">static</span> <span class="kd">class</span> <span 
class="nc">MeanReduce</span> <span class="kd">implements</span> 
+    <span class="n">GroupReduceFunction</span><span class="o">&lt;</span><span 
class="n">StockPrice</span><span class="o">,</span> <span 
class="n">StockPrice</span><span class="o">&gt;</span> <span class="o">{</span>
+
+    <span class="nd">@Override</span>
+    <span class="kd">public</span> <span class="kt">void</span> <span 
class="nf">reduce</span><span class="o">(</span><span 
class="n">Iterable</span><span class="o">&lt;</span><span 
class="n">StockPrice</span><span class="o">&gt;</span> <span 
class="n">values</span><span class="o">,</span> <span 
class="n">Collector</span><span class="o">&lt;</span><span 
class="n">StockPrice</span><span class="o">&gt;</span> <span 
class="n">out</span><span class="o">)</span> 
+        <span class="kd">throws</span> <span class="n">Exception</span> <span 
class="o">{</span>
+
+        <span class="c1">//Keep the accumulators local so that every window starts from zero</span>
+        <span class="kt">double</span> <span class="n">sum</span> <span class="o">=</span> <span class="mf">0.0</span><span class="o">;</span>
+        <span class="kt">int</span> <span class="n">count</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span>
+        <span class="n">String</span> <span class="n">symbol</span> <span class="o">=</span> <span class="s">&quot;&quot;</span><span class="o">;</span>
+
+        <span class="k">if</span> <span class="o">(</span><span class="n">values</span><span class="o">.</span><span class="na">iterator</span><span class="o">().</span><span class="na">hasNext</span><span class="o">())</span> <span class="o">{</span>
+            <span class="nf">for</span> <span class="o">(</span><span 
class="n">StockPrice</span> <span class="n">sp</span> <span class="o">:</span> <span class="n">values</span><span class="o">)</span> <span class="o">{</span>
+                <span class="n">sum</span> <span class="o">+=</span> <span 
class="n">sp</span><span class="o">.</span><span class="na">price</span><span 
class="o">;</span>
+                <span class="n">symbol</span> <span class="o">=</span> <span 
class="n">sp</span><span class="o">.</span><span class="na">symbol</span><span 
class="o">;</span>
+                <span class="n">count</span><span class="o">++;</span>
+            <span class="o">}</span>
+            <span class="n">out</span><span class="o">.</span><span 
class="na">collect</span><span class="o">(</span><span class="k">new</span> <span class="nf">StockPrice</span><span class="o">(</span><span class="n">symbol</span><span class="o">,</span> <span class="n">sum</span> <span class="o">/</span> <span class="n">count</span><span class="o">));</span>
+        <span class="o">}</span>
+    <span class="o">}</span>
+<span class="o">}</span></code></pre></div>
+
+
+</div>
+
+</div>
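<p>The following plain-Java sketch, which has no Flink dependency, makes the sliding-window semantics above concrete. It recomputes the mean price of each 10-second window that slides every 5 seconds over a finite batch of records. The class name <code>SlidingMean</code>, timestamps in seconds, and windows aligned at multiples of the slide are all assumptions made for illustration. Flink's own window boundaries may be placed differently.</p>

```java
// Hypothetical helper, not part of Flink: mean price per sliding window,
// computed over an in-memory batch of (timestamp, price) records.
final class SlidingMean {

    // Mean price of each window [w * slide, w * slide + size) for w = 0, 1, 2, ...
    // Windows are assumed to be aligned at multiples of slide; an empty window yields NaN.
    static double[] means(long[] times, double[] prices, long size, long slide) {
        long last = 0;
        for (long t : times) {
            last = Math.max(last, t);
        }
        // one window for every start time at or before the latest record
        int windows = (int) (last / slide) + 1;
        double[] sum = new double[windows];
        int[] count = new int[windows];
        for (int i = 0; i < times.length; i++) {
            for (int w = 0; w < windows; w++) {
                long start = w * slide;
                // windows overlap, so a record is added to every window that covers it
                if (start <= times[i] && times[i] < start + size) {
                    sum[w] += prices[i];
                    count[w]++;
                }
            }
        }
        double[] result = new double[windows];
        for (int w = 0; w < windows; w++) {
            result[w] = count[w] == 0 ? Double.NaN : sum[w] / count[w];
        }
        return result;
    }
}
```

<p>Take records at t = 1, 4, 7 and 12 seconds with prices 10, 20, 30 and 40. Here <code>means(times, prices, 10, 5)</code> returns 20.0, 35.0 and 40.0 for the windows [0, 10), [5, 15) and [10, 20). The nested loop keeps the sketch short, at the cost of work proportional to records times windows.</p>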
+
+<p><a href="#top">Back to top</a></p>
+
+<h2 id="data-driven-windows">Data-driven windows</h2>
+
+<p>The most interesting events in the stream are rapid changes in the price of a
+stock. We can send a warning whenever a stock price changes by more than 5% since
+the last warning. To do that, we use a delta-based window. It takes a threshold
+that determines when the computation is triggered, a function that computes the
+difference between two records, and a default value against which the first
+record is compared. We also create a <code>Count</code> data type to count the
+warnings every 30 seconds.</p>
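<p>The delta-based trigger can be simulated outside Flink. The plain-Java sketch below replays a sequence of prices for one stock and marks the ones that would fire a warning. It reuses the relative change |old / new - 1| from the <code>priceChange</code> function in the Scala example, along with the default reference price of 1000. The class name <code>DeltaTrigger</code> is hypothetical. The rule that the reference moves to each price that fires is our reading of "since the last warning", not a statement about Flink's internals.</p>

```java
// Hypothetical, Flink-free simulation of the delta-based warning trigger for a single stock.
final class DeltaTrigger {

    // Relative price change, the same formula as priceChange in the Scala example: |old / new - 1|
    static double priceChange(double oldPrice, double newPrice) {
        return Math.abs(oldPrice / newPrice - 1);
    }

    // Marks each price that fires a warning. The reference starts at defaultPrice and is
    // assumed to move to every price that fires, so each change is measured since the last warning.
    static boolean[] warnings(double[] prices, double threshold, double defaultPrice) {
        boolean[] fired = new boolean[prices.length];
        double reference = defaultPrice;
        for (int i = 0; i < prices.length; i++) {
            if (priceChange(reference, prices[i]) > threshold) {
                fired[i] = true;
                reference = prices[i];
            }
        }
        return fired;
    }
}
```

<p>Consider a threshold of 0.05 and the prices 100, 103, 106 and 99. The first, third and fourth prices fire. The first one fires only because it is compared with the default of 1000, so the default value decides whether the very first record of each stock produces a warning.</p>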
+
+<p><img alt="Data-driven windowing semantics" 
src="/img/blog/blog_data_driven.png" width="100%" class="img-responsive 
center-block"></p>
+
+<div class="codetabs" markdown="1">
+
+<div data-lang="scala" markdown="1">
+
+
+<div class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span class="k">case</span> <span class="k">class</span> <span class="nc">Count</span><span class="o">(</span><span 
class="n">symbol</span><span class="k">:</span> <span 
class="kt">String</span><span class="o">,</span> <span 
class="n">count</span><span class="k">:</span> <span class="kt">Int</span><span 
class="o">)</span>
+<span class="k">val</span> <span class="n">defaultPrice</span> <span 
class="k">=</span> <span class="nc">StockPrice</span><span 
class="o">(</span><span class="s">&quot;&quot;</span><span class="o">,</span> 
<span class="mi">1000</span><span class="o">)</span>
+
+<span class="c1">//Use delta policy to create price change warnings</span>
+<span class="k">val</span> <span class="n">priceWarnings</span> <span 
class="k">=</span> <span class="n">stockStream</span><span 
class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span 
class="s">&quot;symbol&quot;</span><span class="o">)</span>
+  <span class="o">.</span><span class="n">window</span><span 
class="o">(</span><span class="nc">Delta</span><span class="o">.</span><span 
class="n">of</span><span class="o">(</span><span class="mf">0.05</span><span 
class="o">,</span> <span class="n">priceChange</span><span class="o">,</span> <span class="n">defaultPrice</span><span class="o">))</span>
+  <span class="o">.</span><span class="n">reduceGroup</span><span 
class="o">(</span><span class="n">sendWarning</span> <span 
class="k">_</span><span class="o">)</span>
+
+<span class="c1">//Count the number of warnings every half a minute</span>
+<span class="k">val</span> <span class="n">warningsPerStock</span> <span 
class="k">=</span> <span class="n">priceWarnings</span><span 
class="o">.</span><span class="n">map</span><span class="o">(</span><span 
class="nc">Count</span><span class="o">(</span><span class="k">_</span><span 
class="o">,</span> <span class="mi">1</span><span class="o">))</span>
+  <span class="o">.</span><span class="n">groupBy</span><span 
class="o">(</span><span class="s">&quot;symbol&quot;</span><span 
class="o">)</span>
+  <span class="o">.</span><span class="n">window</span><span 
class="o">(</span><span class="nc">Time</span><span class="o">.</span><span 
class="n">of</span><span class="o">(</span><span class="mi">30</span><span 
class="o">,</span> <span class="nc">SECONDS</span><span class="o">))</span>
+  <span class="o">.</span><span class="n">sum</span><span 
class="o">(</span><span class="s">&quot;count&quot;</span><span 
class="o">)</span>
+
+<span class="k">def</span> <span class="n">priceChange</span><span 
class="o">(</span><span class="n">p1</span><span class="k">:</span> <span 
class="kt">StockPrice</span><span class="o">,</span> <span 
class="n">p2</span><span class="k">:</span> <span 
class="kt">StockPrice</span><span class="o">)</span><span class="k">:</span> <span class="kt">Double</span> <span class="o">=</span> <span class="o">{</span>
+  <span class="nc">Math</span><span class="o">.</span><span 
class="n">abs</span><span class="o">(</span><span class="n">p1</span><span 
class="o">.</span><span class="n">price</span> <span class="o">/</span> <span 
class="n">p2</span><span class="o">.</span><span class="n">price</span> <span 
class="o">-</span> <span class="mi">1</span><span class="o">)</span>
+<span class="o">}</span>
+
+<span class="k">def</span> <span class="n">sendWarning</span><span 
class="o">(</span><span class="n">ts</span><span class="k">:</span> <span 
class="kt">Iterable</span><span class="o">[</span><span 
class="kt">StockPrice</span><span class="o">],</span> <span 
class="n">out</span><span class="k">:</span> <span 
class="kt">Collector</span><span class="o">[</span><span 
class="kt">String</span><span class="o">])</span> <span class="k">=</span> <span class="o">{</span>
+  <span class="k">if</span> <span class="o">(</span><span 
class="n">ts</span><span class="o">.</span><span class="n">nonEmpty</span><span 
class="o">)</span> <span class="n">out</span><span class="o">.</span><span 
class="n">collect</span><span class="o">(</span><span class="n">ts</span><span 
class="o">.</span><span class="n">head</span><span class="o">.</span><span 
class="n">symbol</span><span class="o">)</span>
+<span class="o">}</span></code></pre></div>
+
+
+</div>
+
+<div data-lang="java7" markdown="1">
+
+
+<div class="highlight"><pre><code class="language-java" data-lang="java"><span 
class="kd">private</span> <span class="kd">static</span> <span 
class="kd">final</span> <span class="n">Double</span> <span 
class="n">DEFAULT_PRICE</span> <span class="o">=</span> <span 
class="mi">1000</span><span class="o">.;</span>
+<span class="kd">private</span> <span class="kd">static</span> <span 
class="kd">final</span> <span class="n">StockPrice</span> <span 
class="n">DEFAULT_STOCK_PRICE</span> <span class="o">=</span> <span 
class="k">new</span> <span class="nf">StockPrice</span><span 
class="o">(</span><span class="s">&quot;&quot;</span><span class="o">,</span> 
<span class="n">DEFAULT_PRICE</span><span class="o">);</span>
+
+<span class="c1">//Use delta policy to create price change warnings</span>
+<span class="n">DataStream</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">&gt;</span> <span 
class="n">priceWarnings</span> <span class="o">=</span> <span 
class="n">stockStream</span><span class="o">.</span><span 
class="na">groupBy</span><span class="o">(</span><span 
class="s">&quot;symbol&quot;</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">window</span><span 
class="o">(</span><span class="n">Delta</span><span class="o">.</span><span 
class="na">of</span><span class="o">(</span><span class="mf">0.05</span><span 
class="o">,</span> <span class="k">new</span> <span 
class="n">DeltaFunction</span><span class="o">&lt;</span><span 
class="n">StockPrice</span><span class="o">&gt;()</span> <span 
class="o">{</span>
+        <span class="nd">@Override</span>
+        <span class="kd">public</span> <span class="kt">double</span> <span 
class="nf">getDelta</span><span class="o">(</span><span 
class="n">StockPrice</span> <span class="n">oldDataPoint</span><span 
class="o">,</span> <span class="n">StockPrice</span> <span 
class="n">newDataPoint</span><span class="o">)</span> <span class="o">{</span>
+            <span class="k">return</span> <span class="n">Math</span><span class="o">.</span><span class="na">abs</span><span class="o">(</span><span class="n">oldDataPoint</span><span class="o">.</span><span class="na">price</span> <span class="o">/</span> <span class="n">newDataPoint</span><span class="o">.</span><span class="na">price</span> <span class="o">-</span> <span class="mi">1</span><span class="o">);</span>
+        <span class="o">}</span>
+    <span class="o">},</span> <span class="n">DEFAULT_STOCK_PRICE</span><span 
class="o">))</span>
+<span class="o">.</span><span class="na">reduceGroup</span><span 
class="o">(</span><span class="k">new</span> <span 
class="nf">SendWarning</span><span class="o">());</span>
+
+<span class="c1">//Count the number of warnings every half a minute</span>
+<span class="n">DataStream</span><span class="o">&lt;</span><span 
class="n">Count</span><span class="o">&gt;</span> <span 
class="n">warningsPerStock</span> <span class="o">=</span> <span 
class="n">priceWarnings</span><span class="o">.</span><span 
class="na">map</span><span class="o">(</span><span class="k">new</span> <span 
class="n">MapFunction</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Count</span><span class="o">&gt;()</span> <span class="o">{</span>
+    <span class="nd">@Override</span>
+    <span class="kd">public</span> <span class="n">Count</span> <span 
class="nf">map</span><span class="o">(</span><span class="n">String</span> <span class="n">value</span><span class="o">)</span> <span 
class="kd">throws</span> <span class="n">Exception</span> <span 
class="o">{</span>
+        <span class="k">return</span> <span class="k">new</span> <span 
class="nf">Count</span><span class="o">(</span><span 
class="n">value</span><span class="o">,</span> <span class="mi">1</span><span 
class="o">);</span>
+    <span class="o">}</span>
+<span class="o">}).</span><span class="na">groupBy</span><span 
class="o">(</span><span class="s">&quot;symbol&quot;</span><span 
class="o">).</span><span class="na">window</span><span class="o">(</span><span 
class="n">Time</span><span class="o">.</span><span class="na">of</span><span 
class="o">(</span><span class="mi">30</span><span class="o">,</span> <span 
class="n">TimeUnit</span><span class="o">.</span><span 
class="na">SECONDS</span><span class="o">)).</span><span 
class="na">sum</span><span class="o">(</span><span 
class="s">&quot;count&quot;</span><span class="o">);</span>
+
+<span class="kd">public</span> <span class="kd">static</span> <span 
class="kd">class</span> <span class="nc">Count</span> <span 
class="kd">implements</span> <span class="n">Serializable</span> <span 
class="o">{</span>
+    <span class="kd">public</span> <span class="n">String</span> <span 
class="n">symbol</span><span class="o">;</span>
+    <span class="kd">public</span> <span class="n">Integer</span> <span 
class="n">count</span><span class="o">;</span>
+
+    <span class="kd">public</span> <span class="nf">Count</span><span 
class="o">()</span> <span class="o">{</span>
+    <span class="o">}</span>
+
+    <span class="kd">public</span> <span class="nf">Count</span><span 
class="o">(</span><span class="n">String</span> <span 
class="n">symbol</span><span class="o">,</span> <span class="n">Integer</span> <span class="n">count</span><span class="o">)</span> <span class="o">{</span>
+        <span class="k">this</span><span class="o">.</span><span 
class="na">symbol</span> <span class="o">=</span> <span 
class="n">symbol</span><span class="o">;</span>
+        <span class="k">this</span><span class="o">.</span><span 
class="na">count</span> <span class="o">=</span> <span 
class="n">count</span><span class="o">;</span>
+    <span class="o">}</span>
+
+    <span class="nd">@Override</span>
+    <span class="kd">public</span> <span class="n">String</span> <span 
class="nf">toString</span><span class="o">()</span> <span class="o">{</span>
+        <span class="k">return</span> <span 
class="s">&quot;Count{&quot;</span> <span class="o">+</span>
+                <span class="s">&quot;symbol=&#39;&quot;</span> <span 
class="o">+</span> <span class="n">symbol</span> <span class="o">+</span> <span 
class="sc">&#39;\&#39;&#39;</span> <span class="o">+</span>
+                <span class="s">&quot;, count=&quot;</span> <span 
class="o">+</span> <span class="n">count</span> <span class="o">+</span>
+                <span class="sc">&#39;}&#39;</span><span class="o">;</span>
+    <span class="o">}</span>
+<span class="o">}</span>
+
+<span class="kd">public</span> <span class="kd">static</span> <span 
class="kd">final</span> <span class="kd">class</span> <span 
class="nc">SendWarning</span> <span class="kd">implements</span> <span 
class="n">GroupReduceFunction</span><span class="o">&lt;</span><span 
class="n">StockPrice</span><span class="o">,</span> <span 
class="n">String</span><span class="o">&gt;</span> <span class="o">{</span>
+    <span class="nd">@Override</span>
+    <span class="kd">public</span> <span class="kt">void</span> <span 
class="nf">reduce</span><span class="o">(</span><span 
class="n">Iterable</span><span class="o">&lt;</span><span 
class="n">StockPrice</span><span class="o">&gt;</span> <span 
class="n">values</span><span class="o">,</span> <span 
class="n">Collector</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">&gt;</span> <span 
class="n">out</span><span class="o">)</span> 
+        <span class="kd">throws</span> <span class="n">Exception</span> <span 
class="o">{</span>
+
+        <span class="k">if</span> <span class="o">(</span><span 
class="n">values</span><span class="o">.</span><span 
class="na">iterator</span><span class="o">().</span><span 
class="na">hasNext</span><span class="o">())</span> <span class="o">{</span>
+            <span class="n">out</span><span class="o">.</span><span 
class="na">collect</span><span class="o">(</span><span 
class="n">values</span><span class="o">.</span><span 
class="na">iterator</span><span class="o">().</span><span 
class="na">next</span><span class="o">().</span><span 
class="na">symbol</span><span class="o">);</span>
+        <span class="o">}</span>
+    <span class="o">}</span>
+<span class="o">}</span></code></pre></div>
+
+
+</div>
+
+</div>
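<p>Counting warnings every half minute uses a tumbling, non-overlapping 30-second window per symbol. The Flink-free sketch below reproduces that aggregation for one symbol over an in-memory batch. The class name <code>TumblingCount</code>, timestamps in seconds, and windows aligned at multiples of the window size are all assumptions made for illustration.</p>

```java
// Hypothetical, Flink-free sketch: how many records carry a given symbol in each
// tumbling window [w * size, (w + 1) * size).
final class TumblingCount {

    // Similar in spirit to map(Count(_, 1)), groupBy("symbol") and sum("count"),
    // restricted to a single symbol.
    static int[] countPerWindow(long[] times, String[] symbols, String symbol, long size) {
        long last = 0;
        for (long t : times) {
            last = Math.max(last, t);
        }
        int[] counts = new int[(int) (last / size) + 1];
        for (int i = 0; i < times.length; i++) {
            if (symbols[i].equals(symbol)) {
                // windows do not overlap, so each record lands in exactly one of them
                counts[(int) (times[i] / size)]++;
            }
        }
        return counts;
    }
}
```

<p>Take the hypothetical records (3 s, SPX), (10 s, FTSE), (29 s, SPX), (31 s, SPX), (45 s, FTSE) and (70 s, SPX) with a 30-second window. SPX gets the counts 2, 1, 1 and FTSE gets 1, 1, 0. Unlike the sliding window earlier, each record is counted exactly once.</p>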
+
+<p><a href="#top">Back to top</a></p>
+
+<h2 id="combining-with-a-twitter-stream">Combining with a Twitter stream</h2>
+
+<p>Next, we will read a Twitter stream and correlate it with our stock
+price stream. Flink has support for connecting to <a href="http://flink.apache.org/docs/latest/streaming_guide.html#twitter-streaming-api">Twitter&#39;s
+API</a>,
+but for the sake of this example we generate dummy tweet data.</p>
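<p>The symbol extraction in the examples splits each tweet on spaces, upper-cases every word and keeps only known stock symbols. The sketch below shows the same logic as a standalone Java method that can be tested outside a stream. The class name <code>SymbolExtractor</code> and its symbol list are hypothetical stand-ins for the <code>symbols</code>/<code>SYMBOLS</code> collection the post defines elsewhere.</p>

```java
import java.util.Arrays;
import java.util.Locale;

// Hypothetical helper mirroring the flatMap + filter steps on a single tweet.
final class SymbolExtractor {

    // Illustrative symbol list; the post defines its own symbols collection elsewhere.
    static final String[] KNOWN = {"SPX", "FTSE", "DJI", "DAX", "GOOG"};

    static String[] extract(String tweet, String[] known) {
        return Arrays.stream(tweet.split(" "))                       // one element per space-separated word
                .map(word -> word.toUpperCase(Locale.ROOT))          // symbols are matched in upper case
                .filter(word -> Arrays.asList(known).contains(word)) // drops empty tokens and ordinary words
                .toArray(String[]::new);
    }
}
```

<p>For example, <code>extract(" goog buy spx now", KNOWN)</code> returns GOOG and SPX. The Java <code>TweetSource</code> appends a space before every symbol, which produces this kind of leading space. The leading space yields an empty first token, and the filter discards it.</p>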
+
+<p><img alt="Social media analytics" src="/img/blog/blog_social_media.png" 
width="100%" class="img-responsive center-block"></p>
+
+<div class="codetabs" markdown="1">
+
+<div data-lang="scala" markdown="1">
+
+
+
+<div class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span class="c1">//Read a stream of tweets</span>
+<span class="k">val</span> <span class="n">tweetStream</span> <span 
class="k">=</span> <span class="n">env</span><span class="o">.</span><span 
class="n">addSource</span><span class="o">(</span><span 
class="n">generateTweets</span> <span class="k">_</span><span class="o">)</span>
+
+<span class="c1">//Extract the stock symbols</span>
+<span class="k">val</span> <span class="n">mentionedSymbols</span> <span 
class="k">=</span> <span class="n">tweetStream</span><span 
class="o">.</span><span class="n">flatMap</span><span class="o">(</span><span 
class="n">tweet</span> <span class="k">=&gt;</span> <span 
class="n">tweet</span><span class="o">.</span><span class="n">split</span><span 
class="o">(</span><span class="s">&quot; &quot;</span><span class="o">))</span>
+  <span class="o">.</span><span class="n">map</span><span 
class="o">(</span><span class="k">_</span><span class="o">.</span><span 
class="n">toUpperCase</span><span class="o">())</span>
+  <span class="o">.</span><span class="n">filter</span><span 
class="o">(</span><span class="n">symbols</span><span class="o">.</span><span 
class="n">contains</span><span class="o">(</span><span class="k">_</span><span 
class="o">))</span>
+
+<span class="c1">//Count the extracted symbols</span>
+<span class="k">val</span> <span class="n">tweetsPerStock</span> <span 
class="k">=</span> <span class="n">mentionedSymbols</span><span 
class="o">.</span><span class="n">map</span><span class="o">(</span><span 
class="nc">Count</span><span class="o">(</span><span class="k">_</span><span 
class="o">,</span> <span class="mi">1</span><span class="o">))</span>
+  <span class="o">.</span><span class="n">groupBy</span><span 
class="o">(</span><span class="s">&quot;symbol&quot;</span><span 
class="o">)</span>
+  <span class="o">.</span><span class="n">window</span><span 
class="o">(</span><span class="nc">Time</span><span class="o">.</span><span 
class="n">of</span><span class="o">(</span><span class="mi">30</span><span 
class="o">,</span> <span class="nc">SECONDS</span><span class="o">))</span>
+  <span class="o">.</span><span class="n">sum</span><span 
class="o">(</span><span class="s">&quot;count&quot;</span><span 
class="o">)</span>
+
+<span class="k">def</span> <span class="n">generateTweets</span><span 
class="o">(</span><span class="n">out</span><span class="k">:</span> <span 
class="kt">Collector</span><span class="o">[</span><span 
class="kt">String</span><span class="o">])</span> <span class="k">=</span> <span class="o">{</span>
+  <span class="k">while</span> <span class="o">(</span><span 
class="kc">true</span><span class="o">)</span> <span class="o">{</span>
+    <span class="k">val</span> <span class="n">s</span> <span 
class="k">=</span> <span class="k">for</span> <span class="o">(</span><span 
class="n">i</span> <span class="k">&lt;-</span> <span class="mi">1</span> <span 
class="n">to</span> <span class="mi">3</span><span class="o">)</span> <span 
class="k">yield</span> <span class="o">(</span><span 
class="n">symbols</span><span class="o">(</span><span 
class="nc">Random</span><span class="o">.</span><span 
class="n">nextInt</span><span class="o">(</span><span 
class="n">symbols</span><span class="o">.</span><span 
class="n">size</span><span class="o">)))</span>
+    <span class="n">out</span><span class="o">.</span><span 
class="n">collect</span><span class="o">(</span><span class="n">s</span><span 
class="o">.</span><span class="n">mkString</span><span class="o">(</span><span 
class="s">&quot; &quot;</span><span class="o">))</span>
+    <span class="nc">Thread</span><span class="o">.</span><span 
class="n">sleep</span><span class="o">(</span><span 
class="nc">Random</span><span class="o">.</span><span 
class="n">nextInt</span><span class="o">(</span><span 
class="mi">500</span><span class="o">))</span>
+  <span class="o">}</span>
+<span class="o">}</span></code></pre></div>
+
+
+</div>
+
+<div data-lang="java7" markdown="1">
+
+
+<div class="highlight"><pre><code class="language-java" data-lang="java"><span 
class="c1">//Read a stream of tweets</span>
+<span class="n">DataStream</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">&gt;</span> <span 
class="n">tweetStream</span> <span class="o">=</span> <span 
class="n">env</span><span class="o">.</span><span 
class="na">addSource</span><span class="o">(</span><span class="k">new</span> <span class="nf">TweetSource</span><span class="o">());</span>
+
+<span class="c1">//Extract the stock symbols</span>
+<span class="n">DataStream</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">&gt;</span> <span 
class="n">mentionedSymbols</span> <span class="o">=</span> <span 
class="n">tweetStream</span><span class="o">.</span><span 
class="na">flatMap</span><span class="o">(</span>
+    <span class="k">new</span> <span class="n">FlatMapFunction</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">,</span> 
<span class="n">String</span><span class="o">&gt;()</span> <span 
class="o">{</span>
+    <span class="nd">@Override</span>
+    <span class="kd">public</span> <span class="kt">void</span> <span 
class="nf">flatMap</span><span class="o">(</span><span class="n">String</span> <span class="n">value</span><span class="o">,</span> <span class="n">Collector</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">out</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
+        <span class="n">String</span><span class="o">[]</span> <span 
class="n">words</span> <span class="o">=</span> <span 
class="n">value</span><span class="o">.</span><span 
class="na">split</span><span class="o">(</span><span class="s">&quot; 
&quot;</span><span class="o">);</span>
+        <span class="k">for</span> <span class="o">(</span><span 
class="n">String</span> <span class="n">word</span> <span class="o">:</span> <span class="n">words</span><span class="o">)</span> <span class="o">{</span>
+            <span class="n">out</span><span class="o">.</span><span 
class="na">collect</span><span class="o">(</span><span 
class="n">word</span><span class="o">.</span><span 
class="na">toUpperCase</span><span class="o">());</span>
+        <span class="o">}</span>
+    <span class="o">}</span>
+<span class="o">}).</span><span class="na">filter</span><span 
class="o">(</span><span class="k">new</span> <span 
class="n">FilterFunction</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">&gt;()</span> <span class="o">{</span>
+    <span class="nd">@Override</span>
+    <span class="kd">public</span> <span class="kt">boolean</span> <span 
class="nf">filter</span><span class="o">(</span><span class="n">String</span> <span class="n">value</span><span class="o">)</span> <span 
class="kd">throws</span> <span class="n">Exception</span> <span 
class="o">{</span>
+        <span class="k">return</span> <span class="n">SYMBOLS</span><span 
class="o">.</span><span class="na">contains</span><span class="o">(</span><span 
class="n">value</span><span class="o">);</span>
+    <span class="o">}</span>
+<span class="o">});</span>
+
+<span class="c1">//Count the extracted symbols</span>
+<span class="n">DataStream</span><span class="o">&lt;</span><span 
class="n">Count</span><span class="o">&gt;</span> <span 
class="n">tweetsPerStock</span> <span class="o">=</span> <span 
class="n">mentionedSymbols</span><span class="o">.</span><span 
class="na">map</span><span class="o">(</span><span class="k">new</span> <span 
class="n">MapFunction</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Count</span><span class="o">&gt;()</span> <span class="o">{</span>
+    <span class="nd">@Override</span>
+    <span class="kd">public</span> <span class="n">Count</span> <span 
class="nf">map</span><span class="o">(</span><span class="n">String</span> <span class="n">value</span><span class="o">)</span> <span 
class="kd">throws</span> <span class="n">Exception</span> <span 
class="o">{</span>
+        <span class="k">return</span> <span class="k">new</span> <span 
class="nf">Count</span><span class="o">(</span><span 
class="n">value</span><span class="o">,</span> <span class="mi">1</span><span 
class="o">);</span>
+    <span class="o">}</span>
+<span class="o">}).</span><span class="na">groupBy</span><span 
class="o">(</span><span class="s">&quot;symbol&quot;</span><span 
class="o">).</span><span class="na">window</span><span class="o">(</span><span 
class="n">Time</span><span class="o">.</span><span class="na">of</span><span 
class="o">(</span><span class="mi">30</span><span class="o">,</span> <span 
class="n">TimeUnit</span><span class="o">.</span><span 
class="na">SECONDS</span><span class="o">)).</span><span 
class="na">sum</span><span class="o">(</span><span 
class="s">&quot;count&quot;</span><span class="o">);</span>
+
+<span class="kd">public</span> <span class="kd">static</span> <span 
class="kd">final</span> <span class="kd">class</span> <span 
class="nc">TweetSource</span> <span class="kd">implements</span> <span 
class="n">SourceFunction</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">&gt;</span> <span class="o">{</span>
+    <span class="n">Random</span> <span class="n">random</span><span 
class="o">;</span>
+    <span class="n">StringBuilder</span> <span 
class="n">stringBuilder</span><span class="o">;</span>
+
+    <span class="nd">@Override</span>
+    <span class="kd">public</span> <span class="kt">void</span> <span 
class="nf">invoke</span><span class="o">(</span><span 
class="n">Collector</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">&gt;</span> <span 
class="n">collector</span><span class="o">)</span> <span 
class="kd">throws</span> <span class="n">Exception</span> <span 
class="o">{</span>
+        <span class="n">random</span> <span class="o">=</span> <span 
class="k">new</span> <span class="nf">Random</span><span class="o">();</span>
+        <span class="n">stringBuilder</span> <span class="o">=</span> <span 
class="k">new</span> <span class="nf">StringBuilder</span><span 
class="o">();</span>
+
+        <span class="k">while</span> <span class="o">(</span><span 
class="kc">true</span><span class="o">)</span> <span class="o">{</span>
+            <span class="n">stringBuilder</span><span class="o">.</span><span 
class="na">setLength</span><span class="o">(</span><span 
class="mi">0</span><span class="o">);</span>
+            <span class="k">for</span> <span class="o">(</span><span 
class="kt">int</span> <span class="n">i</span> <span class="o">=</span> <span 
class="mi">0</span><span class="o">;</span> <span class="n">i</span> <span 
class="o">&lt;</span> <span class="mi">3</span><span class="o">;</span> <span 
class="n">i</span><span class="o">++)</span> <span class="o">{</span>
+                <span class="n">stringBuilder</span><span 
class="o">.</span><span class="na">append</span><span class="o">(</span><span 
class="s">&quot; &quot;</span><span class="o">);</span>
+                <span class="n">stringBuilder</span><span 
class="o">.</span><span class="na">append</span><span class="o">(</span><span 
class="n">SYMBOLS</span><span class="o">.</span><span 
class="na">get</span><span class="o">(</span><span class="n">random</span><span 
class="o">.</span><span class="na">nextInt</span><span class="o">(</span><span 
class="n">SYMBOLS</span><span class="o">.</span><span 
class="na">size</span><span class="o">())));</span>
+            <span class="o">}</span>
+            <span class="n">collector</span><span class="o">.</span><span 
class="na">collect</span><span class="o">(</span><span 
class="n">stringBuilder</span><span class="o">.</span><span 
class="na">toString</span><span class="o">());</span>
+            <span class="n">Thread</span><span class="o">.</span><span 
class="na">sleep</span><span class="o">(</span><span class="mi">500</span><span 
class="o">);</span>
+        <span class="o">}</span>
+
+    <span class="o">}</span>
+<span class="o">}</span></code></pre></div>
+
+
+</div>
+
+</div>
+
+<p><a href="#top">Back to top</a></p>
+
+<h2 id="streaming-joins">Streaming joins</h2>
+
+<p>Finally, we join real-time tweets and stock prices and compute a
+rolling correlation between the number of price warnings and the
+number of mentions of a given stock in the Twitter stream. As both of
+these data streams are potentially infinite, we apply the join on a
+30-second window.</p>
+
+<p><img alt="Streaming joins" src="/img/blog/blog_stream_join.png" width="60%" 
class="img-responsive center-block"> </p>
+
+<div class="codetabs" markdown="1">
+
+<div data-lang="scala" markdown="1">
+
+
+
+<div class="highlight"><pre><code class="language-scala" 
data-lang="scala"><span class="c1">//Join warnings and parsed tweets</span>
+<span class="k">val</span> <span class="n">tweetsAndWarning</span> <span 
class="k">=</span> <span class="n">warningsPerStock</span><span 
class="o">.</span><span class="n">join</span><span class="o">(</span><span 
class="n">tweetsPerStock</span><span class="o">)</span>
+  <span class="o">.</span><span class="n">onWindow</span><span 
class="o">(</span><span class="mi">30</span><span class="o">,</span> <span 
class="nc">SECONDS</span><span class="o">)</span>
+  <span class="o">.</span><span class="n">where</span><span 
class="o">(</span><span class="s">&quot;symbol&quot;</span><span 
class="o">)</span>
+  <span class="o">.</span><span class="n">equalTo</span><span 
class="o">(</span><span class="s">&quot;symbol&quot;</span><span 
class="o">)</span> <span class="o">{</span> <span class="o">(</span><span 
class="n">c1</span><span class="o">,</span> <span class="n">c2</span><span 
class="o">)</span> <span class="k">=&gt;</span> <span class="o">(</span><span 
class="n">c1</span><span class="o">.</span><span class="n">count</span><span 
class="o">,</span> <span class="n">c2</span><span class="o">.</span><span 
class="n">count</span><span class="o">)</span> <span class="o">}</span>
+
+<span class="k">val</span> <span class="n">rollingCorrelation</span> <span 
class="k">=</span> <span class="n">tweetsAndWarning</span><span 
class="o">.</span><span class="n">window</span><span class="o">(</span><span 
class="nc">Time</span><span class="o">.</span><span class="n">of</span><span 
class="o">(</span><span class="mi">30</span><span class="o">,</span> <span 
class="nc">SECONDS</span><span class="o">))</span>
+  <span class="o">.</span><span class="n">reduceGroup</span><span 
class="o">(</span><span class="n">computeCorrelation</span> <span 
class="k">_</span><span class="o">).</span><span 
class="n">setParallelism</span><span class="o">(</span><span 
class="mi">1</span><span class="o">)</span>
+
+<span class="n">rollingCorrelation</span> <span class="n">print</span>
+
+<span class="c1">//Compute rolling correlation</span>
+<span class="k">def</span> <span class="n">computeCorrelation</span><span 
class="o">(</span><span class="n">input</span><span class="k">:</span> <span 
class="kt">Iterable</span><span class="o">[(</span><span class="kt">Int</span>, 
<span class="kt">Int</span><span class="o">)],</span> <span 
class="n">out</span><span class="k">:</span> <span 
class="kt">Collector</span><span class="o">[</span><span 
class="kt">Double</span><span class="o">])</span> <span class="k">=</span> 
<span class="o">{</span>
+  <span class="k">if</span> <span class="o">(</span><span 
class="n">input</span><span class="o">.</span><span 
class="n">nonEmpty</span><span class="o">)</span> <span class="o">{</span>
+    <span class="k">val</span> <span class="n">var1</span> <span 
class="k">=</span> <span class="n">input</span><span class="o">.</span><span 
class="n">map</span><span class="o">(</span><span class="k">_</span><span 
class="o">.</span><span class="n">_1</span><span class="o">)</span>
+    <span class="k">val</span> <span class="n">mean1</span> <span 
class="k">=</span> <span class="n">average</span><span class="o">(</span><span 
class="n">var1</span><span class="o">)</span>
+    <span class="k">val</span> <span class="n">var2</span> <span 
class="k">=</span> <span class="n">input</span><span class="o">.</span><span 
class="n">map</span><span class="o">(</span><span class="k">_</span><span 
class="o">.</span><span class="n">_2</span><span class="o">)</span>
+    <span class="k">val</span> <span class="n">mean2</span> <span 
class="k">=</span> <span class="n">average</span><span class="o">(</span><span 
class="n">var2</span><span class="o">)</span>
+
+    <span class="k">val</span> <span class="n">cov</span> <span 
class="k">=</span> <span class="n">average</span><span class="o">(</span><span 
class="n">var1</span><span class="o">.</span><span class="n">zip</span><span 
class="o">(</span><span class="n">var2</span><span class="o">).</span><span 
class="n">map</span><span class="o">(</span><span class="n">xy</span> <span 
class="k">=&gt;</span> <span class="o">(</span><span class="n">xy</span><span 
class="o">.</span><span class="n">_1</span> <span class="o">-</span> <span 
class="n">mean1</span><span class="o">)</span> <span class="o">*</span> <span 
class="o">(</span><span class="n">xy</span><span class="o">.</span><span 
class="n">_2</span> <span class="o">-</span> <span class="n">mean2</span><span 
class="o">)))</span>
+    <span class="k">val</span> <span class="n">d1</span> <span 
class="k">=</span> <span class="nc">Math</span><span class="o">.</span><span 
class="n">sqrt</span><span class="o">(</span><span 
class="n">average</span><span class="o">(</span><span 
class="n">var1</span><span class="o">.</span><span class="n">map</span><span 
class="o">(</span><span class="n">x</span> <span class="k">=&gt;</span> <span 
class="nc">Math</span><span class="o">.</span><span class="n">pow</span><span 
class="o">((</span><span class="n">x</span> <span class="o">-</span> <span 
class="n">mean1</span><span class="o">),</span> <span class="mi">2</span><span 
class="o">))))</span>
+    <span class="k">val</span> <span class="n">d2</span> <span 
class="k">=</span> <span class="nc">Math</span><span class="o">.</span><span 
class="n">sqrt</span><span class="o">(</span><span 
class="n">average</span><span class="o">(</span><span 
class="n">var2</span><span class="o">.</span><span class="n">map</span><span 
class="o">(</span><span class="n">x</span> <span class="k">=&gt;</span> <span 
class="nc">Math</span><span class="o">.</span><span class="n">pow</span><span 
class="o">((</span><span class="n">x</span> <span class="o">-</span> <span 
class="n">mean2</span><span class="o">),</span> <span class="mi">2</span><span 
class="o">))))</span>
+
+    <span class="n">out</span><span class="o">.</span><span 
class="n">collect</span><span class="o">(</span><span class="n">cov</span> 
<span class="o">/</span> <span class="o">(</span><span class="n">d1</span> 
<span class="o">*</span> <span class="n">d2</span><span class="o">))</span>
+  <span class="o">}</span>
+<span class="o">}</span></code></pre></div>
+
+
+</div>
+
+<div data-lang="java7" markdown="1">
+
+
+<div class="highlight"><pre><code class="language-java" data-lang="java"><span 
class="c1">//Join warnings and parsed tweets</span>
+<span class="n">DataStream</span><span class="o">&lt;</span><span 
class="n">Tuple2</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">,</span> <span 
class="n">Integer</span><span class="o">&gt;&gt;</span> <span 
class="n">tweetsAndWarning</span> <span class="o">=</span> <span 
class="n">warningsPerStock</span>
+    <span class="o">.</span><span class="na">join</span><span 
class="o">(</span><span class="n">tweetsPerStock</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">onWindow</span><span 
class="o">(</span><span class="mi">30</span><span class="o">,</span> <span 
class="n">TimeUnit</span><span class="o">.</span><span 
class="na">SECONDS</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">where</span><span 
class="o">(</span><span class="s">&quot;symbol&quot;</span><span 
class="o">)</span>
+    <span class="o">.</span><span class="na">equalTo</span><span 
class="o">(</span><span class="s">&quot;symbol&quot;</span><span 
class="o">)</span>
+    <span class="o">.</span><span class="na">with</span><span 
class="o">(</span><span class="k">new</span> <span 
class="n">JoinFunction</span><span class="o">&lt;</span><span 
class="n">Count</span><span class="o">,</span> <span 
class="n">Count</span><span class="o">,</span> <span 
class="n">Tuple2</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">,</span> <span 
class="n">Integer</span><span class="o">&gt;&gt;()</span> <span 
class="o">{</span>
+        <span class="nd">@Override</span>
+        <span class="kd">public</span> <span class="n">Tuple2</span><span 
class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> 
<span class="n">Integer</span><span class="o">&gt;</span> <span 
class="nf">join</span><span class="o">(</span><span class="n">Count</span> 
<span class="n">first</span><span class="o">,</span> <span 
class="n">Count</span> <span class="n">second</span><span class="o">)</span> 
<span class="kd">throws</span> <span class="n">Exception</span> <span 
class="o">{</span>
+            <span class="k">return</span> <span class="k">new</span> <span 
class="n">Tuple2</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">,</span> <span 
class="n">Integer</span><span class="o">&gt;(</span><span 
class="n">first</span><span class="o">.</span><span 
class="na">count</span><span class="o">,</span> <span 
class="n">second</span><span class="o">.</span><span 
class="na">count</span><span class="o">);</span>
+        <span class="o">}</span>
+    <span class="o">});</span>
+
+<span class="c1">//Compute rolling correlation</span>
+<span class="n">DataStream</span><span class="o">&lt;</span><span 
class="n">Double</span><span class="o">&gt;</span> <span 
class="n">rollingCorrelation</span> <span class="o">=</span> <span 
class="n">tweetsAndWarning</span>
+    <span class="o">.</span><span class="na">window</span><span 
class="o">(</span><span class="n">Time</span><span class="o">.</span><span 
class="na">of</span><span class="o">(</span><span class="mi">30</span><span 
class="o">,</span> <span class="n">TimeUnit</span><span class="o">.</span><span 
class="na">SECONDS</span><span class="o">))</span>
+    <span class="o">.</span><span class="na">reduceGroup</span><span 
class="o">(</span><span class="k">new</span> <span 
class="nf">CorrelationReduce</span><span class="o">())</span>
+    <span class="o">.</span><span class="na">setParallelism</span><span 
class="o">(</span><span class="mi">1</span><span class="o">);</span>
+
+<span class="n">rollingCorrelation</span><span class="o">.</span><span 
class="na">print</span><span class="o">();</span>
+
+<span class="kd">public</span> <span class="kd">static</span> <span 
class="kd">final</span> <span class="kd">class</span> <span 
class="nc">CorrelationReduce</span>
+    <span class="kd">implements</span> <span 
class="n">GroupReduceFunction</span><span class="o">&lt;</span><span 
class="n">Tuple2</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">,</span> <span 
class="n">Integer</span><span class="o">&gt;,</span> <span 
class="n">Double</span><span class="o">&gt;</span> <span class="o">{</span>
+
+    <span class="kd">private</span> <span class="n">Integer</span> <span 
class="n">leftSum</span><span class="o">;</span>
+    <span class="kd">private</span> <span class="n">Integer</span> <span 
class="n">rightSum</span><span class="o">;</span>
+    <span class="kd">private</span> <span class="n">Integer</span> <span 
class="n">count</span><span class="o">;</span>
+
+    <span class="kd">private</span> <span class="n">Double</span> <span 
class="n">leftMean</span><span class="o">;</span>
+    <span class="kd">private</span> <span class="n">Double</span> <span 
class="n">rightMean</span><span class="o">;</span>
+
+    <span class="kd">private</span> <span class="n">Double</span> <span 
class="n">cov</span><span class="o">;</span>
+    <span class="kd">private</span> <span class="n">Double</span> <span 
class="n">leftSd</span><span class="o">;</span>
+    <span class="kd">private</span> <span class="n">Double</span> <span 
class="n">rightSd</span><span class="o">;</span>
+
+    <span class="nd">@Override</span>
+    <span class="kd">public</span> <span class="kt">void</span> <span 
class="nf">reduce</span><span class="o">(</span><span 
class="n">Iterable</span><span class="o">&lt;</span><span 
class="n">Tuple2</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">,</span> <span 
class="n">Integer</span><span class="o">&gt;&gt;</span> <span 
class="n">values</span><span class="o">,</span> <span 
class="n">Collector</span><span class="o">&lt;</span><span 
class="n">Double</span><span class="o">&gt;</span> <span 
class="n">out</span><span class="o">)</span> 
+        <span class="kd">throws</span> <span class="n">Exception</span> <span 
class="o">{</span>
+
+        <span class="n">leftSum</span> <span class="o">=</span> <span 
class="mi">0</span><span class="o">;</span>
+        <span class="n">rightSum</span> <span class="o">=</span> <span 
class="mi">0</span><span class="o">;</span>
+        <span class="n">count</span> <span class="o">=</span> <span 
class="mi">0</span><span class="o">;</span>
+
+        <span class="n">cov</span> <span class="o">=</span> <span 
class="mi">0</span><span class="o">.;</span>
+        <span class="n">leftSd</span> <span class="o">=</span> <span 
class="mi">0</span><span class="o">.;</span>
+        <span class="n">rightSd</span> <span class="o">=</span> <span 
class="mi">0</span><span class="o">.;</span>
+
+        <span class="c1">//compute mean for both sides, save count</span>
+        <span class="k">for</span> <span class="o">(</span><span 
class="n">Tuple2</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">,</span> <span 
class="n">Integer</span><span class="o">&gt;</span> <span class="n">pair</span> 
<span class="o">:</span> <span class="n">values</span><span class="o">)</span> 
<span class="o">{</span>
+            <span class="n">leftSum</span> <span class="o">+=</span> <span 
class="n">pair</span><span class="o">.</span><span class="na">f0</span><span 
class="o">;</span>
+            <span class="n">rightSum</span> <span class="o">+=</span> <span 
class="n">pair</span><span class="o">.</span><span class="na">f1</span><span 
class="o">;</span>
+            <span class="n">count</span><span class="o">++;</span>
+        <span class="o">}</span>
+
+        <span class="n">leftMean</span> <span class="o">=</span> <span 
class="n">leftSum</span><span class="o">.</span><span 
class="na">doubleValue</span><span class="o">()</span> <span class="o">/</span> 
<span class="n">count</span><span class="o">;</span>
+        <span class="n">rightMean</span> <span class="o">=</span> <span 
class="n">rightSum</span><span class="o">.</span><span 
class="na">doubleValue</span><span class="o">()</span> <span class="o">/</span> 
<span class="n">count</span><span class="o">;</span>
+
+        <span class="c1">//compute covariance &amp; std. deviations</span>
+        <span class="k">for</span> <span class="o">(</span><span 
class="n">Tuple2</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">,</span> <span 
class="n">Integer</span><span class="o">&gt;</span> <span class="n">pair</span> 
<span class="o">:</span> <span class="n">values</span><span class="o">)</span> 
<span class="o">{</span>
+            <span class="n">cov</span> <span class="o">+=</span> <span 
class="o">(</span><span class="n">pair</span><span class="o">.</span><span 
class="na">f0</span> <span class="o">-</span> <span 
class="n">leftMean</span><span class="o">)</span> <span class="o">*</span> 
<span class="o">(</span><span class="n">pair</span><span 
class="o">.</span><span class="na">f1</span> <span class="o">-</span> <span 
class="n">rightMean</span><span class="o">)</span> <span class="o">/</span> 
<span class="n">count</span><span class="o">;</span>
+        <span class="o">}</span>
+
+        <span class="k">for</span> <span class="o">(</span><span 
class="n">Tuple2</span><span class="o">&lt;</span><span 
class="n">Integer</span><span class="o">,</span> <span 
class="n">Integer</span><span class="o">&gt;</span> <span class="n">pair</span> 
<span class="o">:</span> <span class="n">values</span><span class="o">)</span> 
<span class="o">{</span>
+            <span class="n">leftSd</span> <span class="o">+=</span> <span 
class="n">Math</span><span class="o">.</span><span class="na">pow</span><span 
class="o">(</span><span class="n">pair</span><span class="o">.</span><span 
class="na">f0</span> <span class="o">-</span> <span 
class="n">leftMean</span><span class="o">,</span> <span 
class="mi">2</span><span class="o">)</span> <span class="o">/</span> <span 
class="n">count</span><span class="o">;</span>
+            <span class="n">rightSd</span> <span class="o">+=</span> <span 
class="n">Math</span><span class="o">.</span><span class="na">pow</span><span 
class="o">(</span><span class="n">pair</span><span class="o">.</span><span 
class="na">f1</span> <span class="o">-</span> <span 
class="n">rightMean</span><span class="o">,</span> <span 
class="mi">2</span><span class="o">)</span> <span class="o">/</span> <span 
class="n">count</span><span class="o">;</span>
+        <span class="o">}</span>
+        <span class="n">leftSd</span> <span class="o">=</span> <span 
class="n">Math</span><span class="o">.</span><span class="na">sqrt</span><span 
class="o">(</span><span class="n">leftSd</span><span class="o">);</span>
+        <span class="n">rightSd</span> <span class="o">=</span> <span 
class="n">Math</span><span class="o">.</span><span class="na">sqrt</span><span 
class="o">(</span><span class="n">rightSd</span><span class="o">);</span>
+
+        <span class="n">out</span><span class="o">.</span><span 
class="na">collect</span><span class="o">(</span><span class="n">cov</span> 
<span class="o">/</span> <span class="o">(</span><span class="n">leftSd</span> 
<span class="o">*</span> <span class="n">rightSd</span><span 
class="o">));</span>
+    <span class="o">}</span>
+<span class="o">}</span></code></pre></div>
+
+
+</div>
+
+</div>
+
+<p><a href="#top">Back to top</a></p>
+
+<h2 id="other-things-to-try">Other things to try</h2>
+
+<p>For a full feature overview, please check the <a href="http://flink.apache.org/docs/latest/streaming_guide.html">Streaming Guide</a>, which describes all the available API features.
+You are very welcome to try out these features for different use cases; we look forward to hearing about your experiences. Feel free to <a href="http://flink.apache.org/community.html#mailing-lists">contact us</a>.</p>
+
+<h2 id="upcoming-for-streaming">Upcoming for streaming</h2>
+
+<p>There are several aspects of Flink Streaming that are subject to
+change by the next release, making this application look even nicer.</p>
+
+<p>We are currently working on an update to our windowing semantics
+that will enable users to apply multiple transformations on the same
+window.</p>
+
+<p>Finally, stay tuned for later blog posts on how Flink Streaming works
+internally, on fault tolerance, and on performance measurements!</p>
+
+<p><a href="#top">Back to top</a></p>
+</div>
+                               <a 
href="/news/2015/02/09/streaming-example.html#disqus_thread">Introducing Flink 
Streaming</a>
+                       </article>
+                       
+                       <article>
                                <h2><a 
href="/news/2015/02/04/january-in-flink.html">January 2015 in the Flink 
community</a></h2>
                                <p class="meta">04 Feb 2015</p>
 
@@ -757,17 +1420,6 @@ robust, as well as breaking API changes.
                                <a 
href="/news/2014/05/31/release-0.5.html#disqus_thread">Stratosphere version 0.5 
available</a>
                        </article>
                        
-                       <article>
-                               <h2><a 
href="/news/2014/04/16/stratosphere-goes-apache-incubator.html">Stratosphere 
accepted as Apache Incubator Project</a></h2>
-                               <p class="meta">16 Apr 2014</p>
-
-                               <div><p>We are happy to announce that 
Stratosphere has been accepted as a project for the <a 
href="https://incubator.apache.org/">Apache Incubator</a>. The <a 
href="https://wiki.apache.org/incubator/StratosphereProposal">proposal</a> has 
been accepted by the Incubator PMC members earlier this week. The Apache 
Incubator is the first step in the process of giving a project to the <a 
href="http://apache.org">Apache Software Foundation</a>. While under 
incubation, the project will move to the Apache infrastructure and adopt the 
community-driven development principles of the Apache Foundation. Projects can 
graduate from incubation to become top-level projects if they show activity, a 
healthy community dynamic, and releases.</p>
-
-<p>We are glad to have Alan Gates as champion on board, as well as a set of 
great mentors, including Sean Owen, Ted Dunning, Owen O&#39;Malley, Henry 
Saputra, and Ashutosh Chauhan. We are confident that we will make this a great 
open source effort.</p>
-</div>
-                               <a 
href="/news/2014/04/16/stratosphere-goes-apache-incubator.html#disqus_thread">Stratosphere
 accepted as Apache Incubator Project</a>
-                       </article>
-                       
                </div>
                <div class="col-md-2"></div>
        </div>
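The "Streaming joins" section of the post above joins the per-symbol warning counts with the per-symbol tweet counts on a 30-second window, keyed on `symbol`. As a rough mental model only, the plain-Java sketch below buckets both sides into 30-second tumbling windows aligned to the epoch and pairs records that share both a window and a symbol. The `TimedCount` record, the millisecond timestamps, and the tumbling-window alignment are assumptions made for illustration; this is not Flink's `onWindow` implementation, whose window policy may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical record: a count for one stock symbol observed at a timestamp in milliseconds. */
final class TimedCount {
    final String symbol;
    final int count;
    final long timestampMillis;

    TimedCount(String symbol, int count, long timestampMillis) {
        this.symbol = symbol;
        this.count = count;
        this.timestampMillis = timestampMillis;
    }
}

/** Illustrative equi-join on symbol within assumed 30-second tumbling windows. */
final class WindowedJoin {
    static final long WINDOW_MILLIS = 30_000L;

    /** Index of the tumbling window a timestamp falls into; windows are aligned to the epoch. */
    static long windowOf(long timestampMillis) {
        return Math.floorDiv(timestampMillis, WINDOW_MILLIS);
    }

    /** Pairs every warning count with every tweet count that has the same window and symbol. */
    static List<int[]> join(List<TimedCount> warnings, List<TimedCount> tweets) {
        // Build side: index the warnings by (window, symbol).
        Map<List<Object>, List<TimedCount>> index = new HashMap<>();
        for (TimedCount w : warnings) {
            List<Object> key = Arrays.asList(windowOf(w.timestampMillis), w.symbol);
            index.computeIfAbsent(key, k -> new ArrayList<>()).add(w);
        }
        // Probe side: each tweet count only meets warnings from its own window.
        List<int[]> joined = new ArrayList<>();
        for (TimedCount t : tweets) {
            List<Object> key = Arrays.asList(windowOf(t.timestampMillis), t.symbol);
            for (TimedCount w : index.getOrDefault(key, Collections.emptyList())) {
                // Same shape as the post's join output: (warning count, mention count).
                joined.add(new int[] {w.count, t.count});
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        List<TimedCount> warnings = Arrays.asList(
                new TimedCount("SPX", 2, 1_000L),
                new TimedCount("SPX", 5, 31_000L));
        List<TimedCount> tweets = Arrays.asList(
                new TimedCount("SPX", 7, 10_000L),
                new TimedCount("SPX", 3, 45_000L));
        for (int[] pair : join(warnings, tweets)) {
            System.out.println(pair[0] + "," + pair[1]); // prints 2,7 then 5,3
        }
    }
}
```

A hash join fits here because each window is bounded: one side is indexed by (window, symbol) and the other side probes that index, so the work per window grows roughly with the number of records plus the number of matches.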

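The rolling correlation in the post's streaming-joins example is the Pearson correlation coefficient of the (warning count, mention count) pairs in each 30-second window: the covariance divided by the product of the two standard deviations, which is what both `computeCorrelation` (Scala) and `CorrelationReduce` (Java) compute. The minimal sketch below performs the same two-pass computation for a single window without Flink. The `RollingCorrelation` class, the `int[2]` pair representation, and returning `NaN` when the coefficient is undefined (an empty window or a column with zero variance) are assumptions of this sketch.

```java
import java.util.Arrays;
import java.util.List;

/** Illustrative Pearson correlation over the (warnings, mentions) pairs of one window. */
final class RollingCorrelation {

    /**
     * Pearson correlation of element 0 against element 1 of each pair.
     * Returns NaN when it is undefined: an empty window or a column with zero variance.
     */
    static double pearson(List<int[]> pairs) {
        int n = pairs.size();
        if (n == 0) {
            return Double.NaN;
        }
        // First pass: the mean of each column.
        double sumX = 0;
        double sumY = 0;
        for (int[] p : pairs) {
            sumX += p[0];
            sumY += p[1];
        }
        double meanX = sumX / n;
        double meanY = sumY / n;

        // Second pass: centred cross-products and squares. The post divides each
        // sum by the count; those 1/n factors cancel in the ratio, so they are skipped.
        double cov = 0;
        double varX = 0;
        double varY = 0;
        for (int[] p : pairs) {
            double dx = p[0] - meanX;
            double dy = p[1] - meanY;
            cov += dx * dy;
            varX += dx * dx;
            varY += dy * dy;
        }
        if (varX == 0 || varY == 0) {
            return Double.NaN;
        }
        return cov / Math.sqrt(varX * varY);
    }

    public static void main(String[] args) {
        List<int[]> window = Arrays.asList(new int[] {1, 2}, new int[] {2, 4}, new int[] {3, 6});
        System.out.println(pearson(window)); // prints 1.0
    }
}
```

The two-pass form mirrors the post: subtracting the means before multiplying is numerically steadier than a single pass over raw sums of squares, although for small windows of integer counts either form gives practically the same result.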
Modified: flink/site/blog/page2/index.html
URL: 
http://svn.apache.org/viewvc/flink/site/blog/page2/index.html?rev=1658386&r1=1658385&r2=1658386&view=diff
==============================================================================
--- flink/site/blog/page2/index.html (original)
+++ flink/site/blog/page2/index.html Mon Feb  9 12:34:25 2015
@@ -17,6 +17,7 @@
            <!-- <link 
href="//maxcdn.bootstrapcdn.com/font-awesome/4.1.0/css/font-awesome.min.css" 
rel="stylesheet"> -->
            <script 
src="https://ajax.googleapis.com/ajax/libs/jquery/1.11.0/jquery.min.js"></script>
            <script src="/js/bootstrap.min.js"></script>
+           <script src="/js/codetabs.js"></script>
     </head>
     <body>
     <div class="af-header-container af-inner-pages-navigation">
@@ -139,6 +140,17 @@
                <div class="col-md-8">
                        
                        <article>
+                               <h2><a 
href="/news/2014/04/16/stratosphere-goes-apache-incubator.html">Stratosphere 
accepted as Apache Incubator Project</a></h2>
+                               <p class="meta">16 Apr 2014</p>
+
+                               <div><p>We are happy to announce that 
Stratosphere has been accepted as a project for the <a 
href="https://incubator.apache.org/">Apache Incubator</a>. The <a 
href="https://wiki.apache.org/incubator/StratosphereProposal">proposal</a> has 
been accepted by the Incubator PMC members earlier this week. The Apache 
Incubator is the first step in the process of giving a project to the <a 
href="http://apache.org">Apache Software Foundation</a>. While under 
incubation, the project will move to the Apache infrastructure and adopt the 
community-driven development principles of the Apache Foundation. Projects can 
graduate from incubation to become top-level projects if they show activity, a 
healthy community dynamic, and releases.</p>
+
+<p>We are glad to have Alan Gates as champion on board, as well as a set of 
great mentors, including Sean Owen, Ted Dunning, Owen O&#39;Malley, Henry 
Saputra, and Ashutosh Chauhan. We are confident that we will make this a great 
open source effort.</p>
+</div>
+                               <a 
href="/news/2014/04/16/stratosphere-goes-apache-incubator.html#disqus_thread">Stratosphere
 accepted as Apache Incubator Project</a>
+                       </article>
+                       
+                       <article>
                                <h2><a 
href="/news/2014/02/24/stratosphere-google-summer-of-code-2014.html">Stratosphere
 got accepted for Google Summer of Code 2014</a></h2>
                                <p class="meta">24 Feb 2014</p>
 
@@ -721,21 +733,6 @@ in the majority of cases.</p>
                                <a 
href="/news/2013/10/21/cikm2013-paper.html#disqus_thread">Paper "“All Roads 
Lead to Rome:” Optimistic Recovery for Distributed Iterative Data Processing" 
accepted at CIKM 2013</a>
                        </article>
                        
-                       <article>
-                               <h2><a 
href="/news/2013/03/27/www-demo-paper.html">Demo Paper "Large-Scale 
Social-Media Analytics on Stratosphere" Accepted at WWW 2013</a></h2>
-                               <p class="meta">27 Mar 2013</p>
-
-                               <div>   <p>Our demo submission<br />
-<strong><cite>"Large-Scale Social-Media Analytics on 
Stratosphere"</cite></strong><br />
-by Christoph Boden, Marcel Karnstedt, Miriam Fernandez and Volker Markl<br />
-has been accepted for WWW 2013 in Rio de Janeiro, Brazil.</p>
-<p>Visit our demo, and talk to us if you are attending WWW 2013.</p>
-<p><strong>Abstract:</strong><br />
-The importance of social-media platforms and online communities - in business 
as well as public context - is more and more acknowledged and appreciated by 
industry and researchers alike. Consequently, a wide range of analytics has 
been proposed to understand, steer, and exploit the mechanics and laws driving 
their functionality and creating the resulting benefits. However, analysts 
usually face significant problems in scaling existing and novel approaches to 
match the data volume and size of modern online communities. In this work, we 
propose and demonstrate the usage of the massively parallel data processing 
system Stratosphere, based on second order functions as an extended notion of 
the MapReduce paradigm, to provide a new level of scalability to such 
social-media analytics. Based on the popular example of role analysis, we 
present and illustrate how this massively parallel approach can be leveraged to 
scale out complex data-mining tasks, while providing a programming approach that eases the formulation of complete analytical workflows.</p> 
-</div>
-                               <a 
href="/news/2013/03/27/www-demo-paper.html#disqus_thread">Demo Paper 
"Large-Scale Social-Media Analytics on Stratosphere" Accepted at WWW 2013</a>
-                       </article>
-                       
                </div>
                <div class="col-md-2"></div>
        </div>

