http://git-wip-us.apache.org/repos/asf/flink-web/blob/1e63786d/content/blog/feed.xml
----------------------------------------------------------------------
diff --git a/content/blog/feed.xml b/content/blog/feed.xml
index 3b2b4a1..17afaf1 100644
--- a/content/blog/feed.xml
+++ b/content/blog/feed.xml
@@ -7,6 +7,153 @@
 <atom:link href="http://flink.apache.org/blog/feed.xml"; rel="self" 
type="application/rss+xml" />
 
 <item>
+<title>Storm Compatibility in Apache Flink: How to run existing Storm 
topologies on Flink</title>
+<description>&lt;p&gt;&lt;a 
href=&quot;https://storm.apache.org&quot;&gt;Apache Storm&lt;/a&gt; was one of 
the first distributed and scalable stream processing systems available in the 
open source space offering (near) real-time tuple-by-tuple processing semantics.
+Initially released by the developers at Backtype in 2011 under the Eclipse 
open-source license, it became popular very quickly.
+Only shortly afterwards, Twitter acquired Backtype.
+Since then, Storm has been growing in popularity, is used in production at 
many big companies, and is the de-facto industry standard for big data stream 
processing.
+In 2013, Storm entered the Apache incubator program, followed by its 
graduation to top-level in 2014.&lt;/p&gt;
+
+&lt;p&gt;Apache Flink is a stream processing engine that improves upon older 
technologies like Storm in several dimensions,
+including &lt;a 
href=&quot;https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html&quot;&gt;strong
 consistency guarantees&lt;/a&gt; (“exactly once”),
+a higher level &lt;a 
href=&quot;https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html&quot;&gt;DataStream
 API&lt;/a&gt;,
+support for &lt;a 
href=&quot;http://flink.apache.org/news/2015/12/04/Introducing-windows.html&quot;&gt;event
 time and a rich windowing system&lt;/a&gt;,
+as well as &lt;a 
href=&quot;https://data-artisans.com/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink/&quot;&gt;superior
 throughput with competitive low latency&lt;/a&gt;.&lt;/p&gt;
