http://git-wip-us.apache.org/repos/asf/flink-web/blob/1e63786d/content/blog/feed.xml ---------------------------------------------------------------------- diff --git a/content/blog/feed.xml b/content/blog/feed.xml index 3b2b4a1..17afaf1 100644 --- a/content/blog/feed.xml +++ b/content/blog/feed.xml @@ -7,6 +7,153 @@ <atom:link href="http://flink.apache.org/blog/feed.xml" rel="self" type="application/rss+xml" /> <item> +<title>Storm Compatibility in Apache Flink: How to run existing Storm topologies on Flink</title> +<description><p><a href="https://storm.apache.org">Apache Storm</a> was one of the first distributed and scalable stream processing systems available in the open source space offering (near) real-time tuple-by-tuple processing semantics. +Initially released by the developers at Backtype in 2011 under the Eclipse open-source license, it became popular very quickly. +Only shortly afterwards, Twitter acquired Backtype. +Since then, Storm has been growing in popularity, is used in production at many big companies, and is the de-facto industry standard for big data stream processing. +In 2013, Storm entered the Apache incubator program, followed by its graduation to top-level in 2014.</p> + +<p>Apache Flink is a stream processing engine that improves upon older technologies like Storm in several dimensions, +including <a href="https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html">strong consistency guarantees</a> (âexactly onceâ), +a higher level <a href="https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html">DataStream API</a>, +support for <a href="http://flink.apache.org/news/2015/12/04/Introducing-windows.html">event time and a rich windowing system</a>, +as well as <a href="https://data-artisans.com/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink/">superior throughput with competitive low latency</a>.</p> + +<p>While Flink offers several technical benefits over Storm, an existing investment on a codebase of applications developed for Storm often makes it difficult to switch engines. +For these reasons, as part of the Flink 0.10 release, Flink ships with a Storm compatibility package that allows users to:</p> + +<ul> + <li>Run <strong>unmodified</strong> Storm topologies using Apache Flink benefiting from superior performance.</li> + <li><strong>Embed</strong> Storm code (spouts and bolts) as operators inside Flink DataStream programs.</li> +</ul> + +<p>Only minor code changes are required in order to submit the program to Flink instead of Storm. +This minimizes the work for developers to run existing Storm topologies while leveraging Apache Flinkâs fast and robust execution engine.</p> + +<p>We note that the Storm compatibility package is continuously improving and does not cover the full spectrum of Stormâs API. +However, it is powerful enough to cover many use cases.</p> + +<h2 id="executing-storm-topologies-with-flink">Executing Storm topologies with Flink</h2> + +<center> +<img src="/img/blog/flink-storm.png" style="height:200px;margin:15px" /> +</center> + +<p>The easiest way to use the Storm compatibility package is by executing a whole Storm topology in Flink. +For this, you only need to replace the dependency <code>storm-core</code> by <code>flink-storm</code> in your Storm project and <strong>change two lines of code</strong> in your original Storm program.</p> + +<p>The following example shows a simple Storm-Word-Count-Program that can be executed in Flink. +First, the program is assembled the Storm way without any code change to Spouts, Bolts, or the topology itself.</p> + +<div class="highlight"><pre><code class="language-java"><span class="c1">// assemble topology, the Storm way</span> +<span class="n">TopologyBuilder</span> <span class="n">builder</span> <span class="o">=</span> <span class="k">new</span> <span class="n">TopologyBuilder</span><span class="o">();</span> +<span class="n">builder</span><span class="o">.</span><span class="na">setSpout</span><span class="o">(</span><span class="s">&quot;source&quot;</span><span class="o">,</span> <span class="k">new</span> <span class="n">StormFileSpout</span><span class="o">(</span><span class="n">inputFilePath</span><span class="o">));</span> +<span class="n">builder</span><span class="o">.</span><span class="na">setBolt</span><span class="o">(</span><span class="s">&quot;tokenizer&quot;</span><span class="o">,</span> <span class="k">new</span> <span class="n">StormBoltTokenizer</span><span class="o">())</span> + <span class="o">.</span><span class="na">shuffleGrouping</span><span class="o">(</span><span class="s">&quot;source&quot;</span><span class="o">);</span> +<span class="n">builder</span><span class="o">.</span><span class="na">setBolt</span><span class="o">(</span><span class="s">&quot;counter&quot;</span><span class="o">,</span> <span class="k">new</span> <span class="n">StormBoltCounter</span><span class="o">())</span> + <span class="o">.</span><span class="na">fieldsGrouping</span><span class="o">(</span><span class="s">&quot;tokenizer&quot;</span><span class="o">,</span> <span class="k">new</span> <span class="n">Fields</span><span class="o">(</span><span class="s">&quot;word&quot;</span><span class="o">));</span> +<span class="n">builder</span><span class="o">.</span><span class="na">setBolt</span><span class="o">(</span><span class="s">&quot;sink&quot;</span><span class="o">,</span> <span class="k">new</span> <span class="n">StormBoltFileSink</span><span class="o">(</span><span class="n">outputFilePath</span><span class="o">))</span> + <span class="o">.</span><span class="na">shuffleGrouping</span><span class="o">(</span><span class="s">&quot;counter&quot;</span><span class="o">);</span></code></pre></div> + +<p>In order to execute the topology, we need to translate it to a <code>FlinkTopology</code> and submit it to a local or remote Flink cluster, very similar to submitting the application to a Storm cluster.<sup><a href="#fn1" id="ref1">1</a></sup></p> + +<div class="highlight"><pre><code class="language-java"><span class="c1">// transform Storm topology to Flink program</span> +<span class="c1">// replaces: StormTopology topology = builder.createTopology();</span> +<span class="n">FlinkTopology</span> <span class="n">topology</span> <span class="o">=</span> <span class="n">FlinkTopology</span><span class="o">.</span><span class="na">createTopology</span><span class="o">(</span><span class="n">builder</span><span class="o">);</span> + +<span class="n">Config</span> <span class="n">conf</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Config</span><span class="o">();</span> +<span class="k">if</span><span class="o">(</span><span class="n">runLocal</span><span class="o">)</span> <span class="o">{</span> + <span class="c1">// use FlinkLocalCluster instead of LocalCluster</span> + <span class="n">FlinkLocalCluster</span> <span class="n">cluster</span> <span class="o">=</span> <span class="n">FlinkLocalCluster</span><span class="o">.</span><span class="na">getLocalCluster</span><span class="o">();</span> + <span class="n">cluster</span><span class="o">.</span><span class="na">submitTopology</span><span class="o">(</span><span class="s">&quot;WordCount&quot;</span><span class="o">,</span> <span class="n">conf</span><span class="o">,</span> <span class="n">topology</span><span class="o">);</span> +<span class="o">}</span> <span class="k">else</span> <span class="o">{</span> + <span class="c1">// use FlinkSubmitter instead of StormSubmitter</span> + <span class="n">FlinkSubmitter</span><span class="o">.</span><span class="na">submitTopology</span><span class="o">(</span><span class="s">&quot;WordCount&quot;</span><span class="o">,</span> <span class="n">conf</span><span class="o">,</span> <span class="n">topology</span><span class="o">);</span> +<span class="o">}</span></code></pre></div> + +<p>As a shorter Flink-style alternative that replaces the Storm-style submission code, you can also use context-based job execution:</p> + +<div class="highlight"><pre><code class="language-java"><span class="c1">// transform Storm topology to Flink program (as above)</span> +<span class="n">FlinkTopology</span> <span class="n">topology</span> <span class="o">=</span> <span class="n">FlinkTopology</span><span class="o">.</span><span class="na">createTopology</span><span class="o">(</span><span class="n">builder</span><span class="o">);</span> + +<span class="c1">// executes locally by default or remotely if submitted with Flink&#39;s command-line client</span> +<span class="n">topology</span><span class="o">.</span><span class="na">execute</span><span class="o">()</span></code></pre></div> + +<p>After the code is packaged in a jar file (e.g., <code>StormWordCount.jar</code>), it can be easily submitted to Flink via</p> + +<div class="highlight"><pre><code>bin/flink run StormWordCount.jar +</code></pre></div> + +<p>The used Spouts and Bolts as well as the topology assemble code is not changed at all! +Only the translation and submission step have to be changed to the Storm-API compatible Flink pendants. +This allows for minimal code changes and easy adaption to Flink.</p> + +<h3 id="embedding-spouts-and-bolts-in-flink-programs">Embedding Spouts and Bolts in Flink programs</h3> + +<p>It is also possible to use Spouts and Bolts within a regular Flink DataStream program. +The compatibility package provides wrapper classes for Spouts and Bolts which are implemented as a Flink <code>SourceFunction</code> and <code>StreamOperator</code> respectively. +Those wrappers automatically translate incoming Flink POJO and <code>TupleXX</code> records into Stormâs <code>Tuple</code> type and emitted <code>Values</code> back into either POJOs or <code>TupleXX</code> types for further processing by Flink operators. +As Storm is type agnostic, it is required to specify the output type of embedded Spouts/Bolts manually to get a fully typed Flink streaming program.</p> + +<div class="highlight"><pre><code class="language-java"><span class="c1">// use regular Flink streaming environment</span> +<span class="n">StreamExecutionEnvironment</span> <span class="n">env</span> <span class="o">=</span> <span class="n">StreamExecutionEnvironment</span><span class="o">.</span><span class="na">getExecutionEnvironment</span><span class="o">();</span> + +<span class="c1">// use Spout as source</span> +<span class="n">DataStream</span><span class="o">&lt;</span><span class="n">Tuple1</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">source</span> <span class="o">=</span> + <span class="n">env</span><span class="o">.</span><span class="na">addSource</span><span class="o">(</span><span class="c1">// Flink provided wrapper including original Spout</span> + <span class="k">new</span> <span class="n">SpoutWrapper</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;(</span><span class="k">new</span> <span class="n">FileSpout</span><span class="o">(</span><span class="n">localFilePath</span><span class="o">)),</span> + <span class="c1">// specify output type manually</span> + <span class="n">TypeExtractor</span><span class="o">.</span><span class="na">getForObject</span><span class="o">(</span><span class="k">new</span> <span class="n">Tuple1</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;(</span><span class="s">&quot;&quot;</span><span class="o">)));</span> +<span class="c1">// FileSpout cannot be parallelized</span> +<span class="n">DataStream</span><span class="o">&lt;</span><span class="n">Tuple1</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">text</span> <span class="o">=</span> <span class="n">source</span><span class="o">.</span><span class="na">setParallelism</span><span class="o">(</span><span class="mi">1</span><span class="o">);</span> + +<span class="c1">// further processing with Flink</span> +<span class="n">DataStream</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">tokens</span> <span class="o">=</span> <span class="n">text</span><span class="o">.</span><span class="na">flatMap</span><span class="o">(</span><span class="k">new</span> <span class="n">Tokenizer</span><span class="o">()).</span><span class="na">keyBy</span><span class="o">(</span><span class="mi">0</s pan><span class="o">);</span> + +<span class="c1">// use Bolt for counting</span> +<span class="n">DataStream</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">counts</span> <span class="o">=</span> + <span class="n">tokens</span><span class="o">.</span><span class="na">transform</span><span class="o">(</span><span class="s">&quot;Counter&quot;</span><span class="o">,</span> + <span class="c1">// specify output type manually</span> + <span class="n">TypeExtractor</span><span class="o">.</span><span class="na">getForObject</span><span class="o">(</span><span class="k">new</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span><span class="n">Integer</span><span class="o">&gt;(</span><span class="s">&quot;&quot;</span><span class="o">,</span><span class="mi">0</span><span class="o">))</span> + <span class="c1">// Flink provided wrapper including original Bolt</span> + <span class="k">new</span> <span class="n">BoltWrapper</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span><span class="n">Integer</span><span class="o">&gt;&gt;(</span><span class="k">new</span> <span class="n">BoltCounter</span><span class="o">()));</span> + +<span class="c1">// write result to file via Flink sink</span> +<span class="n">counts</span><span class="o">.</span><span class="na">writeAsText</span><span class="o">(</span><span class="n">outputPath</span><span class="o">);</span> + +<span class="c1">// start Flink job</span> +<span class="n">env</span><span class="o">.</span><span class="na">execute</span><span class="o">(</span><span class="s">&quot;WordCount with Spout source and Bolt counter&quot;</span><span class="o">);</span></code></pre></div> + +<p>Although some boilerplate code is needed (we plan to address this soon!), the actual embedded Spout and Bolt code can be used unmodified. +We also note that the resulting program is fully typed, and type errors will be found by Flinkâs type extractor even if the original Spouts and Bolts are not.</p> + +<h2 id="outlook">Outlook</h2> + +<p>The Storm compatibility package is currently in beta and undergoes continuous development. +We are currently working on providing consistency guarantees for stateful Bolts. +Furthermore, we want to provide a better API integration for embedded Spouts and Bolts by providing a âStormExecutionEnvironmentâ as a special extension of Flinkâs <code>StreamExecutionEnvironment</code>. +We are also investigating the integration of Stormâs higher-level programming API Trident.</p> + +<h2 id="summary">Summary</h2> + +<p>Flinkâs compatibility package for Storm allows using unmodified Spouts and Bolts within Flink. +This enables you to even embed third-party Spouts and Bolts where the source code is not available. +While you can embed Spouts/Bolts in a Flink program and mix-and-match them with Flink operators, running whole topologies is the easiest way to get started and can be achieved with almost no code changes.</p> + +<p>If you want to try out Flinkâs Storm compatibility package checkout our <a href="https://ci.apache.org/projects/flink/flink-docs-master/apis/storm_compatibility.html">Documentation</a>.</p> + +<hr /> + +<p><sup id="fn1">1. We confess, there are three lines changed compared to a Storm project <img class="emoji" style="width:16px;height:16px;align:absmiddle" src="/img/blog/smirk.png" />âbecause the example covers local <em>and</em> remote execution. <a href="#ref1" title="Back to text.">â©</a></sup></p> + +</description> +<pubDate>Fri, 11 Dec 2015 11:00:00 +0100</pubDate> +<link>http://flink.apache.org/news/2015/12/11/storm-compatibility.html</link> +<guid isPermaLink="true">/news/2015/12/11/storm-compatibility.html</guid> +</item> + +<item> <title>Introducing Stream Windows in Apache Flink</title> <description><p>The data analysis space is witnessing an evolution from batch to stream processing for many use cases. Although batch can be handled as a special case of stream processing, analyzing never-ending streaming data often requires a shift in the mindset and comes with its own terminology (for example, âwindowingâ and âat-least-onceâ/âexactly-onceâ processing). This shift and the new terminology can be quite confusing for people being new to the space of stream processing. Apache Flink is a production-ready stream processor with an easy-to-use yet very expressive API to define advanced stream analysis programs. Flinkâs API features very flexible window definitions on data streams which let it stand out among other open source stream processors. </p> @@ -1477,7 +1624,7 @@ of vertices, edges and the node degrees. </p> the ones provided by the batch processing API. These transformations can be applied one after the other, yielding a new Graph after each step, in a fashion similar to operators on DataSets: </p> -<div class="highlight"><pre><code class="language-java"><span class="n">inputGraph</span><span class="o">.</span><span class="na">getUndirected</span><span class="o">().</span><span class="na">mapEdges</span><span class="o">(</span><span class="k">new</span> <span class="nf">CustomEdgeMapper</span><span class="o">());</span></code></pre></div> +<div class="highlight"><pre><code class="language-java"><span class="n">inputGraph</span><span class="o">.</span><span class="na">getUndirected</span><span class="o">().</span><span class="na">mapEdges</span><span class="o">(</span><span class="k">new</span> <span class="n">CustomEdgeMapper</span><span class="o">());</span></code></pre></div> <p>Transformations can be applied on:</p> @@ -1513,7 +1660,7 @@ one or more values per vertex, the more general <code>groupReduceOnEdges( <p>Assume you would want to compute the sum of the values of all incoming neighbors for each vertex. We will call the <code>reduceOnNeighbors()</code> aggregation method since the sum is an associative and commutative operation and the neighborsâ values are needed:</p> -<div class="highlight"><pre><code class="language-java"><span class="n">graph</span><span class="o">.</span><span class="na">reduceOnNeighbors</span><span class="o">(</span><span class="k">new</span> <span class="nf">SumValues</span><span class="o">(),</span> <span class="n">EdgeDirection</span><span class="o">.</span><span class="na">IN</span><span class="o">);</span></code></pre></div> +<div class="highlight"><pre><code class="language-java"><span class="n">graph</span><span class="o">.</span><span class="na">reduceOnNeighbors</span><span class="o">(</span><span class="k">new</span> <span class="n">SumValues</span><span class="o">(),</span> <span class="n">EdgeDirection</span><span class="o">.</span><span class="na">IN</span><span class="o">);</span></code></pre></div> <p>The vertex with id 1 is the only node that has no incoming edges. The result is therefore:</p> @@ -1699,7 +1846,7 @@ playlist, we use a coGroup function to filter out the mismatches.</p> <span class="c1">// read the mismatches dataset and extract the songIDs</span> <span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple3</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">validTriplets</span> <span class="o">=</span> <span class="n">triplets</span> <span class="o">.</span><span class="na">coGroup</span><span class="o">(</span><span class="n">mismatches</span><span class="o">).</span><span class="na">where</span><span class="o">(</span><span class="mi">1</span><span class="o">).</span><span class="na">equalTo</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span> - <span class="o">.</span><span class="na">with</span><span class="o">(</span><span class="k">new</span> <span class="nf">CoGroupFunction</span><span class="o">()</span> <span class="o">{</span> + <span class="o">.</span><span class="na">with</span><span class="o">(</span><span class="k">new</span> <span class="n">CoGroupFunction</span><span class="o">()</span> <span class="o">{</span> <span class="kt">void</span> <span class="nf">coGroup</span><span class="o">(</span><span class="n">Iterable</span> <span class="n">triplets</span><span class="o">,</span> <span class="n">Iterable</span> <span class="n">invalidSongs</span><span class="o">,</span> <span class="n">Collector</span> <span class="n">out</span><span class="o">)</span> <span class="o">{</span> <span class="k">if</span> <span class="o">(!</span><span class="n">invalidSongs</span><span class="o">.</span><span class="na">iterator</span><span class="o">().</span><span class="na">hasNext</span><span class="o">())</span> <span class="o">{</span> <span class="k">for</span> <span class="o">(</span><span class="n">Tuple3</span> <span class="n">triplet</span> <span class="o">:</span> <span class="n">triplets</span><span class="o">)</span> <span class="o">{</span> <span class="c1">// valid triplet</span> @@ -1737,7 +1884,7 @@ basically iterate through the edge value and collect the target (song) of the ma <div class="highlight"><pre><code class="language-java"><span class="c1">//get the top track (most listened to) for each user</span> <span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&gt;</span> <span class="n">usersWithTopTrack</span> <span class="o">=</span> <span class="n">userSongGraph</span> - <span class="o">.</span><span class="na">groupReduceOnEdges</span><span class="o">(</span><span class="k">new</span> <span class="nf">GetTopSongPerUser</span><span class="o">(),</span> <span class="n">EdgeDirection</span><span class="o">.</span><span class="na">OUT</span><span class="o">);</span> + <span class="o">.</span><span class="na">groupReduceOnEdges</span><span class="o">(</span><span class="k">new</span> <span class="n">GetTopSongPerUser</span><span class="o">(),</span> <span class="n">EdgeDirection</span><span class="o">.</span><span class="na">OUT</span><span class="o">);</span> <span class="kd">class</span> <span class="nc">GetTopSongPerUser</span> <span class="kd">implements</span> <span class="n">EdgesFunctionWithVertexValue</span> <span class="o">{</span> <span class="kt">void</span> <span class="nf">iterateEdges</span><span class="o">(</span><span class="n">Vertex</span> <span class="n">vertex</span><span class="o">,</span> <span class="n">Iterable</span><span class="o">&lt;</span><span class="n">Edge</span><span class="o">&gt;</span> <span class="n">edges</span><span class="o">)</span> <span class="o">{</span> @@ -1750,7 +1897,7 @@ basically iterate through the edge value and collect the target (song) of the ma <span class="n">topSong</span> <span class="o">=</span> <span class="n">edge</span><span class="o">.</span><span class="na">getTarget</span><span class="o">();</span> <span class="o">}</span> <span class="o">}</span> - <span class="k">return</span> <span class="k">new</span> <span class="nf">Tuple2</span><span class="o">(</span><span class="n">vertex</span><span class="o">.</span><span class="na">getId</span><span class="o">(),</span> <span class="n">topSong</span><span class="o">);</span> + <span class="k">return</span> <span class="k">new</span> <span class="n">Tuple2</span><span class="o">(</span><span class="n">vertex</span><span class="o">.</span><span class="na">getId</span><span class="o">(),</span> <span class="n">topSong</span><span class="o">);</span> <span class="o">}</span> <span class="o">}</span></code></pre></div> @@ -1786,14 +1933,14 @@ straightforward as a call to the <code>Graph.fromDataSet()</code> me <span class="o">}</span> <span class="o">})</span> <span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="mi">1</span><span class="o">)</span> - <span class="o">.</span><span class="na">reduceGroup</span><span class="o">(</span><span class="k">new</span> <span class="nf">GroupReduceFunction</span><span class="o">()</span> <span class="o">{</span> + <span class="o">.</span><span class="na">reduceGroup</span><span class="o">(</span><span class="k">new</span> <span class="n">GroupReduceFunction</span><span class="o">()</span> <span class="o">{</span> <span class="kt">void</span> <span class="nf">reduce</span><span class="o">(</span><span class="n">Iterable</span><span class="o">&lt;</span><span class="n">Edge</span><span class="o">&gt;</span> <span class="n">edges</span><span class="o">,</span> <span class="n">Collector</span><span class="o">&lt;</span><span class="n">Edge</span><span class="o">&gt;</span> <span class="n">out</span><span class="o">)</span> <span class="o">{</span> - <span class="n">List</span> <span class="n">users</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">ArrayList</span><span class="o">();</span> + <span class="n">List</span> <span class="n">users</span> <span class="o">=</span> <span class="k">new</span> <span class="n">ArrayList</span><span class="o">();</span> <span class="k">for</span> <span class="o">(</span><span class="n">Edge</span> <span class="n">edge</span> <span class="o">:</span> <span class="n">edges</span><span class="o">)</span> <span class="o">{</span> <span class="n">users</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">edge</span><span class="o">.</span><span class="na">getSource</span><span class="o">());</span> <span class="k">for</span> <span class="o">(</span><span class="kt">int</span> <span class="n">i</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span> <span class="n">i</span> <span class="o">&lt;</span> <span class="n">users</span><span class="o">.</span><span class="na">size</span><span class="o">()</span> <span class="o">-</span> <span class="mi">1</span><span class="o">;</span> <span class="n">i</span><span class="o">++)</span> <span class="o">{</span> <span class="k">for</span> <span class="o">(</span><span class="kt">int</span> <span class="n">j</span> <span class="o">=</span> <span class="n">i</span><span class="o">+</span><span class="mi">1</span><span class="o">;</span> <span class="n">j</span> <span class="o">&lt;</span> <span class="n">users</span><span class="o">.</span><span class="na">size</span><span class="o">()</span> <span class="o">-</span> <span class="mi">1</span><span class="o">;</span> <span class="n">j</span><span class="o">++)</span> <span clas s="o">{</span> - <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="k">new</span> <span class="nf">Edge</span><span class="o">(</span><span class="n">users</span><span class="o">.</span><span class="na">get</span><span class="o">(</span><span class="n">i</span><span class="o">),</span> <span class="n">users</span><span class="o">.</span><span class="na">get</span><span class="o">(</span><span class="n">j</span><span class="o">)));</span> + <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="k">new</span> <span class="n">Edge</span><span class="o">(</span><span class="n">users</span><span class="o">.</span><span class="na">get</span><span class="o">(</span><span class="n">i</span><span class="o">),</span> <span class="n">users</span><span class="o">.</span><span class="na">get</span><span class="o">(</span><span class="n">j</span><span class="o">)));</span> <span class="o">}</span> <span class="o">}</span> <span class="o">}</span> @@ -1824,12 +1971,12 @@ among their neighbors. </p> <span class="c1">// update the vertex values and run the label propagation algorithm</span> <span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Vertex</span><span class="o">&gt;</span> <span class="n">verticesWithCommunity</span> <span class="o">=</span> <span class="n">similarUsersGraph</span> - <span class="o">.</span><span class="na">joinWithVertices</span><span class="o">(</span><span class="n">idsWithlLabels</span><span class="o">,</span> <span class="k">new</span> <span class="nf">MapFunction</span><span class="o">()</span> <span class="o">{</span> + <span class="o">.</span><span class="na">joinWithVertices</span><span class="o">(</span><span class="n">idsWithlLabels</span><span class="o">,</span> <span class="k">new</span> <span class="n">MapFunction</span><span class="o">()</span> <span class="o">{</span> <span class="kd">public</span> <span class="n">Long</span> <span class="nf">map</span><span class="o">(</span><span class="n">Tuple2</span> <span class="n">idWithLabel</span><span class="o">)</span> <span class="o">{</span> <span class="k">return</span> <span class="n">idWithLabel</span><span class="o">.</span><span class="na">f1</span><span class="o">;</span> <span class="o">}</span> <span class="o">})</span> - <span class="o">.</span><span class="na">run</span><span class="o">(</span><span class="k">new</span> <span class="nf">LabelPropagation</span><span class="o">(</span><span class="n">numIterations</span><span class="o">))</span> + <span class="o">.</span><span class="na">run</span><span class="o">(</span><span class="k">new</span> <span class="n">LabelPropagation</span><span class="o">(</span><span class="n">numIterations</span><span class="o">))</span> <span class="o">.</span><span class="na">getVertices</span><span class="o">();</span></code></pre></div> <p><a href="#top">Back to top</a></p> @@ -3039,16 +3186,16 @@ found <a href="https://github.com/mbalassi/flink/blob/stockprices/flink- <span class="nd">@Override</span> <span class="kd">public</span> <span class="n">StockPrice</span> <span class="nf">map</span><span class="o">(</span><span class="n">String</span> <span class="n">value</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span> <span class="n">tokens</span> <span class="o">=</span> <span class="n">value</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">&quot;,&quot;</span><span class="o">);</span> - <span class="k">return</span> <span class="k">new</span> <span class="nf">StockPrice</span><span class="o">(</span><span class="n">tokens</span><span class="o">[</span><span class="mi">0</span><span class="o">],</span> + <span class="k">return</span> <span class="k">new</span> <span class="n">StockPrice</span><span class="o">(</span><span class="n">tokens</span><span class="o">[</span><span class="mi">0</span><span class="o">],</span> <span class="n">Double</span><span class="o">.</span><span class="na">parseDouble</span><span class="o">(</span><span class="n">tokens</span><span class="o">[</span><span class="mi">1</span><span class="o">]));</span> <span class="o">}</span> <span class="o">});</span> <span class="c1">//Generate other stock streams</span> - <span class="n">DataStream</span><span class="o">&lt;</span><span class="n">StockPrice</span><span class="o">&gt;</span> <span class="n">SPX_stream</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">addSource</span><span class="o">(</span><span class="k">new</span> <span class="nf">StockSource</span><span class="o">(</span><span class="s">&quot;SPX&quot;</span><span class="o">,</span> <span class="mi">10</span><span class="o">));</span> - <span class="n">DataStream</span><span class="o">&lt;</span><span class="n">StockPrice</span><span class="o">&gt;</span> <span class="n">FTSE_stream</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">addSource</span><span class="o">(</span><span class="k">new</span> <span class="nf">StockSource</span><span class="o">(</span><span class="s">&quot;FTSE&quot;</span><span class="o">,</span> <span class="mi">20</span><span class="o">));</span> - <span class="n">DataStream</span><span class="o">&lt;</span><span class="n">StockPrice</span><span class="o">&gt;</span> <span class="n">DJI_stream</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">addSource</span><span class="o">(</span><span class="k">new</span> <span class="nf">StockSource</span><span class="o">(</span><span class="s">&quot;DJI&quot;</span><span class="o">,</span> <span class="mi">30</span><span class="o">));</span> - <span class="n">DataStream</span><span class="o">&lt;</span><span class="n">StockPrice</span><span class="o">&gt;</span> <span class="n">BUX_stream</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">addSource</span><span class="o">(</span><span class="k">new</span> <span class="nf">StockSource</span><span class="o">(</span><span class="s">&quot;BUX&quot;</span><span class="o">,</span> <span class="mi">40</span><span class="o">));</span> + <span class="n">DataStream</span><span class="o">&lt;</span><span class="n">StockPrice</span><span class="o">&gt;</span> <span class="n">SPX_stream</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">addSource</span><span class="o">(</span><span class="k">new</span> <span class="n">StockSource</span><span class="o">(</span><span class="s">&quot;SPX&quot;</span><span class="o">,</span> <span class="mi">10</span><span class="o">));</span> + <span class="n">DataStream</span><span class="o">&lt;</span><span class="n">StockPrice</span><span class="o">&gt;</span> <span class="n">FTSE_stream</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">addSource</span><span class="o">(</span><span class="k">new</span> <span class="n">StockSource</span><span class="o">(</span><span class="s">&quot;FTSE&quot;</span><span class="o">,</span> <span class="mi">20</span><span class="o">));</span> + <span class="n">DataStream</span><span class="o">&lt;</span><span class="n">StockPrice</span><span class="o">&gt;</span> <span class="n">DJI_stream</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">addSource</span><span class="o">(</span><span class="k">new</span> <span class="n">StockSource</span><span class="o">(</span><span class="s">&quot;DJI&quot;</span><span class="o">,</span> <span class="mi">30</span><span class="o">));</span> + <span class="n">DataStream</span><span class="o">&lt;</span><span class="n">StockPrice</span><span class="o">&gt;</span> <span class="n">BUX_stream</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">addSource</span><span class="o">(</span><span class="k">new</span> <span class="n">StockSource</span><span class="o">(</span><span class="s">&quot;BUX&quot;</span><span class="o">,</span> <span class="mi">40</span><span class="o">));</span> <span class="c1">//Merge all stock streams together</span> <span class="n">DataStream</span><span class="o">&lt;</span><span class="n">StockPrice</span><span class="o">&gt;</span> <span class="n">stockStream</span> <span class="o">=</span> <span class="n">socketStockStream</span> @@ -3129,11 +3276,11 @@ of this example, the data streams are simply generated using the <span class="nd">@Override</span> <span class="kd">public</span> <span class="kt">void</span> <span class="nf">invoke</span><span class="o">(</span><span class="n">Collector</span><span class="o">&lt;</span><span class="n">StockPrice</span><span class="o">&gt;</span> <span class="n">collector</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span> <span class="n">price</span> <span class="o">=</span> <span class="n">DEFAULT_PRICE</span><span class="o">;</span> - <span class="n">Random</span> <span class="n">random</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">Random</span><span class="o">();</span> + <span class="n">Random</span> <span class="n">random</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Random</span><span class="o">();</span> <span class="k">while</span> <span class="o">(</span><span class="kc">true</span><span class="o">)</span> <span class="o">{</span> <span class="n">price</span> <span class="o">=</span> <span class="n">price</span> <span class="o">+</span> <span class="n">random</span><span class="o">.</span><span class="na">nextGaussian</span><span class="o">()</span> <span class="o">*</span> <span class="n">sigma</span><span class="o">;</span> - <span class="n">collector</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="k">new</span> <span class="nf">StockPrice</span><span class="o">(</span><span class="n">symbol</span><span class="o">,</span> <span class="n">price</span><span class="o">));</span> + <span class="n">collector</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="k">new</span> <span class="n">StockPrice</span><span class="o">(</span><span class="n">symbol</span><span class="o">,</span> <span class="n">price</span><span class="o">));</span> <span class="n">Thread</span><span class="o">.</span><span class="na">sleep</span><span class="o">(</span><span class="n">random</span><span class="o">.</span><span class="na">nextInt</span><span class="o">(</span><span class="mi">200</span><span class="o">));</span> <span class="o">}</span> <span class="o">}</span> @@ -3220,7 +3367,7 @@ performed on named fields of POJOs, making the code more readable.</p> <span class="n">DataStream</span><span class="o">&lt;</span><span class="n">StockPrice</span><span class="o">&gt;</span> <span class="n">maxByStock</span> <span class="o">=</span> <span class="n">windowedStream</span><span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="s">&quot;symbol&quot;</span><span class="o">)</span> <span class="o">.</span><span class="na">maxBy</span><span class="o">(</span><span class="s">&quot;price&quot;</span><span class="o">).</span><span class="na">flatten</span><span class="o">();</span> <span class="n">DataStream</span><span class="o">&lt;</span><span class="n">StockPrice</span><span class="o">&gt;</span> <span class="n">rollingMean</span> <span class="o">=</span> <span class="n">windowedStream</span><span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="s">&quot;symbol&quot;</span><span class="o">)</span> - <span class="o">.</span><span class="na">mapWindow</span><span class="o">(</span><span class="k">new</span> <span class="nf">WindowMean</span><span class="o">()).</span><span class="na">flatten</span><span class="o">();</span> + <span class="o">.</span><span class="na">mapWindow</span><span class="o">(</span><span class="k">new</span> <span class="n">WindowMean</span><span class="o">()).</span><span class="na">flatten</span><span class="o">();</span> <span class="c1">//Compute the mean of a window</span> <span class="kd">public</span> <span class="kd">final</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">WindowMean</span> <span class="kd">implements</span> @@ -3240,7 +3387,7 @@ performed on named fields of POJOs, making the code more readable.</p> <span class="n">symbol</span> <span class="o">=</span> <span class="n">sp</span><span class="o">.</span><span class="na">symbol</span><span class="o">;</span> <span class="n">count</span><span class="o">++;</span> <span class="o">}</span> - <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="k">new</span> <span class="nf">StockPrice</span><span class="o">(</span><span class="n">symbol</span><span class="o">,</span> <span class="n">sum</span> <span class="o">/</span> <span class="n">count</span><span class="o">));</span> + <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="k">new</span> <span class="n">StockPrice</span><span class="o">(</span><span class="n">symbol</span><span class="o">,</span> <span class="n">sum</span> <span class="o">/</span> <span class="n">count</span><span class="o">));</span> <span class="o">}</span> <span class="o">}</span> <span class="o">}</span></code></pre></div> @@ -3300,7 +3447,7 @@ every 30 seconds.</p> <div data-lang="java7"> <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">private</span> <span class="kd">static</span> <span class="kd">final</span> <span class="n">Double</span> <span class="n">DEFAULT_PRICE</span> <span class="o">=</span> <span class="mi">1000</span><span class="o">.;</span> -<span class="kd">private</span> <span class="kd">static</span> <span class="kd">final</span> <span class="n">StockPrice</span> <span class="n">DEFAULT_STOCK_PRICE</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">StockPrice</span><span class="o">(</span><span class="s">&quot;&quot;</span><span class="o">,</span> <span class="n">DEFAULT_PRICE</span><span class="o">);</span> +<span class="kd">private</span> <span class="kd">static</span> <span class="kd">final</span> <span class="n">StockPrice</span> <span class="n">DEFAULT_STOCK_PRICE</span> <span class="o">=</span> <span class="k">new</span> <span class="n">StockPrice</span><span class="o">(</span><span class="s">&quot;&quot;</span><span class="o">,</span> <span class="n">DEFAULT_PRICE</span><span class="o">);</span> <span class="c1">//Use delta policy to create price change warnings</span> <span class="n">DataStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">priceWarnings</span> <span class="o">=</span> <span class="n">stockStream</span><span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="s">&quot;symbol&quot;</span><span class="o">)</span> @@ -3310,13 +3457,13 @@ every 30 seconds.</p> <span class="k">return</span> <span class="n">Math</span><span class="o">.</span><span class="na">abs</span><span class="o">(</span><span class="n">oldDataPoint</span><span class="o">.</span><span class="na">price</span> <span class="o">-</span> <span class="n">newDataPoint</span><span class="o">.</span><span class="na">price</span><span class="o">);</span> <span class="o">}</span> <span class="o">},</span> <span class="n">DEFAULT_STOCK_PRICE</span><span class="o">))</span> -<span class="o">.</span><span class="na">mapWindow</span><span class="o">(</span><span class="k">new</span> <span class="nf">SendWarning</span><span class="o">()).</span><span class="na">flatten</span><span class="o">();</span> +<span class="o">.</span><span class="na">mapWindow</span><span class="o">(</span><span class="k">new</span> <span class="n">SendWarning</span><span class="o">()).</span><span class="na">flatten</span><span class="o">();</span> <span class="c1">//Count the number of warnings every half a minute</span> <span class="n">DataStream</span><span class="o">&lt;</span><span class="n">Count</span><span class="o">&gt;</span> <span class="n">warningsPerStock</span> <span class="o">=</span> <span class="n">priceWarnings</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">new</span> <span class="n">MapFunction</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Count</span><span class="o">&gt;()</span> <span class="o">{</span> <span class="nd">@Override</span> <span class="kd">public</span> <span class="n">Count</span> <span class="nf">map</span><span class="o">(</span><span class="n">String</span> <span class="n">value</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span> - <span class="k">return</span> <span class="k">new</span> <span class="nf">Count</span><span class="o">(</span><span class="n">value</span><span class="o">,</span> <span class="mi">1</span><span class="o">);</span> + <span class="k">return</span> <span class="k">new</span> <span class="n">Count</span><span class="o">(</span><span class="n">value</span><span class="o">,</span> <span class="mi">1</span><span class="o">);</span> <span class="o">}</span> <span class="o">}).</span><span class="na">groupBy</span><span class="o">(</span><span class="s">&quot;symbol&quot;</span><span class="o">).</span><span class="na">window</span><span class="o">(</span><span class="n">Time</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="mi">30</span><span class="o">,</span> <span class="n">TimeUnit</span><span class="o">.</span><span class="na">SECONDS</span><span class="o">)).</span><span class="na">sum</span><span class="o">(</span><span class="s">&quot;count&quo t;</span><span class="o">).</span><span class="na">flatten</span><span class="o">();</span> @@ -3398,7 +3545,7 @@ but for the sake of this example we generate dummy tweet data.</p> <div data-lang="java7"> <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">//Read a stream of tweets</span> -<span class="n">DataStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">tweetStream</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">addSource</span><span class="o">(</span><span class="k">new</span> <span class="nf">TweetSource</span><span class="o">());</span> +<span class="n">DataStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">tweetStream</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">addSource</span><span class="o">(</span><span class="k">new</span> <span class="n">TweetSource</span><span class="o">());</span> <span class="c1">//Extract the stock symbols</span> <span class="n">DataStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">mentionedSymbols</span> <span class="o">=</span> <span class="n">tweetStream</span><span class="o">.</span><span class="na">flatMap</span><span class="o">(</span> @@ -3421,7 +3568,7 @@ but for the sake of this example we generate dummy tweet data.</p> <span class="n">DataStream</span><span class="o">&lt;</span><span class="n">Count</span><span class="o">&gt;</span> <span class="n">tweetsPerStock</span> <span class="o">=</span> <span class="n">mentionedSymbols</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">new</span> <span class="n">MapFunction</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Count</span><span class="o">&gt;()</span> <span class="o">{</span> <span class="nd">@Override</span> <span class="kd">public</span> <span class="n">Count</span> <span class="nf">map</span><span class="o">(</span><span class="n">String</span> <span class="n">value</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span> - <span class="k">return</span> <span class="k">new</span> <span class="nf">Count</span><span class="o">(</span><span class="n">value</span><span class="o">,</span> <span class="mi">1</span><span class="o">);</span> + <span class="k">return</span> <span class="k">new</span> <span class="n">Count</span><span class="o">(</span><span class="n">value</span><span class="o">,</span> <span class="mi">1</span><span class="o">);</span> <span class="o">}</span> <span class="o">}).</span><span class="na">groupBy</span><span class="o">(</span><span class="s">&quot;symbol&quot;</span><span class="o">).</span><span class="na">window</span><span class="o">(</span><span class="n">Time</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="mi">30</span><span class="o">,</span> <span class="n">TimeUnit</span><span class="o">.</span><span class="na">SECONDS</span><span class="o">)).</span><span class="na">sum</span><span class="o">(</span><span class="s">&quot;count&quo t;</span><span class="o">).</span><span class="na">flatten</span><span class="o">();</span> @@ -3431,8 +3578,8 @@ but for the sake of this example we generate dummy tweet data.</p> <span class="nd">@Override</span> <span class="kd">public</span> <span class="kt">void</span> <span class="nf">invoke</span><span class="o">(</span><span class="n">Collector</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">collector</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span> - <span class="n">random</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">Random</span><span class="o">();</span> - <span class="n">stringBuilder</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">StringBuilder</span><span class="o">();</span> + <span class="n">random</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Random</span><span class="o">();</span> + <span class="n">stringBuilder</span> <span class="o">=</span> <span class="k">new</span> <span class="n">StringBuilder</span><span class="o">();</span> <span class="k">while</span> <span class="o">(</span><span class="kc">true</span><span class="o">)</span> <span class="o">{</span> <span class="n">stringBuilder</span><span class="o">.</span><span class="na">setLength</span><span class="o">(</span><span class="mi">0</span><span class="o">);</span> @@ -3514,7 +3661,7 @@ these data streams are potentially infinite, we apply the join on a <span class="c1">//Compute rolling correlation</span> <span class="n">DataStream</span><span class="o">&lt;</span><span class="n">Double</span><span class="o">&gt;</span> <span class="n">rollingCorrelation</span> <span class="o">=</span> <span class="n">tweetsAndWarning</span> <span class="o">.</span><span class="na">window</span><span class="o">(</span><span class="n">Time</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="mi">30</span><span class="o">,</span> <span class="n">TimeUnit</span><span class="o">.</span><span class="na">SECONDS</span><span class="o">))</span> - <span class="o">.</span><span class="na">mapWindow</span><span class="o">(</span><span class="k">new</span> <span class="nf">WindowCorrelation</span><span class="o">());</span> + <span class="o">.</span><span class="na">mapWindow</span><span class="o">(</span><span class="k">new</span> <span class="n">WindowCorrelation</span><span class="o">());</span> <span class="n">rollingCorrelation</span><span class="o">.</span><span class="na">print</span><span class="o">();</span> @@ -3836,25 +3983,25 @@ Flink serialization system improved a lot over time and by now surpasses the cap <span class="c1">// Setup Hadoopâs TextInputFormat</span> <span class="n">HadoopInputFormat</span><span class="o">&lt;</span><span class="n">LongWritable</span><span class="o">,</span> <span class="n">Text</span><span class="o">&gt;</span> <span class="n">hadoopInputFormat</span> <span class="o">=</span> <span class="k">new</span> <span class="n">HadoopInputFormat</span><span class="o">&lt;</span><span class="n">LongWritable</span><span class="o">,</span> <span class="n">Text</span><span class="o">&gt;(</span> - <span class="k">new</span> <span class="nf">TextInputFormat</span><span class="o">(),</span> <span class="n">LongWritable</span><span class="o">.</span><span class="na">class</span><span class="o">,</span> <span class="n">Text</span><span class="o">.</span><span class="na">class</span><span class="o">,</span> <span class="k">new</span> <span class="nf">JobConf</span><span class="o">());</span> - <span class="n">TextInputFormat</span><span class="o">.</span><span class="na">addInputPath</span><span class="o">(</span><span class="n">hadoopInputFormat</span><span class="o">.</span><span class="na">getJobConf</span><span class="o">(),</span> <span class="k">new</span> <span class="nf">Path</span><span class="o">(</span><span class="n">inputPath</span><span class="o">));</span> + <span class="k">new</span> <span class="n">TextInputFormat</span><span class="o">(),</span> <span class="n">LongWritable</span><span class="o">.</span><span class="na">class</span><span class="o">,</span> <span class="n">Text</span><span class="o">.</span><span class="na">class</span><span class="o">,</span> <span class="k">new</span> <span class="n">JobConf</span><span class="o">());</span> + <span class="n">TextInputFormat</span><span class="o">.</span><span class="na">addInputPath</span><span class="o">(</span><span class="n">hadoopInputFormat</span><span class="o">.</span><span class="na">getJobConf</span><span class="o">(),</span> <span class="k">new</span> <span class="n">Path</span><span class="o">(</span><span class="n">inputPath</span><span class="o">));</span> <span class="c1">// Read a DataSet with the Hadoop In
<TRUNCATED>