Modified: samza/site/startup/preview/index.html URL: http://svn.apache.org/viewvc/samza/site/startup/preview/index.html?rev=1832281&r1=1832280&r2=1832281&view=diff ============================================================================== --- samza/site/startup/preview/index.html (original) +++ samza/site/startup/preview/index.html Fri May 25 22:28:38 2018 @@ -185,27 +185,27 @@ <p>There are four layers in the architecture. The following sections describe each of the layers.</p> -<h4 id="i.-high-level-api">I. High Level API</h4> +<h4 id="i-high-level-api">I. High Level API</h4> <p>The high level API provides the libraries to define your application logic. The <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/application/StreamApplication.html">StreamApplication</a> is the central abstraction which your application must implement. You start by declaring your inputs as instances of <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/MessageStream.html">MessageStream</a>. Then you can apply operators on each MessageStream like map, filter, window, and join to define the whole end-to-end data processing in a single program.</p> <p>For a deeper dive into the high level API, see <a href="#high-level-api">high level API section</a> below.</p> -<h4 id="ii.-applicationrunner">II. ApplicationRunner</h4> +<h4 id="ii-applicationrunner">II. ApplicationRunner</h4> <p>Samza uses an <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/runtime/ApplicationRunner.html">ApplicationRunner</a> to run a stream application. The ApplicationRunner generates the configs (such as input/output streams), creates intermediate streams, and starts the execution. There are two types of ApplicationRunner:</p> <p><strong>RemoteApplicationRunner</strong> - submits the application to a remote cluster. This runner is invoked via the <em>run-app.sh</em> script. 
To use RemoteApplicationRunner, set the following configurations:</p> -<div class="highlight"><pre><code class="jproperties"><span class="c"># The StreamApplication class to run</span> +<figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties"><span></span><span class="c"># The StreamApplication class to run</span> <span class="na">app.class</span><span class="o">=</span><span class="s">com.company.job.YourStreamApplication</span> -<span class="na">job.factory.class</span><span class="o">=</span><span class="s">org.apache.samza.job.yarn.YarnJobFactory</span></code></pre></div> +<span class="na">job.factory.class</span><span class="o">=</span><span class="s">org.apache.samza.job.yarn.YarnJobFactory</span></code></pre></figure> <p>Then use <em>run-app.sh</em> to run the application in the remote cluster. The script will invoke the RemoteApplicationRunner, which will launch one or more jobs using the factory specified with <em>job.factory.class</em>. Follow the <a href="/learn/tutorials/latest/hello-samza-high-level-yarn.html">YARN deployment tutorial</a> to try it out.</p> <p><strong>LocalApplicationRunner</strong> - runs the application in the JVM process of the runner. For example, to launch your application on multiple machines using ZooKeeper for coordination, you can run multiple instances of LocalApplicationRunner on various machines. After the applications load, they coordinate their actions through ZooKeeper. 
Here is an example to run the StreamApplication in your program using the LocalApplicationRunner:</p> -<div class="highlight"><pre><code class="java"><span class="kd">public</span> <span class="kd">static</span> <span class="kt">void</span> <span class="nf">main</span><span class="o">(</span><span class="n">String</span><span class="o">[]</span> <span class="n">args</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span> +<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="kd">public</span> <span class="kd">static</span> <span class="kt">void</span> <span class="nf">main</span><span class="o">(</span><span class="n">String</span><span class="o">[]</span> <span class="n">args</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span> <span class="n">CommandLine</span> <span class="n">cmdLine</span> <span class="o">=</span> <span class="k">new</span> <span class="n">CommandLine</span><span class="o">();</span> <span class="n">Config</span> <span class="n">config</span> <span class="o">=</span> <span class="n">cmdLine</span><span class="o">.</span><span class="na">loadConfig</span><span class="o">(</span><span class="n">cmdLine</span><span class="o">.</span><span class="na">parser</span><span class="o">().</span><span class="na">parse</span><span class="o">(</span><span class="n">args</span><span class="o">));</span> <span class="n">LocalApplicationRunner</span> <span class="n">localRunner</span> <span class="o">=</span> <span class="k">new</span> <span class="n">LocalApplicationRunner</span><span class="o">(</span><span class="n">config</span><span class="o">);</span> @@ -215,7 +215,7 @@ <span class="c1">// Wait for the application to finish</span> <span class="n">localRunner</span><span class="o">.</span><span class="na">waitForFinish</span><span class="o">();</span> <span 
class="n">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="s">"Application completed with status "</span> <span class="o">+</span> <span class="n">localRunner</span><span class="o">.</span><span class="na">status</span><span class="o">(</span><span class="n">app</span><span class="o">));</span> -<span class="o">}</span></code></pre></div> +<span class="o">}</span></code></pre></figure> <p>Follow the <a href="/learn/tutorials/latest/hello-samza-high-level-zk.html">ZooKeeper deployment tutorial</a> to try it out.</p> @@ -232,7 +232,7 @@ <p><img src="/img/latest/learn/documentation/introduction/execution-plan.png" alt="Execution plan" style="max-width: 100%; height: auto;" onclick="window.open(this.src)"/></p> -<h4 id="iii.-execution-models">III. Execution Models</h4> +<h4 id="iii-execution-models">III. Execution Models</h4> <p>Samza supports two types of execution models: cluster-based execution and embedded execution.</p> @@ -247,7 +247,7 @@ <p>For more details on running Samza in embedded mode, take a look at the <a href="#flexible-deployment-model">flexible deployment model</a> section below.</p> -<h4 id="iv.-processor">IV. Processor</h4> +<h4 id="iv-processor">IV. Processor</h4> <p>The lowest execution unit of a Samza application is the processor. It reads the configs generated by the <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/runtime/ApplicationRunner.html">ApplicationRunner</a> and processes the input stream partitions assigned by the JobCoordinator. It can access local state using a <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/storage/kv/KeyValueStore.html">KeyValueStore</a> implementation (e.g. RocksDB or in-memory) and remote state (e.g. 
REST service) using multithreading.</p> @@ -274,11 +274,11 @@ <p>When writing your stream processing application using the Samza high-level API, you should implement a <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/application/StreamApplication.html">StreamApplication</a> and define your processing logic in the init method.</p> -<div class="highlight"><pre><code class="java"><span class="kd">public</span> <span class="kt">void</span> <span class="nf">init</span><span class="o">(</span><span class="n">StreamGraph</span> <span class="n">graph</span><span class="o">,</span> <span class="n">Config</span> <span class="n">config</span><span class="o">)</span> <span class="o">{</span> <span class="err">…</span> <span class="o">}</span></code></pre></div> +<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="kd">public</span> <span class="kt">void</span> <span class="nf">init</span><span class="o">(</span><span class="n">StreamGraph</span> <span class="n">graph</span><span class="o">,</span> <span class="n">Config</span> <span class="n">config</span><span class="o">)</span> <span class="o">{</span> <span class="err">…</span> <span class="o">}</span></code></pre></figure> <p>For example, here is a StreamApplication that validates and decorates page views with the viewer’s profile information.</p> -<div class="highlight"><pre><code class="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">BadPageViewFilter</span> <span class="kd">implements</span> <span class="n">StreamApplication</span> <span class="o">{</span> +<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="kd">public</span> <span class="kd">class</span> <span class="nc">BadPageViewFilter</span> <span class="kd">implements</span> <span class="n">StreamApplication</span> <span class="o">{</span> <span class="nd">@Override</span> <span class="kd">public</span> <span 
class="kt">void</span> <span class="nf">init</span><span class="o">(</span><span class="n">StreamGraph</span> <span class="n">graph</span><span class="o">,</span> <span class="n">Config</span> <span class="n">config</span><span class="o">)</span> <span class="o">{</span> <span class="n">MessageStream</span><span class="o"><</span><span class="n">PageView</span><span class="o">></span> <span class="n">pageViews</span> <span class="o">=</span> <span class="n">graph</span><span class="o">.</span><span class="na">getInputStream</span><span class="o">(</span><span class="s">"page-views"</span><span class="o">,</span> <span class="k">new</span> <span class="n">JsonSerdeV2</span><span class="o"><>(</span><span class="n">PageView</span><span class="o">.</span><span class="na">class</span><span class="o">));</span> @@ -287,7 +287,7 @@ <span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">this</span><span class="o">::</span><span class="n">addProfileInformation</span><span class="o">)</span> <span class="o">.</span><span class="na">sendTo</span><span class="o">(</span><span class="n">graph</span><span class="o">.</span><span class="na">getOutputStream</span><span class="o">(</span><span class="s">"decorated-page-views"</span><span class="o">,</span> <span class="k">new</span> <span class="n">JsonSerdeV2</span><span class="o"><>(</span><span class="n">DecoratedPageView</span><span class="o">.</span><span class="na">class</span><span class="o">)));</span> <span class="o">}</span> -<span class="o">}</span></code></pre></div> +<span class="o">}</span></code></pre></figure> <h3 id="messagestream">MessageStream</h3> @@ -302,43 +302,44 @@ <p>There are three simple steps to write your stream processing applications 
using the Samza high-level API.</p> -<h3 id="step-1:-obtain-the-input-streams:">Step 1: Obtain the input streams:</h3> +<h3 id="step-1-obtain-the-input-streams">Step 1: Obtain the input streams:</h3> <p>You can obtain the MessageStream for your input stream ID (“page-views”) using StreamGraph.getInputStream.</p> -<div class="highlight"><pre><code class="java"><span class="n">MessageStream</span><span class="o"><</span><span class="n">PageView</span><span class="o">></span> <span class="n">pageViewInput</span> <span class="o">=</span> <span class="n">graph</span><span class="o">.</span><span class="na">getInputStream</span><span class="o">(</span><span class="s">"page-views"</span><span class="o">,</span> <span class="k">new</span> <span class="n">JsonSerdeV2</span><span class="o"><>(</span><span class="n">PageView</span><span class="o">.</span><span class="na">class</span><span class="o">));</span></code></pre></div> +<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span> <span class="n">MessageStream</span><span class="o"><</span><span class="n">PageView</span><span class="o">></span> <span class="n">pageViewInput</span> <span class="o">=</span> <span class="n">graph</span><span class="o">.</span><span class="na">getInputStream</span><span class="o">(</span><span class="s">"page-views"</span><span class="o">,</span> <span class="k">new</span> <span class="n">JsonSerdeV2</span><span class="o"><>(</span><span class="n">PageView</span><span class="o">.</span><span class="na">class</span><span class="o">));</span> + </code></pre></figure> <p>The first parameter <code>page-views</code> is the logical stream ID. Each stream ID is associated with a <em>physical name</em> and a <em>system</em>. 
By default, Samza uses the stream ID as the physical stream name and accesses the stream on the default system, which is specified with the property “job.default.system”. However, the <em>physical name</em> and <em>system</em> properties can be overridden in configuration. For example, the following configuration defines the stream ID “page-views” as an alias for the PageViewEvent topic in a local Kafka cluster.</p> -<div class="highlight"><pre><code class="jproperties"><span class="na">streams.page-views.samza.system</span><span class="o">=</span><span class="s">kafka</span> +<figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties"><span></span><span class="na">streams.page-views.samza.system</span><span class="o">=</span><span class="s">kafka</span> <span class="na">systems.kafka.samza.factory</span><span class="o">=</span><span class="s">org.apache.samza.system.kafka.KafkaSystemFactory</span> <span class="na">systems.kafka.consumer.zookeeper.connect</span><span class="o">=</span><span class="s">localhost:2181</span> <span class="na">systems.kafka.producer.bootstrap.servers</span><span class="o">=</span><span class="s">localhost:9092</span> -<span class="na">streams.page-views.samza.physical.name</span><span class="o">=</span><span class="s">PageViewEvent</span></code></pre></div> +<span class="na">streams.page-views.samza.physical.name</span><span class="o">=</span><span class="s">PageViewEvent</span></code></pre></figure> <p>The second parameter is a serde to deserialize the incoming message.</p> -<h3 id="step-2:-define-your-transformation-logic:">Step 2: Define your transformation logic:</h3> +<h3 id="step-2-define-your-transformation-logic">Step 2: Define your transformation logic:</h3> <p>You are now ready to define your StreamApplication logic as a series of transformations on MessageStreams.</p> -<div class="highlight"><pre><code class="java"><span class="n">MessageStream</span><span class="o"><</span><span 
class="n">DecoratedPageView</span><span class="o">></span> <span class="n">decoratedPageViews</span> +<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="n">MessageStream</span><span class="o"><</span><span class="n">DecoratedPageView</span><span class="o">></span> <span class="n">decoratedPageViews</span> <span class="o">=</span> <span class="n">pageViewInput</span><span class="o">.</span><span class="na">filter</span><span class="o">(</span><span class="k">this</span><span class="o">::</span><span class="n">isValidPageView</span><span class="o">)</span> - <span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">this</span><span class="o">::</span><span class="n">addProfileInformation</span><span class="o">);</span></code></pre></div> + <span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">this</span><span class="o">::</span><span class="n">addProfileInformation</span><span class="o">);</span></code></pre></figure> -<h3 id="step-3:-write-the-output-to-an-output-stream:">Step 3: Write the output to an output stream:</h3> +<h3 id="step-3-write-the-output-to-an-output-stream">Step 3: Write the output to an output stream:</h3> <p>Finally, you can create an OutputStream using StreamGraph.getOutputStream and send the transformed messages through it.</p> -<div class="highlight"><pre><code class="java"><span class="c1">// Send messages with userId as the key to “decorated-page-views”.</span> +<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="c1">// Send messages with userId as the key to “decorated-page-views”.</span> <span class="n">decoratedPageViews</span><span class="o">.</span><span class="na">sendTo</span><span class="o">(</span> <span class="n">graph</span><span class="o">.</span><span class="na">getOutputStream</span><span class="o">(</span><span 
class="s">"decorated-page-views"</span><span class="o">,</span> - <span class="k">new</span> <span class="n">JsonSerdeV2</span><span class="o"><>(</span><span class="n">DecoratedPageView</span><span class="o">.</span><span class="na">class</span><span class="o">)));</span></code></pre></div> + <span class="k">new</span> <span class="n">JsonSerdeV2</span><span class="o"><>(</span><span class="n">DecoratedPageView</span><span class="o">.</span><span class="na">class</span><span class="o">)));</span></code></pre></figure> <p>The first parameter <code>decorated-page-views</code> is a logical stream ID. The properties for this stream ID can be overridden just like the stream IDs for input streams. For example:</p> -<div class="highlight"><pre><code class="jproperties"><span class="na">streams.decorated-page-views.samza.system</span><span class="o">=</span><span class="s">kafka</span> -<span class="na">streams.decorated-page-views.samza.physical.name</span><span class="o">=</span><span class="s">DecoratedPageViewEvent</span></code></pre></div> +<figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties"><span></span><span class="na">streams.decorated-page-views.samza.system</span><span class="o">=</span><span class="s">kafka</span> +<span class="na">streams.decorated-page-views.samza.physical.name</span><span class="o">=</span><span class="s">DecoratedPageViewEvent</span></code></pre></figure> <p>The second parameter is a serde to serialize the outgoing message.</p> @@ -350,34 +351,34 @@ <p>Applies the provided 1:1 <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/functions/MapFunction.html">MapFunction</a> to each element in the MessageStream and returns the transformed MessageStream. 
The MapFunction takes in a single message and returns a single message (potentially of a different type).</p> -<div class="highlight"><pre><code class="java"><span class="n">MessageStream</span><span class="o"><</span><span class="n">Integer</span><span class="o">></span> <span class="n">numbers</span> <span class="o">=</span> <span class="o">...</span> +<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="n">MessageStream</span><span class="o"><</span><span class="n">Integer</span><span class="o">></span> <span class="n">numbers</span> <span class="o">=</span> <span class="o">...</span> <span class="n">MessageStream</span><span class="o"><</span><span class="n">Integer</span><span class="o">></span> <span class="n">tripled</span> <span class="o">=</span> <span class="n">numbers</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="n">m</span> <span class="o">-></span> <span class="n">m</span> <span class="o">*</span> <span class="mi">3</span><span class="o">);</span> -<span class="n">MessageStream</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">stringified</span> <span class="o">=</span> <span class="n">numbers</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="n">m</span> <span class="o">-></span> <span class="n">String</span><span class="o">.</span><span class="na">valueOf</span><span class="o">(</span><span class="n">m</span><span class="o">));</span></code></pre></div> +<span class="n">MessageStream</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">stringified</span> <span class="o">=</span> <span class="n">numbers</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="n">m</span> <span class="o">-></span> <span class="n">String</span><span class="o">.</span><span 
class="na">valueOf</span><span 
class="o">(</span><span class="n">m</span><span class="o">));</span></code></pre></figure> <h3 id="flatmap">Flatmap</h3> <p>Applies the provided 1:n <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/functions/FlatMapFunction.html">FlatMapFunction</a> to each element in the MessageStream and returns the transformed MessageStream. The FlatMapFunction takes in a single message and returns zero or more messages.</p> -<div class="highlight"><pre><code class="java"><span class="n">MessageStream</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">sentences</span> <span class="o">=</span> <span class="o">...</span> +<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="n">MessageStream</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">sentences</span> <span class="o">=</span> <span class="o">...</span> <span class="c1">// Parse each sentence into its individual words by splitting on spaces</span> <span class="n">MessageStream</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">words</span> <span class="o">=</span> <span class="n">sentences</span><span class="o">.</span><span class="na">flatMap</span><span class="o">(</span><span class="n">sentence</span> <span class="o">-></span> - <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="n">sentence</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">" "</span><span class="o">)));</span></code></pre></div> + <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="n">sentence</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">" "</span><span 
class="o">)));</span></code></pre></figure> <h3 id="filter">Filter</h3> <p>Applies the provided <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/functions/FilterFunction.html">FilterFunction</a> to the MessageStream and returns the filtered MessageStream. The FilterFunction is a predicate that specifies whether a message should be retained in the filtered stream. Messages for which the FilterFunction returns false are filtered out.</p> -<div class="highlight"><pre><code class="java"><span class="n">MessageStream</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">words</span> <span class="o">=</span> <span class="o">...</span> +<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="n">MessageStream</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">words</span> <span class="o">=</span> <span class="o">...</span> <span class="c1">// Extract only the long words</span> <span class="n">MessageStream</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">longWords</span> <span class="o">=</span> <span class="n">words</span><span class="o">.</span><span class="na">filter</span><span class="o">(</span><span class="n">word</span> <span class="o">-></span> <span class="n">word</span><span class="o">.</span><span class="na">length</span><span class="o">()</span> <span class="o">></span> <span class="mi">15</span><span class="o">);</span> <span class="c1">// Extract only the short words</span> -<span class="n">MessageStream</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">shortWords</span> <span class="o">=</span> <span class="n">words</span><span class="o">.</span><span class="na">filter</span><span class="o">(</span><span class="n">word</span> <span class="o">-></span> <span class="n">word</span><span 
class="o">.</span><span class="na">length</span><span class="o">()</span> <span class="o"><</span> <span class="mi">3</span><span class="o">);</span></code></pre></div> +<span class="n">MessageStream</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">shortWords</span> <span class="o">=</span> <span class="n">words</span><span class="o">.</span><span class="na">filter</span><span class="o">(</span><span class="n">word</span> <span class="o">-></span> <span class="n">word</span><span class="o">.</span><span class="na">length</span><span class="o">()</span> <span class="o"><</span> <span class="mi">3</span><span class="o">);</span></code></pre></figure> <h3 id="partitionby">PartitionBy</h3> <p>Repartitions this MessageStream using the key returned by the provided keyExtractor and returns the transformed MessageStream. Messages are sent through an intermediate stream during repartitioning.</p> -<div class="highlight"><pre><code class="java"><span class="n">MessageStream</span><span class="o"><</span><span class="n">PageView</span><span class="o">></span> <span class="n">pageViews</span> <span class="o">=</span> <span class="o">...</span> +<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="n">MessageStream</span><span class="o"><</span><span class="n">PageView</span><span class="o">></span> <span class="n">pageViews</span> <span class="o">=</span> <span class="o">...</span> <span class="c1">// Repartition pageView by userId.</span> <span class="n">MessageStream</span><span class="o"><</span><span class="n">KV</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">PageView</span><span class="o">>></span> <span class="n">partitionedPageViews</span> <span class="o">=</span> <span class="n">pageViews</span><span class="o">.</span><span class="na">partitionBy</span><span class="o">(</span><span class="n">pageView</span> <span 
class="o">-></span> <span class="n">pageView</span><span class="o">.</span><span class="na">getUserId</span><span class="o">(),</span> <span class="c1">// key extractor</span> @@ -385,41 +386,41 @@ <span class="n">KVSerde</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="k">new</span> <span class="n">StringSerde</span><span class="o">(),</span> <span class="k">new</span> <span class="n">JsonSerdeV2</span><span class="o"><>(</span><span class="n">PageView</span><span class="o">.</span><span class="na">class</span><span class="o">)),</span> <span class="c1">// serdes</span> <span class="s">"partitioned-page-views"</span><span class="o">);</span> <span class="c1">// operator ID</span> -<span class="c1">// The operator ID should be unique for an operator within the application and is used to identify the streams and stores created by the operator.</span></code></pre></div> +<span 
class="c1">// The operator ID should be unique for an operator within the application and is used to identify the streams and stores created by the operator.</span></code></pre></figure> <h3 id="merge">Merge</h3> <p>Merges the MessageStream with all the provided MessageStreams and returns the merged stream.</p> -<div class="highlight"><pre><code class="java"><span class="n">MessageStream</span><span class="o"><</span><span class="n">ServiceCall</span><span class="o">></span> <span class="n">serviceCall1</span> <span class="o">=</span> <span class="o">...</span> +<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="n">MessageStream</span><span class="o"><</span><span class="n">ServiceCall</span><span class="o">></span> <span class="n">serviceCall1</span> <span class="o">=</span> <span class="o">...</span> <span class="n">MessageStream</span><span class="o"><</span><span class="n">ServiceCall</span><span class="o">></span> <span class="n">serviceCall2</span> <span class="o">=</span> <span class="o">...</span> <span class="c1">// Merge individual “ServiceCall” streams and create a new merged MessageStream</span> -<span class="n">MessageStream</span><span class="o"><</span><span class="n">ServiceCall</span><span class="o">></span> <span class="n">serviceCallMerged</span> <span class="o">=</span> <span class="n">serviceCall1</span><span class="o">.</span><span class="na">merge</span><span class="o">(</span><span class="n">serviceCall2</span><span class="o">)</span></code></pre></div> +<span class="n">MessageStream</span><span class="o"><</span><span class="n">ServiceCall</span><span class="o">></span> <span class="n">serviceCallMerged</span> <span class="o">=</span> <span class="n">serviceCall1</span><span class="o">.</span><span class="na">merge</span><span class="o">(</span><span class="n">serviceCall2</span><span 
class="o">)</span></code></pre></figure> <p>The merge transform preserves the order of each MessageStream, so if message <code>m1</code> appears before <code>m2</code> in any provided stream, then <code>m1</code> also appears before <code>m2</code> in the merged stream.</p> <p>As an alternative to the <code>merge</code> instance method, you can also use the <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/MessageStream.html#mergeAll-java.util.Collection-">MessageStream#mergeAll</a> static method to merge MessageStreams without operating on an initial stream.</p> -<h3 id="sendto-(stream)">SendTo (stream)</h3> +<h3 id="sendto-stream">SendTo (stream)</h3> <p>Sends all messages from this MessageStream to the provided OutputStream. You can specify the key and the value to be used for the outgoing message.</p> -<div class="highlight"><pre><code class="java"><span class="c1">// Output a new message with userId as the key and region as the value to the “user-region” stream.</span> +<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="c1">// Output a new message with userId as the key and region as the value to the “user-region” stream.</span> <span class="n">MessageStream</span><span class="o"><</span><span class="n">PageView</span><span class="o">></span> <span class="n">pageViews</span> <span class="o">=</span> <span class="o">...</span> <span class="n">MessageStream</span><span class="o"><</span><span class="n">KV</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">>></span> <span class="n">keyedPageViews</span> <span class="o">=</span> <span class="n">pageViews</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="n">pageView</span> <span class="o">-></span> <span class="n">KV</span><span class="o">.</span><span class="na">of</span><span 
class="o">(</span><span class="n">pageView</span><span class="o">.</span><span 
class="na">getUserId</span><span class="o">(),</span> <span class="n">pageView</span><span class="o">.</span><span class="na">getRegion</span><span class="o">()));</span> <span class="n">OutputStream</span><span class="o"><</span><span class="n">KV</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">>></span> <span class="n">userRegions</span> <span class="o">=</span> <span class="n">graph</span><span class="o">.</span><span class="na">getOutputStream</span><span class="o">(</span><span class="err">â</span><span class="n">user</span><span class="o">-</span><span class="n">region</span><span class="err">â</span><span class="o">,</span> <span class="n">KVSerde</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="k">new</span> <span class="n">StringSerde</span><span class="o">(),</span> <span class="k">new</span> <span class="n">StringSerde</span><span class="o">()));</span> -<span class="n">keyedPageViews</span><span class="o">.</span><span class="na">sendTo</span><span class="o">(</span><span class="n">userRegions</span><span class="o">);</span></code></pre></div> +<span class="n">keyedPageViews</span><span class="o">.</span><span class="na">sendTo</span><span class="o">(</span><span class="n">userRegions</span><span class="o">);</span></code></pre></figure> -<h3 id="sendto-(table)">SendTo (table)</h3> +<h3 id="sendto-table">SendTo (table)</h3> <p>Sends all messages from this MessageStream to the provided table, the expected message type is KV.</p> -<div class="highlight"><pre><code class="java"><span class="c1">// Write a new message with memberId as the key and profile as the value to a table.</span> +<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span> <span class="c1">// Write a new message with memberId as the key and profile as the value to a table.</span> <span class="n">streamGraph</span><span 
class="o">.</span><span class="na">getInputStream</span><span class="o">(</span><span class="s">"Profile"</span><span class="o">,</span> <span class="k">new</span> <span class="n">NoOpSerde</span><span class="o"><</span><span class="n">Profile</span><span class="o">>())</span> <span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="n">m</span> <span class="o">-></span> <span class="n">KV</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">m</span><span class="o">.</span><span class="na">getMemberId</span><span class="o">(),</span> <span class="n">m</span><span class="o">))</span> - <span class="o">.</span><span class="na">sendTo</span><span class="o">(</span><span class="n">table</span><span class="o">);</span></code></pre></div> + <span class="o">.</span><span class="na">sendTo</span><span class="o">(</span><span class="n">table</span><span class="o">);</span></code></pre></figure> <h3 id="sink">Sink</h3> @@ -427,25 +428,25 @@ <p>This offers more control than sendTo, since the SinkFunction has access to the <code>MessageCollector</code> and the <code>TaskCoordinator</code>. For instance, you can choose to manually commit offsets or shut down the job using the TaskCoordinator APIs. This operator can also be used to send messages to non-Samza systems (e.g.
remote databases, REST services, etc.)</p> -<div class="highlight"><pre><code class="java"><span class="c1">// Send each page view to a Kafka topic using a custom SinkFunction.</span> +<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="c1">// Send each page view to a Kafka topic using a custom SinkFunction.</span> <span class="n">MessageStream</span><span class="o"><</span><span class="n">PageView</span><span class="o">></span> <span class="n">pageViews</span> <span class="o">=</span> <span class="o">...</span> <span class="n">pageViews</span><span class="o">.</span><span class="na">sink</span><span class="o">(</span> <span class="o">(</span><span class="n">msg</span><span class="o">,</span> <span class="n">collector</span><span class="o">,</span> <span class="n">coordinator</span><span class="o">)</span> <span class="o">-></span> <span class="o">{</span> <span class="c1">// Construct a new outgoing message and send it to a Kafka topic named TransformedPageViewEvent.</span> <span class="n">collector</span><span class="o">.</span><span class="na">send</span><span class="o">(</span><span class="k">new</span> <span class="n">OutgoingMessageEnvelope</span><span class="o">(</span><span class="k">new</span> <span class="n">SystemStream</span><span class="o">(</span><span class="s">"kafka"</span><span class="o">,</span> <span class="s">"TransformedPageViewEvent"</span><span class="o">),</span> <span class="n">msg</span><span class="o">));</span> -<span class="o">}</span> <span class="o">);</span></code></pre></div> +<span class="o">}</span> <span class="o">);</span></code></pre></figure> -<h3 id="join-(stream-stream)">Join (stream-stream)</h3> +<h3 id="join-stream-stream">Join (stream-stream)</h3> <p>The stream-stream Join operator joins messages from two MessageStreams using the provided pairwise <a
href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/functions/JoinFunction.html">JoinFunction</a>. Messages are joined when the key extracted from a message in the first stream matches the key extracted from a message in the second stream. Messages in each stream are retained for the provided TTL duration, and join results are emitted as matches are found.</p> -<div class="highlight"><pre><code class="java"><span class="c1">// Joins a stream of OrderRecord with a stream of ShipmentRecord by orderId with a TTL of 20 minutes.</span> +<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="c1">// Joins a stream of OrderRecord with a stream of ShipmentRecord by orderId with a TTL of 20 minutes.</span> <span class="c1">// Results are produced to a new stream of FulfilledOrderRecord.</span> <span class="n">MessageStream</span><span class="o"><</span><span class="n">OrderRecord</span><span class="o">></span> <span class="n">orders</span> <span class="o">=</span> <span class="o">...</span> <span class="n">MessageStream</span><span class="o"><</span><span class="n">ShipmentRecord</span><span class="o">></span> <span class="n">shipments</span> <span class="o">=</span> <span class="o">...</span> <span class="n">MessageStream</span><span class="o"><</span><span class="n">FulfilledOrderRecord</span><span class="o">></span> <span class="n">shippedOrders</span> <span class="o">=</span> <span class="n">orders</span><span class="o">.</span><span class="na">join</span><span class="o">(</span><span class="n">shipments</span><span class="o">,</span> <span class="k">new</span> <span class="n">OrderShipmentJoiner</span><span class="o">(),</span> - <span class="k">new</span> <span class="nf">StringSerde</span><span class="o">(),</span> <span class="c1">// serde for the join key</span> + <span class="k">new</span> <span class="n">StringSerde</span><span class="o">(),</span> <span class="c1">// serde for the join key</span>
<span class="k">new</span> <span class="n">JsonSerdeV2</span><span class="o"><>(</span><span class="n">OrderRecord</span><span class="o">.</span><span class="na">class</span><span class="o">),</span> <span class="k">new</span> <span class="n">JsonSerdeV2</span><span class="o"><>(</span><span class="n">ShipmentRecord</span><span class="o">.</span><span class="na">class</span><span class="o">),</span> <span class="c1">// serde for both streams</span> <span class="n">Duration</span><span class="o">.</span><span class="na">ofMinutes</span><span class="o">(</span><span class="mi">20</span><span class="o">),</span> <span class="c1">// join TTL</span> <span class="s">"shipped-order-stream"</span><span class="o">)</span> <span class="c1">// operator ID</span> @@ -454,7 +455,7 @@ <span class="kd">class</span> <span class="nc">OrderShipmentJoiner</span> <span class="kd">implements</span> <span class="n">JoinFunction</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">OrderRecord</span><span class="o">,</span> <span class="n">ShipmentRecord</span><span class="o">,</span> <span class="n">FulfilledOrderRecord</span><span class="o">></span> <span class="o">{</span> <span class="nd">@Override</span> <span class="kd">public</span> <span class="n">FulfilledOrderRecord</span> <span class="nf">apply</span><span class="o">(</span><span class="n">OrderRecord</span> <span class="n">message</span><span class="o">,</span> <span class="n">ShipmentRecord</span> <span class="n">otherMessage</span><span class="o">)</span> <span class="o">{</span> - <span class="k">return</span> <span class="k">new</span> <span class="nf">FulfilledOrderRecord</span><span class="o">(</span><span class="n">message</span><span class="o">.</span><span class="na">orderId</span><span class="o">,</span> <span class="n">message</span><span class="o">.</span><span class="na">orderTimestamp</span><span class="o">,</span> <span class="n">otherMessage</span><span 
class="o">.</span><span class="na">shipTimestamp</span><span class="o">);</span> + <span class="k">return</span> <span class="k">new</span> <span class="n">FulfilledOrderRecord</span><span class="o">(</span><span class="n">message</span><span class="o">.</span><span class="na">orderId</span><span class="o">,</span> <span class="n">message</span><span class="o">.</span><span class="na">orderTimestamp</span><span class="o">,</span> <span class="n">otherMessage</span><span class="o">.</span><span class="na">shipTimestamp</span><span class="o">);</span> <span class="o">}</span> <span class="nd">@Override</span> @@ -466,36 +467,36 @@ <span class="kd">public</span> <span class="n">String</span> <span class="nf">getSecondKey</span><span class="o">(</span><span class="n">ShipmentRecord</span> <span class="n">message</span><span class="o">)</span> <span class="o">{</span> <span class="k">return</span> <span class="n">message</span><span class="o">.</span><span class="na">orderId</span><span class="o">;</span> <span class="o">}</span> - <span class="o">}</span></code></pre></div> + <span class="o">}</span></code></pre></figure> -<h3 id="join-(stream-table)">Join (stream-table)</h3> +<h3 id="join-stream-table">Join (stream-table)</h3> <p>The stream-table Join operator joins messages from a MessageStream with records in a table using the provided <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/functions/StreamTableJoinFunction.html">StreamTableJoinFunction</a>. Each message from the input stream is joined with the table record whose key matches the key extracted from that message. The join function is invoked with both the message and the record. If a record is not found in the table, a null value is provided; the join function can choose to return null (inner join) or an output message (left outer join). For the join to function properly, it is important to ensure that the input stream and the table are partitioned by the same key, as this determines the physical placement of data.
</p> -<div class="highlight"><pre><code class="java"><span class="n">streamGraph</span><span class="o">.</span><span class="na">getInputStream</span><span class="o">(</span><span class="s">"PageView"</span><span class="o">,</span> <span class="k">new</span> <span class="n">NoOpSerde</span><span class="o"><</span><span class="n">PageView</span><span class="o">>())</span> - <span class="o">.</span><span class="na">partitionBy</span><span class="o">(</span><span class="nl">PageView:</span><span class="o">:</span><span class="n">getMemberId</span><span class="o">,</span> <span class="n">v</span> <span class="o">-></span> <span class="n">v</span><span class="o">,</span> <span class="s">"p1"</span><span class="o">)</span> +<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span> <span class="n">streamGraph</span><span class="o">.</span><span class="na">getInputStream</span><span class="o">(</span><span class="s">"PageView"</span><span class="o">,</span> <span class="k">new</span> <span class="n">NoOpSerde</span><span class="o"><</span><span class="n">PageView</span><span class="o">>())</span> + <span class="o">.</span><span class="na">partitionBy</span><span class="o">(</span><span class="n">PageView</span><span class="o">::</span><span class="n">getMemberId</span><span class="o">,</span> <span class="n">v</span> <span class="o">-></span> <span class="n">v</span><span class="o">,</span> <span class="s">"p1"</span><span class="o">)</span> <span class="o">.</span><span class="na">join</span><span class="o">(</span><span class="n">table</span><span class="o">,</span> <span class="k">new</span> <span class="n">PageViewToProfileJoinFunction</span><span class="o">())</span> - <span class="o">...</span></code></pre></div> + <span class="o">...</span></code></pre></figure> -<div class="highlight"><pre><code class="java"><span class="lineno"> 1</span> <span class="kd">public</span> <span class="kd">class</span> <span 
class="nc">PageViewToProfileJoinFunction</span> <span class="kd">implements</span> <span class="n">StreamTableJoinFunction</span> -<span class="lineno"> 2</span> <span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">KV</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">PageView</span><span class="o">>,</span> <span class="n">KV</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Profile</span><span class="o">>,</span> <span class="n">EnrichedPageView</span><span class="o">></span> <span class="o">{</span> -<span class="lineno"> 3</span> <span class="nd">@Override</span> -<span class="lineno"> 4</span> <span class="kd">public</span> <span class="n">EnrichedPageView</span> <span class="nf">apply</span><span class="o">(</span><span class="n">KV</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">PageView</span><span class="o">></span> <span class="n">m</span><span class="o">,</span> <span class="n">KV</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Profile</span><span class="o">></span> <span class="n">r</span><span class="o">)</span> <span class="o">{</span> -<span class="lineno"> 5</span> <span class="k">return</span> <span class="n">r</span> <span class="o">!=</span> <span class="kc">null</span> <span class="o">?</span> -<span class="lineno"> 6</span> <span class="k">new</span> <span class="nf">EnrichedPageView</span><span class="o">(...)</span> -<span class="lineno"> 7</span> <span class="o">:</span> <span class="kc">null</span><span class="o">;</span> -<span class="lineno"> 8</span> <span class="o">}</span> -<span class="lineno"> 9</span> -<span class="lineno">10</span> <span class="nd">@Override</span> -<span class="lineno">11</span> <span class="kd">public</span> <span class="n">Integer</span> <span 
class="nf">getMessageKey</span><span class="o">(</span><span class="n">KV</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">PageView</span><span class="o">></span> <span class="n">message</span><span class="o">)</span> <span class="o">{</span> -<span class="lineno">12</span> <span class="k">return</span> <span class="n">message</span><span class="o">.</span><span class="na">getKey</span><span class="o">();</span> -<span class="lineno">13</span> <span class="o">}</span> -<span class="lineno">14</span> -<span class="lineno">15</span> <span class="nd">@Override</span> -<span class="lineno">16</span> <span class="kd">public</span> <span class="n">Integer</span> <span class="nf">getRecordKey</span><span class="o">(</span><span class="n">KV</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Profile</span><span class="o">></span> <span class="n">record</span><span class="o">)</span> <span class="o">{</span> -<span class="lineno">17</span> <span class="k">return</span> <span class="n">record</span><span class="o">.</span><span class="na">getKey</span><span class="o">();</span> -<span class="lineno">18</span> <span class="o">}</span> -<span class="lineno">19</span> <span class="o">}</span></code></pre></div> +<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="lineno"> 1 </span><span class="kd">public</span> <span class="kd">class</span> <span class="nc">PageViewToProfileJoinFunction</span> <span class="kd">implements</span> <span class="n">StreamTableJoinFunction</span> +<span class="lineno"> 2 </span> <span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">KV</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">PageView</span><span class="o">>,</span> <span class="n">KV</span><span class="o"><</span><span class="n">Integer</span><span 
class="o">,</span> <span class="n">Profile</span><span class="o">>,</span> <span class="n">EnrichedPageView</span><span class="o">></span> <span class="o">{</span> +<span class="lineno"> 3 </span> <span class="nd">@Override</span> +<span class="lineno"> 4 </span> <span class="kd">public</span> <span class="n">EnrichedPageView</span> <span class="nf">apply</span><span class="o">(</span><span class="n">KV</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">PageView</span><span class="o">></span> <span class="n">m</span><span class="o">,</span> <span class="n">KV</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Profile</span><span class="o">></span> <span class="n">r</span><span class="o">)</span> <span class="o">{</span> +<span class="lineno"> 5 </span> <span class="k">return</span> <span class="n">r</span> <span class="o">!=</span> <span class="kc">null</span> <span class="o">?</span> +<span class="lineno"> 6 </span> <span class="k">new</span> <span class="n">EnrichedPageView</span><span class="o">(...)</span> +<span class="lineno"> 7 </span> <span class="o">:</span> <span class="kc">null</span><span class="o">;</span> +<span class="lineno"> 8 </span> <span class="o">}</span> +<span class="lineno"> 9 </span> +<span class="lineno">10 </span> <span class="nd">@Override</span> +<span class="lineno">11 </span> <span class="kd">public</span> <span class="n">Integer</span> <span class="nf">getMessageKey</span><span class="o">(</span><span class="n">KV</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">PageView</span><span class="o">></span> <span class="n">message</span><span class="o">)</span> <span class="o">{</span> +<span class="lineno">12 </span> <span class="k">return</span> <span class="n">message</span><span class="o">.</span><span class="na">getKey</span><span class="o">();</span> +<span class="lineno">13 </span> 
<span class="o">}</span> +<span class="lineno">14 </span> +<span class="lineno">15 </span> <span class="nd">@Override</span> +<span class="lineno">16 </span> <span class="kd">public</span> <span class="n">Integer</span> <span class="nf">getRecordKey</span><span class="o">(</span><span class="n">KV</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Profile</span><span class="o">></span> <span class="n">record</span><span class="o">)</span> <span class="o">{</span> +<span class="lineno">17 </span> <span class="k">return</span> <span class="n">record</span><span class="o">.</span><span class="na">getKey</span><span class="o">();</span> +<span class="lineno">18 </span> <span class="o">}</span> +<span class="lineno">19 </span><span class="o">}</span></code></pre></figure> <h3 id="window">Window</h3> @@ -513,7 +514,7 @@ <p>An <em>accumulating window</em> retains window results from previous emissions. Each emission will contain all messages that arrived since the beginning of the window.</p> -<h4 id="window-types:">Window Types:</h4> +<h4 id="window-types">Window Types:</h4> <p>The Samza high-level API currently supports tumbling and session windows.</p> @@ -521,13 +522,13 @@ <p>Examples:</p> -<div class="highlight"><pre><code class="java"><span class="c1">// Group the pageView stream into 3 second tumbling windows keyed by the userId.</span> +<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="c1">// Group the pageView stream into 3 second tumbling windows keyed by the userId.</span> <span class="n">MessageStream</span><span class="o"><</span><span class="n">PageView</span><span class="o">></span> <span class="n">pageViews</span> <span class="o">=</span> <span class="o">...</span> <span class="n">MessageStream</span><span class="o"><</span><span class="n">WindowPane</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span 
class="n">Collection</span><span class="o"><</span><span class="n">PageView</span><span class="o">>>></span> <span class="n">windowedPageViews</span> <span class="o">=</span> <span class="n">pageViews</span><span class="o">.</span><span class="na">window</span><span class="o">(</span> <span class="n">Windows</span><span class="o">.</span><span class="na">keyedTumblingWindow</span><span class="o">(</span><span class="n">pageView</span> <span class="o">-></span> <span class="n">pageView</span><span class="o">.</span><span class="na">getUserId</span><span class="o">(),</span> <span class="c1">// key extractor</span> <span class="n">Duration</span><span class="o">.</span><span class="na">ofSeconds</span><span class="o">(</span><span class="mi">3</span><span class="o">),</span> <span class="c1">// window duration</span> - <span class="k">new</span> <span class="nf">StringSerde</span><span class="o">(),</span> <span class="k">new</span> <span class="n">JsonSerdeV2</span><span class="o"><>(</span><span class="n">PageView</span><span class="o">.</span><span class="na">class</span><span class="o">)));</span> + <span class="k">new</span> <span class="n">StringSerde</span><span class="o">(),</span> <span class="k">new</span> <span class="n">JsonSerdeV2</span><span class="o"><>(</span><span class="n">PageView</span><span class="o">.</span><span class="na">class</span><span class="o">)));</span> <span class="c1">// Compute the maximum value over tumbling windows of 3 seconds.</span> @@ -535,20 +536,20 @@ <span class="n">Supplier</span><span class="o"><</span><span class="n">Integer</span><span class="o">></span> <span class="n">initialValue</span> <span class="o">=</span> <span class="o">()</span> <span class="o">-></span> <span class="n">Integer</span><span class="o">.</span><span class="na">MIN_VALUE</span><span class="o">;</span> <span class="n">FoldLeftFunction</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">></span> <span
class="n">aggregateFunction</span> <span class="o">=</span> <span class="o">(</span><span class="n">msg</span><span class="o">,</span> <span class="n">oldValue</span><span class="o">)</span> <span class="o">-></span> <span class="n">Math</span><span class="o">.</span><span class="na">max</span><span class="o">(</span><span class="n">msg</span><span class="o">,</span> <span class="n">oldValue</span><span class="o">);</span> <span class="n">MessageStream</span><span class="o"><</span><span class="n">WindowPane</span><span class="o"><</span><span class="n">Void</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>></span> <span class="n">windowedStream</span> <span class="o">=</span> - <span class="n">integers</span><span class="o">.</span><span class="na">window</span><span class="o">(</span><span class="n">Windows</span><span class="o">.</span><span class="na">tumblingWindow</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">ofSeconds</span><span class="o">(</span><span class="mi">3</span><span class="o">),</span> <span class="n">initialValue</span><span class="o">,</span> <span class="n">aggregateFunction</span><span class="o">,</span> <span class="k">new</span> <span class="n">IntegerSerde</span><span class="o">()));</span></code></pre></div> + <span class="n">integers</span><span class="o">.</span><span class="na">window</span><span class="o">(</span><span class="n">Windows</span><span class="o">.</span><span class="na">tumblingWindow</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">ofSeconds</span><span class="o">(</span><span class="mi">3</span><span class="o">),</span> <span class="n">initialValue</span><span class="o">,</span> <span class="n">aggregateFunction</span><span class="o">,</span> <span class="k">new</span> <span class="n">IntegerSerde</span><span class="o">()));</span></code></pre></figure> <p><strong>Session
Window</strong>: A session window groups a MessageStream into sessions. A session captures a period of activity over a MessageStream and is defined by a gap. A session closes, and its results are emitted, when no new messages arrive for the gap duration.</p> <p>Examples:</p> -<div class="highlight"><pre><code class="java"><span class="c1">// Sessionize a stream of page views, and count the number of page-views in a session for every user.</span> +<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="c1">// Sessionize a stream of page views, and count the number of page-views in a session for every user.</span> <span class="n">MessageStream</span><span class="o"><</span><span class="n">PageView</span><span class="o">></span> <span class="n">pageViews</span> <span class="o">=</span> <span class="o">...</span> <span class="n">Supplier</span><span class="o"><</span><span class="n">Integer</span><span class="o">></span> <span class="n">initialValue</span> <span class="o">=</span> <span class="o">()</span> <span class="o">-></span> <span class="mi">0</span><span class="o">;</span> <span class="n">FoldLeftFunction</span><span class="o"><</span><span class="n">PageView</span><span class="o">,</span> <span class="n">Integer</span><span class="o">></span> <span class="n">countAggregator</span> <span class="o">=</span> <span class="o">(</span><span class="n">pageView</span><span class="o">,</span> <span class="n">oldCount</span><span class="o">)</span> <span class="o">-></span> <span class="n">oldCount</span> <span class="o">+</span> <span class="mi">1</span><span class="o">;</span> <span class="n">Duration</span> <span class="n">sessionGap</span> <span class="o">=</span> <span class="n">Duration</span><span class="o">.</span><span class="na">ofMinutes</span><span class="o">(</span><span class="mi">3</span><span class="o">);</span> <span class="n">MessageStream</span><span class="o"><</span><span class="n">WindowPane</span><span
class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>></span> <span class="n">sessionCounts</span> <span class="o">=</span> <span class="n">pageViews</span><span class="o">.</span><span class="na">window</span><span class="o">(</span><span class="n">Windows</span><span class="o">.</span><span class="na">keyedSessionWindow</span><span class="o">(</span> <span class="n">pageView</span> <span class="o">-></span> <span class="n">pageView</span><span class="o">.</span><span class="na">getUserId</span><span class="o">(),</span> <span class="n">sessionGap</span><span class="o">,</span> <span class="n">initialValue</span><span class="o">,</span> <span class="n">countAggregator</span><span class="o">,</span> - <span class="k">new</span> <span class="nf">StringSerde</span><span class="o">(),</span> <span class="k">new</span> <span class="n">IntegerSerde</span><span class="o">()));</span> + <span class="k">new</span> <span class="n">StringSerde</span><span class="o">(),</span> <span class="k">new</span> <span class="n">IntegerSerde</span><span class="o">()));</span> <span class="c1">// Compute the maximum value over tumbling windows of 3 seconds.</span> <span class="n">MessageStream</span><span class="o"><</span><span class="n">Integer</span><span class="o">></span> <span class="n">integers</span> <span class="o">=</span> <span class="o">...</span> @@ -557,7 +558,7 @@ <span class="n">FoldLeftFunction</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">></span> <span class="n">aggregateFunction</span> <span class="o">=</span> <span class="o">(</span><span class="n">msg</span><span class="o">,</span> <span class="n">oldValue</span><span class="o">)</span> <span class="o">-></span> <span class="n">Math</span><span class="o">.</span><span class="na">max</span><span class="o">(</span><span class="n">msg</span><span class="o">,</span>
<span class="n">oldValue</span><span class="o">);</span> <span class="n">MessageStream</span><span class="o"><</span><span class="n">WindowPane</span><span class="o"><</span><span class="n">Void</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>></span> <span class="n">windowedStream</span> <span class="o">=</span> <span class="n">integers</span><span class="o">.</span><span class="na">window</span><span class="o">(</span><span class="n">Windows</span><span class="o">.</span><span class="na">tumblingWindow</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">ofSeconds</span><span class="o">(</span><span class="mi">3</span><span class="o">),</span> <span class="n">initialValue</span><span class="o">,</span> <span class="n">aggregateFunction</span><span class="o">,</span> - <span class="k">new</span> <span class="nf">IntegerSerde</span><span class="o">()));</span></code></pre></div> + <span class="k">new</span> <span class="n">IntegerSerde</span><span class="o">()));</span></code></pre></figure> <h3 id="table">Table</h3> @@ -569,11 +570,11 @@ <li>Join a stream with the table using the join() operator</li> </ol> -<div class="highlight"><pre><code class="java"><span class="lineno">1</span> <span class="kd">final</span> <span class="n">StreamApplication</span> <span class="n">app</span> <span class="o">=</span> <span class="o">(</span><span class="n">streamGraph</span><span class="o">,</span> <span class="n">cfg</span><span class="o">)</span> <span class="o">-></span> <span class="o">{</span> -<span class="lineno">2</span> <span class="n">Table</span><span class="o"><</span><span class="n">KV</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Profile</span><span class="o">>></span> <span class="n">table</span> <span class="o">=</span> <span class="n">streamGraph</span><span class="o">.</span><span class="na">getTable</span><span
class="o">(</span><span class="k">new</span> <span class="n">InMemoryTableDescriptor</span><span class="o">(</span><span class="s">"t1"</span><span class="o">)</span> -<span class="lineno">3</span> <span class="o">.</span><span class="na">withSerde</span><span class="o">(</span><span class="n">KVSerde</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="k">new</span> <span class="n">IntegerSerde</span><span class="o">(),</span> <span class="k">new</span> <span class="n">ProfileJsonSerde</span><span class="o">())));</span> -<span class="lineno">4</span> <span class="o">...</span> -<span class="lineno">5</span> <span class="o">};</span></code></pre></div> +<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="lineno">1 </span><span class="kd">final</span> <span class="n">StreamApplication</span> <span class="n">app</span> <span class="o">=</span> <span class="o">(</span><span class="n">streamGraph</span><span class="o">,</span> <span class="n">cfg</span><span class="o">)</span> <span class="o">-></span> <span class="o">{</span> +<span class="lineno">2 </span> <span class="n">Table</span><span class="o"><</span><span class="n">KV</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Profile</span><span class="o">>></span> <span class="n">table</span> <span class="o">=</span> <span class="n">streamGraph</span><span class="o">.</span><span class="na">getTable</span><span class="o">(</span><span class="k">new</span> <span class="n">InMemoryTableDescriptor</span><span class="o">(</span><span class="s">"t1"</span><span class="o">)</span> +<span class="lineno">3 </span> <span class="o">.</span><span class="na">withSerde</span><span class="o">(</span><span class="n">KVSerde</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="k">new</span> <span class="n">IntegerSerde</span><span class="o">(),</span> 
<span class="k">new</span> <span class="n">ProfileJsonSerde</span><span class="o">())));</span> +<span class="lineno">4 </span> <span class="o">...</span> +<span class="lineno">5 </span><span class="o">};</span></code></pre></figure> <p>The example above creates a TableDescriptor object, which contains all the information about a table. The currently supported table types are <a href="https://github.com/apache/samza/blob/master/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableDescriptor.java">InMemoryTableDescriptor</a> and <a href="https://github.com/apache/samza/blob/master/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/rocksdb/RocksDbTableDescriptor.java">RocksDbTableDescriptor</a>. Note that records in a table are of type KV, and <a href="https://samza.apache.org/learn/documentation/latest/container/serialization.html">Serdes</a> for both the key and the value of records need to be defined (line 3). Additional parameters can be added based on individual table types.
</p> @@ -671,20 +672,20 @@ There is one leader processor which gene <p>To use ZooKeeper-based coordination, the following configs are required:</p> -<div class="highlight"><pre><code class="jproperties"><span class="na">job.coordinator.factory</span><span class="o">=</span><span class="s">org.apache.samza.zk.ZkJobCoordinatorFactory</span> +<figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties"><span></span><span class="na">job.coordinator.factory</span><span class="o">=</span><span class="s">org.apache.samza.zk.ZkJobCoordinatorFactory</span> <span class="na">job.coordinator.zk.connect</span><span class="o">=</span><span class="s">yourzkconnection</span> -<span class="na">task.name.grouper.factory</span><span class="o">=</span><span class="s">org.apache.samza.container.grouper.task.GroupByContainerIdsFactory</span></code></pre></div> +<span class="na">task.name.grouper.factory</span><span class="o">=</span><span class="s">org.apache.samza.container.grouper.task.GroupByContainerIdsFactory</span></code></pre></figure> <p>To use external coordination, the following configs are needed:</p> -<div class="highlight"><pre><code class="properties"><span class="na">job.coordinator.factory</span><span class="o">=</span><span class="s">org.apache.samza.standalone.PassthroughJobCoordinatorFactory</span> -<span class="na">task.name.grouper.factory</span><span class="o">=</span><span class="s">org.apache.samza.container.grouper.task.GroupByContainerIdsFactory</span></code></pre></div> +<figure class="highlight"><pre><code class="language-properties" data-lang="properties"><span></span><span class="na">job.coordinator.factory</span><span class="o">=</span><span class="s">org.apache.samza.standalone.PassthroughJobCoordinatorFactory</span> +<span class="na">task.name.grouper.factory</span><span class="o">=</span><span class="s">org.apache.samza.container.grouper.task.GroupByContainerIdsFactory</span></code></pre></figure> <h4 id="api">API</h4> <p>As 
mentioned in the <a href="#architecture">architecture</a> section above, you use the LocalApplicationRunner to launch your processors from your application code, like this:</p> -<div class="highlight"><pre><code class="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">WikipediaZkLocalApplication</span> <span class="o">{</span> +<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="kd">public</span> <span class="kd">class</span> <span class="nc">WikipediaZkLocalApplication</span> <span class="o">{</span> <span class="kd">public</span> <span class="kd">static</span> <span class="kt">void</span> <span class="nf">main</span><span class="o">(</span><span class="n">String</span><span class="o">[]</span> <span class="n">args</span><span class="o">)</span> <span class="o">{</span> <span class="n">CommandLine</span> <span class="n">cmdLine</span> <span class="o">=</span> <span class="k">new</span> <span class="n">CommandLine</span><span class="o">();</span> @@ -697,7 +698,7 @@ There is one leader processor which gene <span class="n">runner</span><span class="o">.</span><span class="na">run</span><span class="o">(</span><span class="n">app</span><span class="o">);</span> <span class="n">runner</span><span class="o">.</span><span class="na">waitForFinish</span><span class="o">();</span> <span class="o">}</span> -<span class="o">}</span></code></pre></div> +<span class="o">}</span></code></pre></figure> <p>In the code above, <code>WikipediaApplication</code> is an application written with the <a href="#high-level-api">high level API</a>.</p>
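The Table section in the diff above describes declaring a table and then joining a stream with it using the join() operator. As a rough illustration of what such a join does, here is a minimal plain-Java sketch with no Samza dependency, assuming a HashMap stands in for the table registered as "t1" and a List stands in for the input stream. The class, the record types (Profile, PageView, EnrichedPageView), and the join method are hypothetical and are not Samza's Table or MessageStream API. The sketch drops stream records that have no matching table entry (inner-join behavior); that is a choice made for the sketch, not a statement about how Samza's join() handles misses.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of a stream-table join. No Samza classes are used:
// a HashMap stands in for the table and a List stands in for the stream.
// All type and method names are hypothetical.
class StreamTableJoinSketch {

  // Hypothetical table value (plays the role of Profile in the Samza example).
  static final class Profile {
    final String name;
    final String company;
    Profile(String name, String company) { this.name = name; this.company = company; }
  }

  // Hypothetical stream record, keyed by the same Integer id as the table.
  static final class PageView {
    final int memberId;
    final String pageKey;
    PageView(int memberId, String pageKey) { this.memberId = memberId; this.pageKey = pageKey; }
  }

  // Hypothetical join output: the page view enriched with the member's company.
  static final class EnrichedPageView {
    final String pageKey;
    final String company;
    EnrichedPageView(String pageKey, String company) { this.pageKey = pageKey; this.company = company; }
    @Override public String toString() { return pageKey + "@" + company; }
  }

  // For each stream record, look up the table entry with the same key.
  // Records without a match are dropped (inner-join behavior chosen for this sketch).
  static List<EnrichedPageView> join(List<PageView> stream, Map<Integer, Profile> table) {
    List<EnrichedPageView> out = new ArrayList<>();
    for (PageView pv : stream) {
      Profile profile = table.get(pv.memberId); // point lookup by join key
      if (profile != null) {
        out.add(new EnrichedPageView(pv.pageKey, profile.company));
      }
    }
    return out;
  }

  public static void main(String[] args) {
    Map<Integer, Profile> table = new HashMap<>();
    table.put(1, new Profile("alice", "acme"));
    table.put(2, new Profile("bob", "initech"));

    List<PageView> stream = Arrays.asList(
        new PageView(1, "home"), new PageView(3, "jobs"), new PageView(2, "feed"));

    // Member 3 has no profile, so the "jobs" view is dropped.
    // Prints [home@acme, feed@initech]
    System.out.println(join(stream, table));
  }
}
```

Keying the stream records and the table by the same Integer id is what makes the per-record lookup a single map access; in a real Samza job the table would instead be backed by one of the descriptors named above, with the KV serdes configured as shown.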
Modified: samza/site/startup/releases/latest/release-notes.html
URL: http://svn.apache.org/viewvc/samza/site/startup/releases/latest/release-notes.html?rev=1832281&r1=1832280&r2=1832281&view=diff
==============================================================================
--- samza/site/startup/releases/latest/release-notes.html (original)
+++ samza/site/startup/releases/latest/release-notes.html Fri May 25 22:28:38 2018
@@ -44,9 +44,7 @@ <!-- this icon only shows in versioned pages --> - - - <a href="http://samza.apache.org/startup/releases/0.14/release-notes.html"><i id="switch-version-button"></i></a> + <a href="http://samza.apache.org/startup/releases/0.14/release-notes"><i id="switch-version-button"></i></a> <!-- links for the navigation bar -->
@@ -225,7 +223,7 @@ Read more about it in the <a href="/lear <script> $( document ).ready(function() { - if ( $.fn.urlExists( "/startup/releases/0.14/release-notes.html" ) ) { + if ( $.fn.urlExists( "/startup/releases/0.14/release-notes" ) ) { $("#switch-version-button").addClass("fa fa-history masthead-icon"); } });