+
+&lt;p&gt;While Flink offers several technical benefits over Storm, an existing 
investment on a codebase of applications developed for Storm often makes it 
difficult to switch engines.
+For these reasons, as part of the Flink 0.10 release, Flink ships with a Storm 
compatibility package that allows users to:&lt;/p&gt;
+
+&lt;ul&gt;
+  &lt;li&gt;Run &lt;strong&gt;unmodified&lt;/strong&gt; Storm topologies using 
Apache Flink benefiting from superior performance.&lt;/li&gt;
+  &lt;li&gt;&lt;strong&gt;Embed&lt;/strong&gt; Storm code (spouts and bolts) 
as operators inside Flink DataStream programs.&lt;/li&gt;
+&lt;/ul&gt;
+
+&lt;p&gt;Only minor code changes are required in order to submit the program 
to Flink instead of Storm.
+This minimizes the work for developers to run existing Storm topologies while 
leveraging Apache Flink’s fast and robust execution engine.&lt;/p&gt;
+
+&lt;p&gt;We note that the Storm compatibility package is continuously 
improving and does not cover the full spectrum of Storm’s API.
+However, it is powerful enough to cover many use cases.&lt;/p&gt;
+
+&lt;h2 id=&quot;executing-storm-topologies-with-flink&quot;&gt;Executing Storm 
topologies with Flink&lt;/h2&gt;
+
+&lt;center&gt;
+&lt;img src=&quot;/img/blog/flink-storm.png&quot; 
style=&quot;height:200px;margin:15px&quot; /&gt;
+&lt;/center&gt;
+
+&lt;p&gt;The easiest way to use the Storm compatibility package is by 
executing a whole Storm topology in Flink.
+For this, you only need to replace the dependency 
&lt;code&gt;storm-core&lt;/code&gt; by &lt;code&gt;flink-storm&lt;/code&gt; in 
your Storm project and &lt;strong&gt;change two lines of code&lt;/strong&gt; in 
your original Storm program.&lt;/p&gt;
+
+&lt;p&gt;The following example shows a simple Storm-Word-Count-Program that 
can be executed in Flink.
+First, the program is assembled the Storm way without any code change to 
Spouts, Bolts, or the topology itself.&lt;/p&gt;
+
+&lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code 
class=&quot;language-java&quot;&gt;&lt;span class=&quot;c1&quot;&gt;// assemble 
topology, the Storm way&lt;/span&gt;
+&lt;span class=&quot;n&quot;&gt;TopologyBuilder&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;builder&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;TopologyBuilder&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;();&lt;/span&gt;
+&lt;span class=&quot;n&quot;&gt;builder&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;setSpout&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;source&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;StormFileSpout&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;inputFilePath&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;));&lt;/span&gt;
+&lt;span class=&quot;n&quot;&gt;builder&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;setBolt&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;tokenizer&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;StormBoltTokenizer&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;())&lt;/span&gt;
+       &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;shuffleGrouping&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;source&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;);&lt;/span&gt;
+&lt;span class=&quot;n&quot;&gt;builder&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;setBolt&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;counter&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;StormBoltCounter&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;())&lt;/span&gt;
+       &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;fieldsGrouping&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;tokenizer&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Fields&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;word&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;));&lt;/span&gt;
+&lt;span class=&quot;n&quot;&gt;builder&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;setBolt&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;sink&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;StormBoltFileSink&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;outputFilePath&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;))&lt;/span&gt;
+       &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;shuffleGrouping&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;counter&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;);&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;
+
+&lt;p&gt;In order to execute the topology, we need to translate it to a 
&lt;code&gt;FlinkTopology&lt;/code&gt; and submit it to a local or remote Flink 
cluster, very similar to submitting the application to a Storm 
cluster.&lt;sup&gt;&lt;a href=&quot;#fn1&quot; 
id=&quot;ref1&quot;&gt;1&lt;/a&gt;&lt;/sup&gt;&lt;/p&gt;
+
+&lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code 
class=&quot;language-java&quot;&gt;&lt;span class=&quot;c1&quot;&gt;// 
transform Storm topology to Flink program&lt;/span&gt;
+&lt;span class=&quot;c1&quot;&gt;// replaces: StormTopology topology = 
builder.createTopology();&lt;/span&gt;
+&lt;span class=&quot;n&quot;&gt;FlinkTopology&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;topology&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;FlinkTopology&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;createTopology&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;builder&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;);&lt;/span&gt;
+
+&lt;span class=&quot;n&quot;&gt;Config&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;conf&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Config&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;();&lt;/span&gt;
+&lt;span class=&quot;k&quot;&gt;if&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;runLocal&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;{&lt;/span&gt;
+       &lt;span class=&quot;c1&quot;&gt;// use FlinkLocalCluster instead of 
LocalCluster&lt;/span&gt;
+       &lt;span class=&quot;n&quot;&gt;FlinkLocalCluster&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;cluster&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;FlinkLocalCluster&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;getLocalCluster&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;();&lt;/span&gt;
+       &lt;span class=&quot;n&quot;&gt;cluster&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;submitTopology&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;WordCount&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;conf&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;topology&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;);&lt;/span&gt;
+&lt;span class=&quot;o&quot;&gt;}&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;else&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;{&lt;/span&gt;
+       &lt;span class=&quot;c1&quot;&gt;// use FlinkSubmitter instead of 
StormSubmitter&lt;/span&gt;
+       &lt;span class=&quot;n&quot;&gt;FlinkSubmitter&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;submitTopology&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;WordCount&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;conf&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;topology&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;);&lt;/span&gt;
+&lt;span 
class=&quot;o&quot;&gt;}&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;
+
+&lt;p&gt;As a shorter Flink-style alternative that replaces the Storm-style 
submission code, you can also use context-based job execution:&lt;/p&gt;
+
+&lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code 
class=&quot;language-java&quot;&gt;&lt;span class=&quot;c1&quot;&gt;// 
transform Storm topology to Flink program (as above)&lt;/span&gt;
+&lt;span class=&quot;n&quot;&gt;FlinkTopology&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;topology&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;FlinkTopology&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;createTopology&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;builder&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;);&lt;/span&gt;
+
+&lt;span class=&quot;c1&quot;&gt;// executes locally by default or remotely if 
submitted with Flink&amp;#39;s command-line client&lt;/span&gt;
+&lt;span class=&quot;n&quot;&gt;topology&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;execute&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;()&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;
+
+&lt;p&gt;After the code is packaged in a jar file (e.g., 
&lt;code&gt;StormWordCount.jar&lt;/code&gt;), it can be easily submitted to 
Flink via&lt;/p&gt;
+
+&lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code&gt;bin/flink run 
StormWordCount.jar
+&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;
+
+&lt;p&gt;The used Spouts and Bolts as well as the topology assemble code is 
not changed at all!
+Only the translation and submission step have to be changed to the Storm-API 
compatible Flink pendants.
+This allows for minimal code changes and easy adaption to Flink.&lt;/p&gt;
+
+&lt;h3 
id=&quot;embedding-spouts-and-bolts-in-flink-programs&quot;&gt;Embedding Spouts 
and Bolts in Flink programs&lt;/h3&gt;
+
+&lt;p&gt;It is also possible to use Spouts and Bolts within a regular Flink 
DataStream program.
+The compatibility package provides wrapper classes for Spouts and Bolts which 
are implemented as a Flink &lt;code&gt;SourceFunction&lt;/code&gt; and 
&lt;code&gt;StreamOperator&lt;/code&gt; respectively.
+Those wrappers automatically translate incoming Flink POJO and 
&lt;code&gt;TupleXX&lt;/code&gt; records into Storm’s 
&lt;code&gt;Tuple&lt;/code&gt; type and emitted &lt;code&gt;Values&lt;/code&gt; 
back into either POJOs or &lt;code&gt;TupleXX&lt;/code&gt; types for further 
processing by Flink operators.
+As Storm is type agnostic, it is required to specify the output type of 
embedded Spouts/Bolts manually to get a fully typed Flink streaming 
program.&lt;/p&gt;
+
+&lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code 
class=&quot;language-java&quot;&gt;&lt;span class=&quot;c1&quot;&gt;// use 
regular Flink streaming environment&lt;/span&gt;
+&lt;span class=&quot;n&quot;&gt;StreamExecutionEnvironment&lt;/span&gt; 
&lt;span class=&quot;n&quot;&gt;env&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;StreamExecutionEnvironment&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;getExecutionEnvironment&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;();&lt;/span&gt;
+
+&lt;span class=&quot;c1&quot;&gt;// use Spout as source&lt;/span&gt;
+&lt;span class=&quot;n&quot;&gt;DataStream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Tuple1&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;String&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;source&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; 
+  &lt;span class=&quot;n&quot;&gt;env&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;addSource&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;c1&quot;&gt;// Flink 
provided wrapper including original Spout&lt;/span&gt;
+                &lt;span class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;SpoutWrapper&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;String&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;FileSpout&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;localFilePath&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)),&lt;/span&gt; 
+                &lt;span class=&quot;c1&quot;&gt;// specify output type 
manually&lt;/span&gt;
+                &lt;span 
class=&quot;n&quot;&gt;TypeExtractor&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;getForObject&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Tuple1&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;String&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)));&lt;/span&gt;
+&lt;span class=&quot;c1&quot;&gt;// FileSpout cannot be 
parallelized&lt;/span&gt;
+&lt;span class=&quot;n&quot;&gt;DataStream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Tuple1&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;String&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;text&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;source&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;setParallelism&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;mi&quot;&gt;1&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;);&lt;/span&gt;
+
+&lt;span class=&quot;c1&quot;&gt;// further processing with Flink&lt;/span&gt;
+&lt;span class=&quot;n&quot;&gt;DataStream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Tuple2&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;String&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Integer&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;tokens&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;text&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;flatMap&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Tokenizer&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;()).&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;keyBy&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span class=&quot;mi&quot;&gt;0&lt;/s
 pan&gt;&lt;span class=&quot;o&quot;&gt;);&lt;/span&gt;
+
+&lt;span class=&quot;c1&quot;&gt;// use Bolt for counting&lt;/span&gt;
+&lt;span class=&quot;n&quot;&gt;DataStream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Tuple2&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;String&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Integer&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;counts&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt;
+  &lt;span class=&quot;n&quot;&gt;tokens&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;transform&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;Counter&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt;
+                   &lt;span class=&quot;c1&quot;&gt;// specify output type 
manually&lt;/span&gt;
+                   &lt;span 
class=&quot;n&quot;&gt;TypeExtractor&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;getForObject&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Tuple2&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;String&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Integer&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt;&lt;span 
class=&quot;mi&quot;&gt;0&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;))&lt;/span&gt;
+                   &lt;span class=&quot;c1&quot;&gt;// Flink provided wrapper 
including original Bolt&lt;/span&gt;
+                   &lt;span class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;BoltWrapper&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;String&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Tuple2&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;String&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Integer&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&amp;gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;BoltCounter&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;()));&lt;/span&gt;
+
+&lt;span class=&quot;c1&quot;&gt;// write result to file via Flink 
sink&lt;/span&gt;
+&lt;span class=&quot;n&quot;&gt;counts&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;writeAsText&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;outputPath&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;);&lt;/span&gt;
+
+&lt;span class=&quot;c1&quot;&gt;// start Flink job&lt;/span&gt;
+&lt;span class=&quot;n&quot;&gt;env&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;execute&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;WordCount with Spout source and Bolt 
counter&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;);&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;
+
+&lt;p&gt;Although some boilerplate code is needed (we plan to address this 
soon!), the actual embedded Spout and Bolt code can be used unmodified.
+We also note that the resulting program is fully typed, and type errors will 
be found by Flink’s type extractor even if the original Spouts and Bolts are 
not.&lt;/p&gt;
+
+&lt;h2 id=&quot;outlook&quot;&gt;Outlook&lt;/h2&gt;
+
+&lt;p&gt;The Storm compatibility package is currently in beta and undergoes 
continuous development.
+We are currently working on providing consistency guarantees for stateful 
Bolts.
+Furthermore, we want to provide a better API integration for embedded Spouts 
and Bolts by providing a “StormExecutionEnvironment” as a special extension 
of Flink’s &lt;code&gt;StreamExecutionEnvironment&lt;/code&gt;.
+We are also investigating the integration of Storm’s higher-level 
programming API Trident.&lt;/p&gt;
+
+&lt;h2 id=&quot;summary&quot;&gt;Summary&lt;/h2&gt;
+
+&lt;p&gt;Flink’s compatibility package for Storm allows using unmodified 
Spouts and Bolts within Flink.
+This enables you to even embed third-party Spouts and Bolts where the source 
code is not available.
+While you can embed Spouts/Bolts in a Flink program and mix-and-match them 
with Flink operators, running whole topologies is the easiest way to get 
started and can be achieved with almost no code changes.&lt;/p&gt;
+
+&lt;p&gt;If you want to try out Flink’s Storm compatibility package checkout 
our &lt;a 
href=&quot;https://ci.apache.org/projects/flink/flink-docs-master/apis/storm_compatibility.html&quot;&gt;Documentation&lt;/a&gt;.&lt;/p&gt;
+
+&lt;hr /&gt;
+
+&lt;p&gt;&lt;sup id=&quot;fn1&quot;&gt;1. We confess, there are three lines 
changed compared to a Storm project &lt;img class=&quot;emoji&quot; 
style=&quot;width:16px;height:16px;align:absmiddle&quot; 
src=&quot;/img/blog/smirk.png&quot; /&gt;—because the example covers local 
&lt;em&gt;and&lt;/em&gt; remote execution. &lt;a href=&quot;#ref1&quot; 
title=&quot;Back to text.&quot;&gt;↩&lt;/a&gt;&lt;/sup&gt;&lt;/p&gt;
+
+</description>
+<pubDate>Fri, 11 Dec 2015 11:00:00 +0100</pubDate>
+<link>http://flink.apache.org/news/2015/12/11/storm-compatibility.html</link>
+<guid isPermaLink="true">/news/2015/12/11/storm-compatibility.html</guid>
+</item>
+
+<item>
 <title>Introducing Stream Windows in Apache Flink</title>
 <description>&lt;p&gt;The data analysis space is witnessing an evolution from 
batch to stream processing for many use cases. Although batch can be handled as 
a special case of stream processing, analyzing never-ending streaming data 
often requires a shift in the mindset and comes with its own terminology (for 
example, “windowing” and “at-least-once”/”exactly-once” 
processing). This shift and the new terminology can be quite confusing for 
people being new to the space of stream processing. Apache Flink is a 
production-ready stream processor with an easy-to-use yet very expressive API 
to define advanced stream analysis programs. Flink’s API features very 
flexible window definitions on data streams which let it stand out among other 
open source stream processors. &lt;/p&gt;
 
@@ -1477,7 +1624,7 @@ of vertices, edges and the node degrees. &lt;/p&gt;
 the ones provided by the batch processing API. These transformations can be 
applied one after the
 other, yielding a new Graph after each step, in a fashion similar to operators 
on DataSets: &lt;/p&gt;
 
-&lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code 
class=&quot;language-java&quot;&gt;&lt;span 
class=&quot;n&quot;&gt;inputGraph&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;getUndirected&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;().&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;mapEdges&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;CustomEdgeMapper&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;());&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;
+&lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code 
class=&quot;language-java&quot;&gt;&lt;span 
class=&quot;n&quot;&gt;inputGraph&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;getUndirected&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;().&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;mapEdges&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;CustomEdgeMapper&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;());&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;
 
 &lt;p&gt;Transformations can be applied on:&lt;/p&gt;
 
@@ -1513,7 +1660,7 @@ one or more values per vertex, the more general  
&lt;code&gt;groupReduceOnEdges(
 &lt;p&gt;Assume you would want to compute the sum of the values of all 
incoming neighbors for each vertex.
 We will call the &lt;code&gt;reduceOnNeighbors()&lt;/code&gt; aggregation 
method since the sum is an associative and commutative operation and the 
neighbors’ values are needed:&lt;/p&gt;
 
-&lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code 
class=&quot;language-java&quot;&gt;&lt;span 
class=&quot;n&quot;&gt;graph&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;reduceOnNeighbors&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;SumValues&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(),&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;EdgeDirection&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;IN&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;);&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;
+&lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code 
class=&quot;language-java&quot;&gt;&lt;span 
class=&quot;n&quot;&gt;graph&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;reduceOnNeighbors&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;SumValues&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(),&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;EdgeDirection&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;IN&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;);&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;
 
 &lt;p&gt;The vertex with id 1 is the only node that has no incoming edges. The 
result is therefore:&lt;/p&gt;
 
@@ -1699,7 +1846,7 @@ playlist, we use a coGroup function to filter out the 
mismatches.&lt;/p&gt;
 &lt;span class=&quot;c1&quot;&gt;// read the mismatches dataset and extract 
the songIDs&lt;/span&gt;
 &lt;span class=&quot;n&quot;&gt;DataSet&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Tuple3&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;String&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;String&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Integer&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;validTriplets&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;triplets&lt;/span&gt;
         &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;coGroup&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;mismatches&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;).&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;where&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;mi&quot;&gt;1&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;).&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;equalTo&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;mi&quot;&gt;0&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt;
-        &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;with&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;CoGroupFunction&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;()&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;{&lt;/span&gt;
+        &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;with&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;CoGroupFunction&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;()&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;{&lt;/span&gt;
                 &lt;span class=&quot;kt&quot;&gt;void&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;coGroup&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Iterable&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;triplets&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Iterable&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;invalidSongs&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Collector&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;out&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;{&lt;/span&gt;
                         &lt;span class=&quot;k&quot;&gt;if&lt;/span&gt; 
&lt;span class=&quot;o&quot;&gt;(!&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;invalidSongs&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;iterator&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;().&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;hasNext&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;())&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;{&lt;/span&gt;
                             &lt;span class=&quot;k&quot;&gt;for&lt;/span&gt; 
&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Tuple3&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;triplet&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;:&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;triplets&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;{&lt;/span&gt; &lt;span class=&quot;c1&quot;&gt;// valid 
triplet&lt;/span&gt;
@@ -1737,7 +1884,7 @@ basically iterate through the edge value and collect the 
target (song) of the ma
 
 &lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code 
class=&quot;language-java&quot;&gt;&lt;span class=&quot;c1&quot;&gt;//get the 
top track (most listened to) for each user&lt;/span&gt;
 &lt;span class=&quot;n&quot;&gt;DataSet&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Tuple2&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;usersWithTopTrack&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;userSongGraph&lt;/span&gt;
-        &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;groupReduceOnEdges&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;GetTopSongPerUser&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(),&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;EdgeDirection&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;OUT&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;);&lt;/span&gt;
+        &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;groupReduceOnEdges&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;GetTopSongPerUser&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(),&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;EdgeDirection&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;OUT&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;);&lt;/span&gt;
 
 &lt;span class=&quot;kd&quot;&gt;class&lt;/span&gt; &lt;span 
class=&quot;nc&quot;&gt;GetTopSongPerUser&lt;/span&gt; &lt;span 
class=&quot;kd&quot;&gt;implements&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;EdgesFunctionWithVertexValue&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;{&lt;/span&gt;
     &lt;span class=&quot;kt&quot;&gt;void&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;iterateEdges&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Vertex&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;vertex&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Iterable&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Edge&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;edges&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;{&lt;/span&gt;
@@ -1750,7 +1897,7 @@ basically iterate through the edge value and collect the 
target (song) of the ma
                 &lt;span class=&quot;n&quot;&gt;topSong&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;edge&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;getTarget&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;();&lt;/span&gt;
             &lt;span class=&quot;o&quot;&gt;}&lt;/span&gt;
         &lt;span class=&quot;o&quot;&gt;}&lt;/span&gt;
-        &lt;span class=&quot;k&quot;&gt;return&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;Tuple2&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;vertex&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;getId&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(),&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;topSong&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;);&lt;/span&gt;
+        &lt;span class=&quot;k&quot;&gt;return&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Tuple2&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;vertex&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;getId&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(),&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;topSong&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;);&lt;/span&gt;
     &lt;span class=&quot;o&quot;&gt;}&lt;/span&gt;
 &lt;span 
class=&quot;o&quot;&gt;}&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;
 
@@ -1786,14 +1933,14 @@ straightforward as a call to the 
&lt;code&gt;Graph.fromDataSet()&lt;/code&gt; me
                 &lt;span class=&quot;o&quot;&gt;}&lt;/span&gt;
         &lt;span class=&quot;o&quot;&gt;})&lt;/span&gt;
         &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;groupBy&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;mi&quot;&gt;1&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt;
-        &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;reduceGroup&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;GroupReduceFunction&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;()&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;{&lt;/span&gt;
+        &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;reduceGroup&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;GroupReduceFunction&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;()&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;{&lt;/span&gt;
                 &lt;span class=&quot;kt&quot;&gt;void&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;reduce&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Iterable&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Edge&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;edges&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Collector&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Edge&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;out&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;{&lt;/span&gt;
-                    &lt;span class=&quot;n&quot;&gt;List&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;users&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;ArrayList&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;();&lt;/span&gt;
+                    &lt;span class=&quot;n&quot;&gt;List&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;users&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;ArrayList&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;();&lt;/span&gt;
                     &lt;span class=&quot;k&quot;&gt;for&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Edge&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;edge&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;:&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;edges&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;{&lt;/span&gt;
                         &lt;span 
class=&quot;n&quot;&gt;users&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;add&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;edge&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;getSource&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;());&lt;/span&gt;
                         &lt;span class=&quot;k&quot;&gt;for&lt;/span&gt; 
&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;kt&quot;&gt;int&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;i&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;mi&quot;&gt;0&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;i&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;users&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;size&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;()&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;-&lt;/span&gt; &lt;span 
class=&quot;mi&quot;&gt;1&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;i&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;++)&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;{&lt;/span&gt;
                             &lt;span class=&quot;k&quot;&gt;for&lt;/span&gt; 
&lt;span class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;kt&quot;&gt;int&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;j&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;i&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;+&lt;/span&gt;&lt;span 
class=&quot;mi&quot;&gt;1&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;j&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;users&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;size&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;()&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;-&lt;/span&gt; &lt;span 
class=&quot;mi&quot;&gt;1&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;j&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;++)&lt;/span&gt; &lt;span clas
 s=&quot;o&quot;&gt;{&lt;/span&gt;
-                                &lt;span 
class=&quot;n&quot;&gt;out&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;collect&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;Edge&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;users&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;get&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;i&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;),&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;users&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;get&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;j&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)));&lt;/span&gt;
+                                &lt;span 
class=&quot;n&quot;&gt;out&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;collect&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Edge&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;users&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;get&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;i&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;),&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;users&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;get&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;j&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)));&lt;/span&gt;
                             &lt;span class=&quot;o&quot;&gt;}&lt;/span&gt;
                         &lt;span class=&quot;o&quot;&gt;}&lt;/span&gt;
                     &lt;span class=&quot;o&quot;&gt;}&lt;/span&gt;
@@ -1824,12 +1971,12 @@ among their neighbors. &lt;/p&gt;
 
 &lt;span class=&quot;c1&quot;&gt;// update the vertex values and run the label 
propagation algorithm&lt;/span&gt;
 &lt;span class=&quot;n&quot;&gt;DataSet&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Vertex&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;verticesWithCommunity&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;similarUsersGraph&lt;/span&gt;
-        &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;joinWithVertices&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;idsWithlLabels&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;MapFunction&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;()&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;{&lt;/span&gt;
+        &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;joinWithVertices&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;idsWithlLabels&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;MapFunction&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;()&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;{&lt;/span&gt;
                 &lt;span class=&quot;kd&quot;&gt;public&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Long&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;map&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Tuple2&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;idWithLabel&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;{&lt;/span&gt;
                     &lt;span class=&quot;k&quot;&gt;return&lt;/span&gt; 
&lt;span class=&quot;n&quot;&gt;idWithLabel&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;f1&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;;&lt;/span&gt;
                 &lt;span class=&quot;o&quot;&gt;}&lt;/span&gt;
         &lt;span class=&quot;o&quot;&gt;})&lt;/span&gt;
-        &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;run&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;LabelPropagation&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;numIterations&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;))&lt;/span&gt;
+        &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;run&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;LabelPropagation&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;numIterations&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;))&lt;/span&gt;
         &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;getVertices&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;();&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;
 
 &lt;p&gt;&lt;a href=&quot;#top&quot;&gt;Back to top&lt;/a&gt;&lt;/p&gt;
@@ -3039,16 +3186,16 @@ found &lt;a 
href=&quot;https://github.com/mbalassi/flink/blob/stockprices/flink-
                 &lt;span class=&quot;nd&quot;&gt;@Override&lt;/span&gt;
                 &lt;span class=&quot;kd&quot;&gt;public&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;StockPrice&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;map&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;String&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;value&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt; &lt;span 
class=&quot;kd&quot;&gt;throws&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Exception&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;{&lt;/span&gt;
                     &lt;span class=&quot;n&quot;&gt;tokens&lt;/span&gt; 
&lt;span class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;value&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;split&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;,&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;);&lt;/span&gt;
-                    &lt;span class=&quot;k&quot;&gt;return&lt;/span&gt; 
&lt;span class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;StockPrice&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;tokens&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;[&lt;/span&gt;&lt;span 
class=&quot;mi&quot;&gt;0&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;],&lt;/span&gt;
+                    &lt;span class=&quot;k&quot;&gt;return&lt;/span&gt; 
&lt;span class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;StockPrice&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;tokens&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;[&lt;/span&gt;&lt;span 
class=&quot;mi&quot;&gt;0&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;],&lt;/span&gt;
                         &lt;span 
class=&quot;n&quot;&gt;Double&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;parseDouble&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;tokens&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;[&lt;/span&gt;&lt;span 
class=&quot;mi&quot;&gt;1&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;]));&lt;/span&gt;
                 &lt;span class=&quot;o&quot;&gt;}&lt;/span&gt;
             &lt;span class=&quot;o&quot;&gt;});&lt;/span&gt;
 
     &lt;span class=&quot;c1&quot;&gt;//Generate other stock 
streams&lt;/span&gt;
-    &lt;span class=&quot;n&quot;&gt;DataStream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;StockPrice&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;SPX_stream&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;env&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;addSource&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;StockSource&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;SPX&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;mi&quot;&gt;10&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;));&lt;/span&gt;
-    &lt;span class=&quot;n&quot;&gt;DataStream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;StockPrice&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;FTSE_stream&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;env&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;addSource&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;StockSource&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;FTSE&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;mi&quot;&gt;20&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;));&lt;/span&gt;
-    &lt;span class=&quot;n&quot;&gt;DataStream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;StockPrice&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;DJI_stream&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;env&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;addSource&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;StockSource&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;DJI&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;mi&quot;&gt;30&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;));&lt;/span&gt;
-    &lt;span class=&quot;n&quot;&gt;DataStream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;StockPrice&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;BUX_stream&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;env&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;addSource&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;StockSource&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;BUX&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;mi&quot;&gt;40&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;));&lt;/span&gt;
+    &lt;span class=&quot;n&quot;&gt;DataStream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;StockPrice&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;SPX_stream&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;env&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;addSource&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;StockSource&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;SPX&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;mi&quot;&gt;10&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;));&lt;/span&gt;
+    &lt;span class=&quot;n&quot;&gt;DataStream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;StockPrice&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;FTSE_stream&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;env&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;addSource&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;StockSource&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;FTSE&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;mi&quot;&gt;20&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;));&lt;/span&gt;
+    &lt;span class=&quot;n&quot;&gt;DataStream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;StockPrice&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;DJI_stream&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;env&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;addSource&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;StockSource&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;DJI&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;mi&quot;&gt;30&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;));&lt;/span&gt;
+    &lt;span class=&quot;n&quot;&gt;DataStream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;StockPrice&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;BUX_stream&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;env&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;addSource&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;StockSource&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;BUX&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;mi&quot;&gt;40&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;));&lt;/span&gt;
 
     &lt;span class=&quot;c1&quot;&gt;//Merge all stock streams 
together&lt;/span&gt;
     &lt;span class=&quot;n&quot;&gt;DataStream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;StockPrice&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;stockStream&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;socketStockStream&lt;/span&gt;
@@ -3129,11 +3276,11 @@ of this example, the data streams are simply generated 
using the
     &lt;span class=&quot;nd&quot;&gt;@Override&lt;/span&gt;
     &lt;span class=&quot;kd&quot;&gt;public&lt;/span&gt; &lt;span 
class=&quot;kt&quot;&gt;void&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;invoke&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Collector&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;StockPrice&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;collector&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt; &lt;span 
class=&quot;kd&quot;&gt;throws&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Exception&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;{&lt;/span&gt;
         &lt;span class=&quot;n&quot;&gt;price&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;DEFAULT_PRICE&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;;&lt;/span&gt;
-        &lt;span class=&quot;n&quot;&gt;Random&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;random&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;Random&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;();&lt;/span&gt;
+        &lt;span class=&quot;n&quot;&gt;Random&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;random&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Random&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;();&lt;/span&gt;
 
         &lt;span class=&quot;k&quot;&gt;while&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;kc&quot;&gt;true&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;{&lt;/span&gt;
             &lt;span class=&quot;n&quot;&gt;price&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;price&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;+&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;random&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;nextGaussian&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;()&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;*&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;sigma&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;;&lt;/span&gt;
-            &lt;span class=&quot;n&quot;&gt;collector&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;collect&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;StockPrice&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;symbol&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;price&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;));&lt;/span&gt;
+            &lt;span class=&quot;n&quot;&gt;collector&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;collect&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;StockPrice&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;symbol&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;price&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;));&lt;/span&gt;
             &lt;span class=&quot;n&quot;&gt;Thread&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;sleep&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;random&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;nextInt&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;mi&quot;&gt;200&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;));&lt;/span&gt;
         &lt;span class=&quot;o&quot;&gt;}&lt;/span&gt;
     &lt;span class=&quot;o&quot;&gt;}&lt;/span&gt;
@@ -3220,7 +3367,7 @@ performed on named fields of POJOs, making the code more 
readable.&lt;/p&gt;
 &lt;span class=&quot;n&quot;&gt;DataStream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;StockPrice&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;maxByStock&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;windowedStream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;groupBy&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;symbol&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt;
     &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;maxBy&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;price&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;).&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;flatten&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;();&lt;/span&gt;
 &lt;span class=&quot;n&quot;&gt;DataStream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;StockPrice&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;rollingMean&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;windowedStream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;groupBy&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;symbol&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt;
-    &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;mapWindow&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;WindowMean&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;()).&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;flatten&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;();&lt;/span&gt;
+    &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;mapWindow&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;WindowMean&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;()).&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;flatten&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;();&lt;/span&gt;
 
 &lt;span class=&quot;c1&quot;&gt;//Compute the mean of a window&lt;/span&gt;
 &lt;span class=&quot;kd&quot;&gt;public&lt;/span&gt; &lt;span 
class=&quot;kd&quot;&gt;final&lt;/span&gt; &lt;span 
class=&quot;kd&quot;&gt;static&lt;/span&gt; &lt;span 
class=&quot;kd&quot;&gt;class&lt;/span&gt; &lt;span 
class=&quot;nc&quot;&gt;WindowMean&lt;/span&gt; &lt;span 
class=&quot;kd&quot;&gt;implements&lt;/span&gt; 
@@ -3240,7 +3387,7 @@ performed on named fields of POJOs, making the code more 
readable.&lt;/p&gt;
                 &lt;span class=&quot;n&quot;&gt;symbol&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;sp&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;symbol&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;;&lt;/span&gt;
                 &lt;span class=&quot;n&quot;&gt;count&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;++;&lt;/span&gt;
             &lt;span class=&quot;o&quot;&gt;}&lt;/span&gt;
-            &lt;span class=&quot;n&quot;&gt;out&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;collect&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;StockPrice&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;symbol&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;sum&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;/&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;count&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;));&lt;/span&gt;
+            &lt;span class=&quot;n&quot;&gt;out&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;collect&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;StockPrice&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;symbol&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;sum&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;/&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;count&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;));&lt;/span&gt;
         &lt;span class=&quot;o&quot;&gt;}&lt;/span&gt;
     &lt;span class=&quot;o&quot;&gt;}&lt;/span&gt;
 &lt;span 
class=&quot;o&quot;&gt;}&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;
@@ -3300,7 +3447,7 @@ every 30 seconds.&lt;/p&gt;
   &lt;div data-lang=&quot;java7&quot;&gt;
 
     &lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code 
class=&quot;language-java&quot; data-lang=&quot;java&quot;&gt;&lt;span 
class=&quot;kd&quot;&gt;private&lt;/span&gt; &lt;span 
class=&quot;kd&quot;&gt;static&lt;/span&gt; &lt;span 
class=&quot;kd&quot;&gt;final&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Double&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;DEFAULT_PRICE&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;mi&quot;&gt;1000&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.;&lt;/span&gt;
-&lt;span class=&quot;kd&quot;&gt;private&lt;/span&gt; &lt;span 
class=&quot;kd&quot;&gt;static&lt;/span&gt; &lt;span 
class=&quot;kd&quot;&gt;final&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;StockPrice&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;DEFAULT_STOCK_PRICE&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;StockPrice&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;DEFAULT_PRICE&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;);&lt;/span&gt;
+&lt;span class=&quot;kd&quot;&gt;private&lt;/span&gt; &lt;span 
class=&quot;kd&quot;&gt;static&lt;/span&gt; &lt;span 
class=&quot;kd&quot;&gt;final&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;StockPrice&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;DEFAULT_STOCK_PRICE&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;StockPrice&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;DEFAULT_PRICE&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;);&lt;/span&gt;
 
 &lt;span class=&quot;c1&quot;&gt;//Use delta policy to create price change 
warnings&lt;/span&gt;
 &lt;span class=&quot;n&quot;&gt;DataStream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;String&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;priceWarnings&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;stockStream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;groupBy&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;symbol&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt;
@@ -3310,13 +3457,13 @@ every 30 seconds.&lt;/p&gt;
             &lt;span class=&quot;k&quot;&gt;return&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Math&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;abs&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;oldDataPoint&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;price&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;-&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;newDataPoint&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;price&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;);&lt;/span&gt;
         &lt;span class=&quot;o&quot;&gt;}&lt;/span&gt;
     &lt;span class=&quot;o&quot;&gt;},&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;DEFAULT_STOCK_PRICE&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;))&lt;/span&gt;
-&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;mapWindow&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;SendWarning&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;()).&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;flatten&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;();&lt;/span&gt;
+&lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;mapWindow&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;SendWarning&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;()).&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;flatten&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;();&lt;/span&gt;
 
 &lt;span class=&quot;c1&quot;&gt;//Count the number of warnings every half a 
minute&lt;/span&gt;
 &lt;span class=&quot;n&quot;&gt;DataStream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Count&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;warningsPerStock&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;priceWarnings&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;map&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;MapFunction&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;String&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Count&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;()&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;{&lt;/span&gt;
     &lt;span class=&quot;nd&quot;&gt;@Override&lt;/span&gt;
     &lt;span class=&quot;kd&quot;&gt;public&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Count&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;map&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;String&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;value&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt; &lt;span 
class=&quot;kd&quot;&gt;throws&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Exception&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;{&lt;/span&gt;
-        &lt;span class=&quot;k&quot;&gt;return&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;Count&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;value&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;mi&quot;&gt;1&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;);&lt;/span&gt;
+        &lt;span class=&quot;k&quot;&gt;return&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Count&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;value&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;mi&quot;&gt;1&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;);&lt;/span&gt;
     &lt;span class=&quot;o&quot;&gt;}&lt;/span&gt;
 &lt;span class=&quot;o&quot;&gt;}).&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;groupBy&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;symbol&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;).&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;window&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Time&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;of&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;mi&quot;&gt;30&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;TimeUnit&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;SECONDS&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)).&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;sum&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;count&amp;quo
 t;&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;).&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;flatten&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;();&lt;/span&gt;
 
@@ -3398,7 +3545,7 @@ but for the sake of this example we generate dummy tweet 
data.&lt;/p&gt;
   &lt;div data-lang=&quot;java7&quot;&gt;
 
     &lt;div class=&quot;highlight&quot;&gt;&lt;pre&gt;&lt;code 
class=&quot;language-java&quot; data-lang=&quot;java&quot;&gt;&lt;span 
class=&quot;c1&quot;&gt;//Read a stream of tweets&lt;/span&gt;
-&lt;span class=&quot;n&quot;&gt;DataStream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;String&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;tweetStream&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;env&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;addSource&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;TweetSource&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;());&lt;/span&gt;
+&lt;span class=&quot;n&quot;&gt;DataStream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;String&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;tweetStream&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;env&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;addSource&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;TweetSource&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;());&lt;/span&gt;
 
 &lt;span class=&quot;c1&quot;&gt;//Extract the stock symbols&lt;/span&gt;
 &lt;span class=&quot;n&quot;&gt;DataStream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;String&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;mentionedSymbols&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;tweetStream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;flatMap&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;
@@ -3421,7 +3568,7 @@ but for the sake of this example we generate dummy tweet 
data.&lt;/p&gt;
 &lt;span class=&quot;n&quot;&gt;DataStream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Count&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;tweetsPerStock&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;mentionedSymbols&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;map&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;MapFunction&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;String&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Count&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;()&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;{&lt;/span&gt;
     &lt;span class=&quot;nd&quot;&gt;@Override&lt;/span&gt;
     &lt;span class=&quot;kd&quot;&gt;public&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Count&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;map&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;String&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;value&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt; &lt;span 
class=&quot;kd&quot;&gt;throws&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Exception&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;{&lt;/span&gt;
-        &lt;span class=&quot;k&quot;&gt;return&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;Count&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;value&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;mi&quot;&gt;1&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;);&lt;/span&gt;
+        &lt;span class=&quot;k&quot;&gt;return&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Count&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;value&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;mi&quot;&gt;1&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;);&lt;/span&gt;
     &lt;span class=&quot;o&quot;&gt;}&lt;/span&gt;
 &lt;span class=&quot;o&quot;&gt;}).&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;groupBy&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;symbol&amp;quot;&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;).&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;window&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Time&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;of&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;mi&quot;&gt;30&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;TimeUnit&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;SECONDS&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)).&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;sum&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;s&quot;&gt;&amp;quot;count&amp;quo
 t;&lt;/span&gt;&lt;span class=&quot;o&quot;&gt;).&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;flatten&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;();&lt;/span&gt;
 
@@ -3431,8 +3578,8 @@ but for the sake of this example we generate dummy tweet 
data.&lt;/p&gt;
 
     &lt;span class=&quot;nd&quot;&gt;@Override&lt;/span&gt;
     &lt;span class=&quot;kd&quot;&gt;public&lt;/span&gt; &lt;span 
class=&quot;kt&quot;&gt;void&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;invoke&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Collector&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;String&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;collector&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt; &lt;span 
class=&quot;kd&quot;&gt;throws&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Exception&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;{&lt;/span&gt;
-        &lt;span class=&quot;n&quot;&gt;random&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;Random&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;();&lt;/span&gt;
-        &lt;span class=&quot;n&quot;&gt;stringBuilder&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;StringBuilder&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;();&lt;/span&gt;
+        &lt;span class=&quot;n&quot;&gt;random&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Random&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;();&lt;/span&gt;
+        &lt;span class=&quot;n&quot;&gt;stringBuilder&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;StringBuilder&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;();&lt;/span&gt;
 
         &lt;span class=&quot;k&quot;&gt;while&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;kc&quot;&gt;true&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;)&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;{&lt;/span&gt;
             &lt;span class=&quot;n&quot;&gt;stringBuilder&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;setLength&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;mi&quot;&gt;0&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;);&lt;/span&gt;
@@ -3514,7 +3661,7 @@ these data streams are potentially infinite, we apply the 
join on a
 &lt;span class=&quot;c1&quot;&gt;//Compute rolling correlation&lt;/span&gt;
 &lt;span class=&quot;n&quot;&gt;DataStream&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Double&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;rollingCorrelation&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;tweetsAndWarning&lt;/span&gt;
     &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;window&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;Time&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;of&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;mi&quot;&gt;30&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;TimeUnit&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;SECONDS&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;))&lt;/span&gt;
-    &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;mapWindow&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;WindowCorrelation&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;());&lt;/span&gt;
+    &lt;span class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;mapWindow&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;WindowCorrelation&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;());&lt;/span&gt;
 
 &lt;span class=&quot;n&quot;&gt;rollingCorrelation&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;print&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;();&lt;/span&gt;
 
@@ -3836,25 +3983,25 @@ Flink serialization system improved a lot over time and 
by now surpasses the cap
   &lt;span class=&quot;c1&quot;&gt;// Setup Hadoop’s 
TextInputFormat&lt;/span&gt;
   &lt;span class=&quot;n&quot;&gt;HadoopInputFormat&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;LongWritable&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Text&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;hadoopInputFormat&lt;/span&gt; &lt;span 
class=&quot;o&quot;&gt;=&lt;/span&gt; 
       &lt;span class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;HadoopInputFormat&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;lt;&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;LongWritable&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Text&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;&amp;gt;(&lt;/span&gt;
-        &lt;span class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;TextInputFormat&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(),&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;LongWritable&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;class&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Text&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;class&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;JobConf&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;());&lt;/span&gt;
-  &lt;span class=&quot;n&quot;&gt;TextInputFormat&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;addInputPath&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;hadoopInputFormat&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;getJobConf&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(),&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;nf&quot;&gt;Path&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;inputPath&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;));&lt;/span&gt;
+        &lt;span class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;TextInputFormat&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(),&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;LongWritable&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;class&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Text&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;class&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;,&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;JobConf&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;());&lt;/span&gt;
+  &lt;span class=&quot;n&quot;&gt;TextInputFormat&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;addInputPath&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;hadoopInputFormat&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;.&lt;/span&gt;&lt;span 
class=&quot;na&quot;&gt;getJobConf&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(),&lt;/span&gt; &lt;span 
class=&quot;k&quot;&gt;new&lt;/span&gt; &lt;span 
class=&quot;n&quot;&gt;Path&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;(&lt;/span&gt;&lt;span 
class=&quot;n&quot;&gt;inputPath&lt;/span&gt;&lt;span 
class=&quot;o&quot;&gt;));&lt;/span&gt;
   
   &lt;span class=&quot;c1&quot;&gt;// Read a DataSet with the Hadoop In

<TRUNCATED>

Reply via email to