http://git-wip-us.apache.org/repos/asf/flink-web/blob/f6ac337f/content/news/2017/04/04/dynamic-tables.html
----------------------------------------------------------------------
diff --git a/content/news/2017/04/04/dynamic-tables.html 
b/content/news/2017/04/04/dynamic-tables.html
new file mode 100644
index 0000000..62f316b
--- /dev/null
+++ b/content/news/2017/04/04/dynamic-tables.html
@@ -0,0 +1,372 @@
+<!DOCTYPE html>
+<html lang="en">
+  <head>
+    <meta charset="utf-8">
+    <meta http-equiv="X-UA-Compatible" content="IE=edge">
+    <meta name="viewport" content="width=device-width, initial-scale=1">
+    <!-- The above 3 meta tags *must* come first in the head; any other head 
content must come *after* these tags -->
+    <title>Apache Flink: Continuous Queries on Dynamic Tables</title>
+    <link rel="shortcut icon" href="/favicon.ico" type="image/x-icon">
+    <link rel="icon" href="/favicon.ico" type="image/x-icon">
+
+    <!-- Bootstrap -->
+    <link rel="stylesheet" 
href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/css/bootstrap.min.css";>
+    <link rel="stylesheet" href="/css/flink.css">
+    <link rel="stylesheet" href="/css/syntax.css">
+
+    <!-- Blog RSS feed -->
+    <link href="/blog/feed.xml" rel="alternate" type="application/rss+xml" 
title="Apache Flink Blog: RSS feed" />
+
+    <!-- jQuery (necessary for Bootstrap's JavaScript plugins) -->
+    <!-- We need to load Jquery in the header for custom google analytics 
event tracking-->
+    <script 
src="https://ajax.googleapis.com/ajax/libs/jquery/1.11.2/jquery.min.js";></script>
+
+    <!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media 
queries -->
+    <!-- WARNING: Respond.js doesn't work if you view the page via file:// -->
+    <!--[if lt IE 9]>
+      <script 
src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js";></script>
+      <script 
src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js";></script>
+    <![endif]-->
+  </head>
+  <body>  
+    
+
+    <!-- Main content. -->
+    <div class="container">
+    <div class="row">
+
+      
+     <div id="sidebar" class="col-sm-3">
+          <!-- Top navbar. -->
+    <nav class="navbar navbar-default">
+        <!-- The logo. -->
+        <div class="navbar-header">
+          <button type="button" class="navbar-toggle collapsed" 
data-toggle="collapse" data-target="#bs-example-navbar-collapse-1">
+            <span class="icon-bar"></span>
+            <span class="icon-bar"></span>
+            <span class="icon-bar"></span>
+          </button>
+          <div class="navbar-logo">
+            <a href="/">
+              <img alt="Apache Flink" src="/img/flink-header-logo.svg" 
width="147px" height="73px">
+            </a>
+          </div>
+        </div><!-- /.navbar-header -->
+
+        <!-- The navigation links. -->
+        <div class="collapse navbar-collapse" 
id="bs-example-navbar-collapse-1">
+          <ul class="nav navbar-nav navbar-main">
+
+            <!-- Downloads -->
+            <li class=""><a class="btn btn-info" 
href="/downloads.html">Download Flink</a></li>
+
+            <!-- Overview -->
+            <li><a href="/index.html">Home</a></li>
+
+            <!-- Intro -->
+            <li><a href="/introduction.html">Introduction to Flink</a></li>
+
+            <!-- Use cases -->
+            <li><a href="/usecases.html">Flink Use Cases</a></li>
+
+            <!-- Powered by -->
+            <li><a href="/poweredby.html">Powered by Flink</a></li>
+
+            <!-- Ecosystem -->
+            <li><a href="/ecosystem.html">Ecosystem</a></li>
+
+            <!-- Community -->
+            <li><a href="/community.html">Community &amp; Project Info</a></li>
+
+            <!-- Contribute -->
+            <li><a href="/how-to-contribute.html">How to Contribute</a></li>
+
+            <!-- Blog -->
+            <li class=" active hidden-md hidden-sm"><a href="/blog/"><b>Flink 
Blog</b></a></li>
+
+            <hr />
+
+
+
+            <!-- Documentation -->
+            <!-- <li>
+              <a 
href="http://ci.apache.org/projects/flink/flink-docs-release-1.2"; 
target="_blank">Documentation <small><span class="glyphicon 
glyphicon-new-window"></span></small></a>
+            </li> -->
+            <li class="dropdown">
+              <a class="dropdown-toggle" data-toggle="dropdown" 
href="#">Documentation
+                <span class="caret"></span></a>
+                <ul class="dropdown-menu">
+                  <li><a 
href="http://ci.apache.org/projects/flink/flink-docs-release-1.2"; 
target="_blank">1.2 (Latest stable release) <small><span class="glyphicon 
glyphicon-new-window"></span></small></a></li>
+                  <li><a 
href="http://ci.apache.org/projects/flink/flink-docs-release-1.3"; 
target="_blank">1.3 (Snapshot) <small><span class="glyphicon 
glyphicon-new-window"></span></small></a></li>
+                </ul>
+              </li>
+
+            <!-- Quickstart -->
+            <li>
+              <a 
href="http://ci.apache.org/projects/flink/flink-docs-release-1.2/quickstart/setup_quickstart.html";
 target="_blank">Quickstart <small><span class="glyphicon 
glyphicon-new-window"></span></small></a>
+            </li>
+
+            <!-- GitHub -->
+            <li>
+              <a href="https://github.com/apache/flink"; target="_blank">Flink 
on GitHub <small><span class="glyphicon 
glyphicon-new-window"></span></small></a>
+            </li>
+
+          </ul>
+
+
+
+          <ul class="nav navbar-nav navbar-bottom">
+          <hr />
+
+            <!-- FAQ -->
+            <li ><a href="/faq.html">Project FAQ</a></li>
+
+            <!-- Twitter -->
+            <li><a href="https://twitter.com/apacheflink"; 
target="_blank">@ApacheFlink <small><span class="glyphicon 
glyphicon-new-window"></span></small></a></li>
+
+            <!-- Visualizer -->
+            <li class=" hidden-md hidden-sm"><a href="/visualizer/" 
target="_blank">Plan Visualizer <small><span class="glyphicon 
glyphicon-new-window"></span></small></a></li>
+
+          </ul>
+        </div><!-- /.navbar-collapse -->
+    </nav>
+
+      </div>
+      <div class="col-sm-9">
+      <div class="row-fluid">
+  <div class="col-sm-12">
+    <div class="row">
+      <h1>Continuous Queries on Dynamic Tables</h1>
+
+      <article>
+        <p>04 Apr 2017 by Fabian Hueske, Shaoxuan Wang, and Xiaowei Jiang</p>
+
+<h4 id="analyzing-data-streams-with-sql">Analyzing Data Streams with SQL</h4>
+
+<p>More and more companies are adopting stream processing and are migrating 
existing batch applications to streaming or implementing streaming solutions 
for new use cases. Many of those applications focus on analyzing streaming 
data. The data streams that are analyzed come from a wide variety of sources 
such as database transactions, clicks, sensor measurements, or IoT devices.</p>
+
+<center>
+<img src="/img/blog/dynamic-tables/streams.png" style="width:45%;margin:10px" 
/>
+</center>
+
+<p>Apache Flink is very well suited to power streaming analytics applications because it supports event-time semantics and stateful exactly-once processing, and achieves high throughput and low latency at the same time. Due to these features, Flink is able to compute exact and deterministic results from high-volume input streams in near real-time while providing exactly-once semantics in case of failures.</p>
+
+<p>Flink’s core API for stream processing, the <a 
href="https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/datastream_api.html";>DataStream
 API</a>, is very expressive and provides primitives for many common 
operations. Among other features, it offers highly customizable windowing 
logic, different state primitives with varying performance characteristics, 
hooks to register and react to timers, and tooling for efficient asynchronous requests to external systems. On the other hand, many stream analytics applications follow similar patterns and do not require the level of expressiveness provided by the DataStream API. They could be expressed in a more natural and concise way using a domain-specific language. As we all know,
SQL is the de-facto standard for data analytics. For streaming analytics, SQL 
would enable a larger pool of people to specify applications on data streams in 
less time. However, no open source stream processor offers decent SQL support 
yet.</p>
+
+<h2 id="why-is-sql-on-streams-a-big-deal">Why is SQL on Streams a Big 
Deal?</h2>
+
+<p>SQL is the most widely used language for data analytics for many good 
reasons:</p>
+
+<ul>
+  <li>SQL is declarative: You specify what you want but not how to compute 
it.</li>
+  <li>SQL can be effectively optimized: An optimizer figures out an efficient 
plan to compute your result.</li>
+  <li>SQL can be efficiently evaluated: The processing engine knows exactly 
what to compute and how to do so efficiently.</li>
+  <li>And finally, everybody knows and many tools speak SQL.</li>
+</ul>
+
+<p>So being able to process and analyze data streams with SQL makes stream 
processing technology available to many more users. Moreover, it significantly 
reduces the time and effort to define efficient stream analytics applications 
due to SQL's declarative nature and its potential to be automatically
optimized.</p>
+
+<p>However, SQL (and the relational data model and algebra) were not designed 
with streaming data in mind. Relations are (multi-)sets and not infinite 
sequences of tuples. When executing a SQL query, conventional database systems 
and query engines read and process a data set, which is completely available, 
and produce a fixed-size result. In contrast, data streams continuously
provide new records such that data arrives over time. Hence, streaming queries 
have to continuously process the arriving data and never “complete”.</p>
+
+<p>That being said, processing streams with SQL is not impossible. Some 
relational database systems feature eager maintenance of materialized views, 
which is similar to evaluating SQL queries on streams of data. A materialized 
view is defined as a SQL query just like a regular (virtual) view. However, the 
result of the query is actually stored (or materialized) in memory or on disk 
such that the view does not need to be computed on-the-fly when it is queried. 
To prevent a materialized view from becoming stale, the database system
needs to update the view whenever its base relations (the tables referenced in 
its definition query) are modified. If we consider the changes on the view’s 
base relations as a stream of modifications (or as a changelog stream) it 
becomes obvious that materialized view maintenance and SQL on streams are 
somehow related.</p>
+
+<h2 id="flinks-relational-apis-table-api-and-sql">Flink’s Relational APIs: 
Table API and SQL</h2>
+
+<p>Since version 1.1.0 (released in August 2016), Flink features two 
semantically equivalent relational APIs, the language-embedded Table API (for 
Java and Scala) and standard SQL. Both APIs are designed as unified APIs for 
online streaming and historic batch data. This means that,</p>
+
+<p><strong><em>a query produces exactly the same result regardless of whether its
input is static batch data or streaming data.</em></strong></p>
+
+<p>Unified APIs for stream and batch processing are important for several 
reasons. First of all, users only need to learn a single API to process static 
and streaming data. Moreover, the same query can be used to analyze batch and 
streaming data, which makes it possible to jointly analyze historic and live data in the same query. At this point we haven’t achieved complete unification of
batch and streaming semantics yet, but the community is making very good 
progress towards this goal.</p>
+
+<p>The following code snippet shows two equivalent Table API and SQL queries 
that compute a simple windowed aggregate on a stream of temperature sensor 
measurements. The syntax of the SQL query is based on <a 
href="https://calcite.apache.org";>Apache Calcite’s</a> syntax for <a 
href="https://calcite.apache.org/docs/reference.html#grouped-window-functions";>grouped
 window functions</a> and will be supported in version 1.3.0 of Flink.</p>
+
+<div class="highlight"><pre><code class="language-scala"><span 
class="k">val</span> <span class="n">env</span> <span class="k">=</span> <span 
class="nc">StreamExecutionEnvironment</span><span class="o">.</span><span 
class="n">getExecutionEnvironment</span>
+<span class="n">env</span><span class="o">.</span><span 
class="n">setStreamTimeCharacteristic</span><span class="o">(</span><span 
class="nc">TimeCharacteristic</span><span class="o">.</span><span 
class="nc">EventTime</span><span class="o">)</span>
+
+<span class="k">val</span> <span class="n">tEnv</span> <span 
class="k">=</span> <span class="nc">TableEnvironment</span><span 
class="o">.</span><span class="n">getTableEnvironment</span><span 
class="o">(</span><span class="n">env</span><span class="o">)</span>
+
+<span class="c1">// define a table source to read sensor data (sensorId, time, 
room, temp)</span>
+<span class="k">val</span> <span class="n">sensorTable</span> <span 
class="k">=</span> <span class="o">???</span> <span class="c1">// can be a CSV 
file, Kafka topic, database, or ...</span>
+<span class="c1">// register the table source</span>
+<span class="n">tEnv</span><span class="o">.</span><span 
class="n">registerTableSource</span><span class="o">(</span><span 
class="s">&quot;sensors&quot;</span><span class="o">,</span> <span 
class="n">sensorTable</span><span class="o">)</span>
+
+<span class="c1">// Table API</span>
+<span class="k">val</span> <span class="n">tapiResult</span><span 
class="k">:</span> <span class="kt">Table</span> <span class="o">=</span> <span 
class="n">tEnv</span><span class="o">.</span><span class="n">scan</span><span 
class="o">(</span><span class="s">&quot;sensors&quot;</span><span 
class="o">)</span>   <span class="c1">// scan sensors table</span>
+ <span class="o">.</span><span class="n">window</span><span 
class="o">(</span><span class="nc">Tumble</span> <span class="n">over</span> 
<span class="mf">1.</span><span class="n">hour</span> <span class="n">on</span> 
<span class="-Symbol">&#39;rowtime</span> <span class="n">as</span> <span 
class="-Symbol">&#39;w</span><span class="o">)</span> <span class="c1">// 
define 1-hour window</span>
+ <span class="o">.</span><span class="n">groupBy</span><span 
class="o">(</span><span class="-Symbol">&#39;w</span><span class="o">,</span> 
<span class="-Symbol">&#39;room</span><span class="o">)</span>                  
         <span class="c1">// group by window and room</span>
+ <span class="o">.</span><span class="n">select</span><span 
class="o">(</span><span class="-Symbol">&#39;room</span><span 
class="o">,</span> <span class="-Symbol">&#39;w</span><span 
class="o">.</span><span class="n">end</span><span class="o">,</span> <span 
class="-Symbol">&#39;temp</span><span class="o">.</span><span 
class="n">avg</span> <span class="n">as</span> <span 
class="-Symbol">&#39;avgTemp</span><span class="o">)</span> <span class="c1">// 
compute average temperature</span>
+
+<span class="c1">// SQL</span>
+<span class="k">val</span> <span class="n">sqlResult</span><span 
class="k">:</span> <span class="kt">Table</span> <span class="o">=</span> <span 
class="n">tEnv</span><span class="o">.</span><span class="n">sql</span><span 
class="o">(</span><span class="s">&quot;&quot;&quot;</span>
+<span class="s"> |SELECT room, TUMBLE_END(rowtime, INTERVAL &#39;1&#39; HOUR), 
AVG(temp) AS avgTemp</span>
+<span class="s"> |FROM sensors</span>
+<span class="s"> |GROUP BY TUMBLE(rowtime, INTERVAL &#39;1&#39; HOUR), 
room</span>
+<span class="s"> |&quot;&quot;&quot;</span><span class="o">.</span><span 
class="n">stripMargin</span><span class="o">)</span></code></pre></div>
+
+<p>As you can see, both APIs are tightly integrated with each other and 
Flink’s primary <a 
href="https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/datastream_api.html";>DataStream</a>
 and <a 
href="https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/batch/index.html";>DataSet</a>
 APIs. A <code>Table</code> can be generated from and converted to a 
<code>DataSet</code> or <code>DataStream</code>. Hence, it is easily possible 
to scan an external table source such as a database or <a 
href="https://parquet.apache.org";>Parquet</a> file, do some preprocessing with 
a Table API query, convert the result into a <code>DataSet</code> and run a <a 
href="https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/libs/gelly/index.html";>Gelly</a>
 graph algorithm on it. The queries defined in the example above can also be 
used to process batch data by changing the execution environment.</p>
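+
+<p>For illustration, here is a minimal sketch of such a round trip, reusing the <code>tEnv</code> from the example above and assuming the implicit conversions from <code>org.apache.flink.table.api.scala._</code> as well as a hypothetical <code>SensorReading</code> case class (the exact conversion methods may differ slightly between Flink versions):</p>
+
+<div class="highlight"><pre><code class="language-scala">import org.apache.flink.streaming.api.scala._
+import org.apache.flink.table.api.Table
+import org.apache.flink.table.api.scala._
+
+// hypothetical record type of the input stream
+case class SensorReading(sensorId: Int, time: Long, room: String, temp: Double)
+
+// a DataStream from any source, e.g., a Kafka consumer
+val sensorStream: DataStream[SensorReading] = ???
+
+// convert the DataStream into a Table ...
+val readings: Table = sensorStream.toTable(tEnv, 'sensorId, 'time, 'room, 'temp)
+
+// ... run a Table API query on it ...
+val hotRooms: Table = readings
+  .filter('temp &gt; 40)
+  .select('room, 'temp)
+
+// ... and convert the result back into a DataStream
+val hotRoomStream: DataStream[(String, Double)] = hotRooms.toDataStream[(String, Double)]</code></pre></div>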
+
+<p>Internally, both APIs are translated into the same logical representation, 
optimized by Apache Calcite, and compiled into DataStream or DataSet programs. 
In fact, the optimization and translation process does not know whether a query 
was defined using the Table API or SQL. If you are curious about the details of 
the optimization process, have a look at <a 
href="http://flink.apache.org/news/2016/05/24/stream-sql.html";>a blog post</a> 
that we published last year. Since the Table API and SQL are equivalent in 
terms of semantics and only differ in syntax, we always refer to both APIs when 
we talk about SQL in this post.</p>
+
+<p>In its current state (version 1.2.0), Flink’s relational APIs support a 
limited set of relational operators on data streams, including projections, 
filters, and windowed aggregates. All supported operators have in common that 
they never update result records which have been emitted. This is clearly not 
an issue for record-at-a-time operators such as projection and filter. However, 
it affects operators that collect and process multiple records, such as windowed aggregates. Since emitted results cannot be updated, input records that arrive after a result has been emitted have to be discarded in Flink 1.2.0.</p>
+
+<p>The limitations of the current version are acceptable for applications that 
emit data to storage systems such as Kafka topics, message queues, or files, which only support append operations and no updates or deletes. Common use cases that follow this pattern are, for example, continuous ETL and stream
archiving applications that persist streams to an archive or prepare data for 
further online (streaming) analysis or later offline analysis. Since it is not 
possible to update previously emitted results, these kinds of applications have 
to make sure that the emitted results are correct and will not need to be 
corrected in the future. The following figure illustrates such applications.</p>
+
+<center>
+<img src="/img/blog/dynamic-tables/query-append-out.png" 
style="width:60%;margin:10px" />
+</center>
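+
+<p>As a sketch of such an append-only pipeline, the windowed aggregate <code>tapiResult</code> from the example above could be emitted to a set of CSV files (the output path is made up; <code>CsvTableSink</code> is one of the append-only sinks that ship with the Table API):</p>
+
+<div class="highlight"><pre><code class="language-scala">import org.apache.flink.table.sinks.CsvTableSink
+
+// 'tapiResult' and 'env' are defined in the example above.
+// Since the query only ever appends new rows, its result can be emitted
+// to an append-only sink such as a CSV file or a Kafka topic.
+val sink = new CsvTableSink("/tmp/avgRoomTemps", "|")
+tapiResult.writeToSink(sink)
+
+env.execute()</code></pre></div>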
+
+<p>While queries that only support appends are useful for some kinds of 
applications and certain types of storage systems, there are many streaming 
analytics use cases that need to update results. This includes streaming 
applications that cannot discard late arriving records, need early results for 
(long-running) windowed aggregates, or require non-windowed aggregates. In each 
of these cases, previously emitted result records need to be updated. 
Result-updating queries often materialize their result to an external database 
or key-value store in order to make it accessible and queryable for external 
applications. Applications that implement this pattern are dashboards, 
reporting applications, or <a 
href="http://2016.flink-forward.org/kb_sessions/joining-infinity-windowless-stream-processing-with-flink/";>other
 applications</a>, which require timely access to continuously updated results. 
The following figure illustrates these kinds of applications.</p>
+
+<center>
+<img src="/img/blog/dynamic-tables/query-update-out.png" 
style="width:60%;margin:10px" />
+</center>
+
+<h2 id="continuous-queries-on-dynamic-tables">Continuous Queries on Dynamic 
Tables</h2>
+
+<p>Support for queries that update previously emitted results is the next big 
step for Flink’s relational APIs. This feature is so important because it 
vastly increases the scope of the APIs and the range of supported use cases. 
Moreover, many of the newly supported use cases can be challenging to implement 
using the DataStream API.</p>
+
+<p>So when adding support for result-updating queries, we must of course 
preserve the unified semantics for stream and batch inputs. We achieve this with
the concept of <em>Dynamic Tables</em>. A dynamic table is a table that is 
continuously updated and can be queried like a regular, static table. However, 
in contrast to a query on a batch table which terminates and returns a static 
table as result, a query on a dynamic table runs continuously and produces a 
table that is continuously updated depending on the modifications of the input
table. Hence, the resulting table is a dynamic table as well. This concept is 
very similar to materialized view maintenance as we discussed before.</p>
+
+<p>Assuming we can run queries on dynamic tables which produce new dynamic 
tables, the next question is, How do streams and dynamic tables relate to each 
other? The answer is that streams can be converted into dynamic tables and 
dynamic tables can be converted into streams. The following figure shows the 
conceptual model of processing a relational query on a stream.</p>
+
+<center>
+<img src="/img/blog/dynamic-tables/stream-query-stream.png" 
style="width:70%;margin:10px" />
+</center>
+
+<p>First, the stream is converted into a dynamic table. The dynamic table is 
queried with a continuous query, which produces a new dynamic table. Finally, 
the resulting table is converted back into a stream. It is important to note 
that this is only the logical model and does not imply how the query is 
actually executed. In fact, a continuous query is internally translated into a 
conventional DataStream program.</p>
+
+<p>In the following, we describe the different steps of this model:</p>
+
+<ol>
+  <li>Defining a dynamic table on a stream,</li>
+  <li>Querying a dynamic table, and</li>
+  <li>Emitting a dynamic table.</li>
+</ol>
+
+<h2 id="defining-a-dynamic-table-on-a-stream">Defining a Dynamic Table on a 
Stream</h2>
+
+<p>The first step of evaluating a SQL query on a dynamic table is to define a 
dynamic table on a stream. This means we have to specify how the records of a 
stream modify the dynamic table. The stream must carry records with a schema 
that is mapped to the relational schema of the table. There are two modes to 
define a dynamic table on a stream: <em>Append Mode</em> and <em>Update 
Mode</em>.</p>
+
+<p>In append mode each stream record is an insert modification to the dynamic 
table. Hence, all records of a stream are appended to the dynamic table such 
that it is ever-growing and infinite in size. The following figure illustrates 
the append mode.</p>
+
+<center>
+<img src="/img/blog/dynamic-tables/append-mode.png" 
style="width:70%;margin:10px" />
+</center>
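+
+<p>In Flink 1.2, registering a <code>DataStream</code> with the <code>StreamTableEnvironment</code> corresponds to append mode: every stream record becomes a new row of the resulting table. A minimal sketch, reusing <code>tEnv</code> and the hypothetical <code>sensorStream</code> from the snippets above:</p>
+
+<div class="highlight"><pre><code class="language-scala">// every record of sensorStream is appended as a new row to the
+// dynamic table "sensorReadings"; the table grows as the stream progresses
+tEnv.registerDataStream("sensorReadings", sensorStream, 'sensorId, 'time, 'room, 'temp)
+
+val sensorReadings: Table = tEnv.scan("sensorReadings")</code></pre></div>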
+
+<p>In update mode a stream record can represent an insert, update, or delete 
modification on the dynamic table (append mode is in fact a special case of 
update mode). When defining a dynamic table on a stream via update mode, we can 
specify a unique key attribute on the table. In that case, update and delete 
operations are performed with respect to the key attribute. The update mode is 
visualized in the following figure.</p>
+
+<center>
+<img src="/img/blog/dynamic-tables/replace-mode.png" 
style="width:70%;margin:10px" />
+</center>
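+
+<p>Conceptually, the records of an update-mode stream can be thought of as keyed upserts. The following sketch is purely illustrative and not an existing Flink API:</p>
+
+<div class="highlight"><pre><code class="language-scala">// conceptual sketch of update mode: each stream record is an upsert or a
+// delete with respect to the table's unique key; an empty value deletes the row
+case class UpsertRecord[K, V](key: K, value: Option[V])
+
+// insert or update the row for key "user_1"
+val upsert = UpsertRecord("user_1", Some(("user_1", "Berlin")))
+// delete the row for key "user_1"
+val delete = UpsertRecord("user_1", None)</code></pre></div>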
+
+<h2 id="querying-a-dynamic-table">Querying a Dynamic Table</h2>
+
+<p>Once we have defined a dynamic table, we can run a query on it. Since 
dynamic tables change over time, we have to define what it means to query a 
dynamic table. Let’s imagine we take a snapshot of a dynamic table at a 
specific point in time. This snapshot can be treated as a regular static batch 
table. We denote a snapshot of a dynamic table <em>A</em> at a point <em>t</em> 
as <em>A[t]</em>. The snapshot can be queried with any SQL query. The query 
produces a regular static table as result. We denote the result of a query 
<em>q</em> on a dynamic table <em>A</em> at time <em>t</em> as 
<em>q(A[t])</em>. If we repeatedly compute the result of a query on snapshots 
of a dynamic table for progressing points in time, we obtain many static result 
tables which are changing over time and effectively constitute a dynamic table. 
We define the semantics of a query on a dynamic table as follows.</p>
+
+<p>A query <em>q</em> on a dynamic table <em>A</em> produces a dynamic table 
<em>R</em>, which is at each point in time <em>t</em> equivalent to the result 
of applying <em>q</em> on <em>A[t]</em>, i.e., <em>R[t] = q(A[t])</em>. This 
definition implies that running the same query <em>q</em> on a batch table 
and on a streaming table produces the same result. In the following, we show 
two examples to illustrate the semantics of queries on dynamic tables.</p>
+
+<p>In the figure below, we see a dynamic input table <em>A</em> on the left 
side, which is defined in append mode. At time <em>t = 8</em>, <em>A</em> 
consists of six rows (colored in blue). At time <em>t = 9</em> and <em>t = 
12</em>, one row is appended to <em>A</em> (visualized in green and orange, 
respectively). We run a simple query on table <em>A</em> which is shown in the 
center of the figure. The query groups by attribute <em>k</em> and counts the 
records per group. On the right hand side we see the result of query <em>q</em> 
at time <em>t = 8</em> (blue), <em>t = 9</em> (green), and <em>t = 12</em> 
(orange). At each point in time <em>t</em>, the result table is equivalent to a batch 
query on the dynamic table <em>A</em> at time <em>t</em>.</p>
+
+<center>
+<img src="/img/blog/dynamic-tables/query-groupBy-cnt.png" 
style="width:70%;margin:10px" />
+</center>
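+
+<p>In SQL, the continuous query of this example might look as follows. This is a sketch that uses the table and attribute names from the figure and assumes that a table <code>A</code> has been registered with <code>tEnv</code>; note that such a non-windowed aggregation on a stream is exactly the kind of result-updating query that Flink 1.2 does not support yet.</p>
+
+<div class="highlight"><pre><code class="language-scala">// counts the records per value of k and continuously updates the
+// result rows as new records are appended to table A
+val counts: Table = tEnv.sql(
+  """
+    |SELECT k, COUNT(*) AS cnt
+    |FROM A
+    |GROUP BY k
+  """.stripMargin)</code></pre></div>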
+
+<p>The query in this example is a simple grouped (but not windowed) 
aggregation query. Hence, the size of the result table depends on the number of 
distinct grouping keys of the input table. Moreover, it is worth noting that 
the query continuously updates result rows that it had previously emitted 
instead of merely adding new rows.</p>
+
+<p>The second example shows a similar query which differs in one important 
aspect. In addition to grouping on the key attribute <em>k</em>, the query also 
groups records into tumbling windows of five seconds, which means that it 
computes a count for each value of <em>k</em> every five seconds. Again, we use 
Calcite’s <a 
href="https://calcite.apache.org/docs/reference.html#grouped-window-functions";>group
 window functions</a> to specify this query. On the left side of the figure we 
see the input table <em>A</em> and how it changes over time in append mode. On 
the right we see the result table and how it evolves over time.</p>
+
+<center>
+<img src="/img/blog/dynamic-tables/query-groupBy-window-cnt.png" 
style="width:80%;margin:10px" />
+</center>
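+
+<p>Expressed with Calcite’s group window functions, the second query might look like the following sketch (again using the names from the figure and assuming an event-time attribute <code>rowtime</code>):</p>
+
+<div class="highlight"><pre><code class="language-scala">// counts the records per value of k for each five-second tumbling window;
+// every window produces new result rows, so the result table only grows
+val windowedCounts: Table = tEnv.sql(
+  """
+    |SELECT k, TUMBLE_END(rowtime, INTERVAL '5' SECOND) AS wEnd, COUNT(*) AS cnt
+    |FROM A
+    |GROUP BY TUMBLE(rowtime, INTERVAL '5' SECOND), k
+  """.stripMargin)</code></pre></div>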
+
+<p>In contrast to the result of the first example, the resulting table grows over time, i.e., every five seconds new result rows are computed
(given that the input table received more records in the last five seconds). 
While the non-windowed query (mostly) updates rows of the result table, the 
windowed aggregation query only appends new rows to the result table.</p>
+
+<p>Although this blog post focuses on the semantics of SQL queries on dynamic 
tables and not on how to efficiently process such a query, we’d like to point 
out that it is not possible to compute the complete result of a query from 
scratch whenever an input table is updated. Instead, the query is compiled into 
a streaming program which continuously updates its result based on the changes 
on its input. This implies that not all valid SQL queries are supported but 
only those that can be continuously, incrementally, and efficiently computed. 
We plan to discuss the details of evaluating SQL queries on dynamic tables in a follow-up blog post.</p>
+
+<h2 id="emitting-a-dynamic-table">Emitting a Dynamic Table</h2>
+
+<p>Querying a dynamic table yields another dynamic table, which represents the 
query’s results. Depending on the query and its input tables, the result 
table is continuously modified by insert, update, and delete changes just like 
a regular database table. It might be a table with a single row, which is 
constantly updated, an insert-only table without update modifications, or 
anything in between.</p>
+
+<p>Traditional database systems use logs to rebuild tables in case of failures 
and for replication. There are different logging techniques, such as UNDO, 
REDO, and UNDO/REDO logging. In a nutshell, UNDO logs record the previous value 
of a modified element to revert incomplete transactions, REDO logs record the 
new value of a modified element to redo lost changes of completed transactions, 
and UNDO/REDO logs record the old and the new value of a changed element to 
undo incomplete transactions and redo lost changes of completed transactions. 
Based on the principles of these logging techniques, a dynamic table can be 
converted into two types of changelog streams, a <em>REDO Stream</em> and a 
<em>REDO+UNDO Stream</em>.</p>
+
+<p>A dynamic table is converted into a redo+undo stream by converting the 
modifications on the table into stream messages. An insert modification is 
emitted as an insert message with the new row, a delete modification is emitted 
as a delete message with the old row, and an update modification is emitted as 
a delete message with the old row and an insert message with the new row. This 
behavior is illustrated in the following figure.</p>
+
+<center>
+<img src="/img/blog/dynamic-tables/undo-redo-mode.png" 
style="width:70%;margin:10px" />
+</center>
+
+<p>The left-hand side shows a dynamic table which is maintained in append mode and serves as input to the query in the center. The result of the query is converted into a redo+undo stream, which is shown at the bottom. The first record <em>(1,
A)</em> of the input table results in a new record in the result table and 
hence in an insert message <em>+(A, 1)</em> to the stream. The second input 
record with <em>k = ‘A’</em>, <em>(4, A)</em>, produces an update of the 
<em>(A, 1)</em> record in the result table and hence yields a delete message 
<em>-(A, 1)</em> and an insert message for <em>+(A, 2)</em>. All downstream 
operators or data sinks need to be able to correctly handle both types of 
messages.</p>
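+
+<p>The messages of such a redo+undo stream could be modeled roughly as follows. This is a purely conceptual sketch and not an existing Flink API:</p>
+
+<div class="highlight"><pre><code class="language-scala">// conceptual encoding of a redo+undo stream: each element either inserts a
+// new row or deletes a previously emitted row; an update is encoded as a
+// delete of the old row followed by an insert of the new row
+sealed trait Change[T]
+case class Insert[T](row: T) extends Change[T]   // e.g. +(A, 1)
+case class Delete[T](row: T) extends Change[T]   // e.g. -(A, 1)
+
+// the update of (A, 1) to (A, 2) from the figure becomes:
+val update: Seq[Change[(String, Int)]] = Seq(Delete(("A", 1)), Insert(("A", 2)))</code></pre></div>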
+
+<p>A dynamic table can be converted into a redo stream in two cases: either it 
is an append-only table (i.e., it only has insert modifications) or it has a 
unique key attribute. Each insert modification on the dynamic table results in 
an insert message with the new row to the redo stream. Due to the restriction 
of redo streams, only tables with unique keys can have update and delete 
modifications. If a key is removed from the keyed dynamic table, either because 
a row is deleted or because the key attribute of a row was modified, a delete 
message with the removed key is emitted to the redo stream. An update 
modification yields an update message with the updated, i.e., new, row. Since 
delete and update modifications are defined with respect to the unique key, the 
downstream operators need to be able to access previous values by key. The 
figure below shows how the result table of the same query as above is converted 
into a redo stream.</p>
+
+<center>
+<img src="/img/blog/dynamic-tables/redo-mode.png" 
style="width:70%;margin:10px" />
+</center>
+
+<p>The row <em>(1, A)</em> which yields an insert into the dynamic table 
results in the <em>+(A, 1)</em> insert message. The row <em>(4, A)</em> which 
produces an update yields the <em>*(A, 2)</em> update message.</p>
+
+<p>Common use cases for redo streams are to write the result of a query to an 
append-only storage system, like rolling files or a Kafka topic, or to a data 
store with keyed access, such as Cassandra, a relational DBMS, or a compacted 
Kafka topic. It is also possible to materialize a dynamic table as keyed state 
inside of the streaming application that evaluates the continuous query and 
make it queryable from external systems. With this design Flink itself 
maintains the result of a continuous SQL query on a stream and serves key 
lookups on the result table, for instance from a dashboard application.</p>
+
+<h2 id="what-will-change-when-switching-to-dynamic-tables">What will Change 
When Switching to Dynamic Tables?</h2>
+
+<p>In version 1.2, all streaming operators of Flink’s relational APIs, like 
filter, project, and group window aggregates, only emit new rows and are not 
capable of updating previously emitted results. In contrast, dynamic tables are 
able to handle update and delete modifications. Now you might ask yourself, How 
does the processing model of the current version relate to the new dynamic 
table model? Will the semantics of the APIs completely change and do we need to 
reimplement the APIs from scratch to achieve the desired semantics?</p>
+
+<p>The answer to all these questions is simple. The current processing model 
is a subset of the dynamic table model. Using the terminology we introduced in 
this post, the current model converts a stream into a dynamic table in append 
mode, i.e., an infinitely growing table. Since all operators only accept insert 
changes and produce insert changes on their result table (i.e., emit new rows), 
all supported queries result in dynamic append tables, which are converted back 
into DataStreams using the redo model for append-only tables. Consequently, the 
semantics of the current model are completely covered and preserved by the new 
dynamic table model.</p>
+
+<h2 id="conclusion-and-outlook">Conclusion and Outlook</h2>
+
+<p>Flink’s relational APIs are great for implementing stream analytics applications in no time and are used in several production settings. In this blog 
post we discussed the future of the Table API and SQL. This effort will make 
Flink and stream processing accessible to more people. Moreover, the unified 
semantics for querying historic and real-time data as well as the concept of 
querying and maintaining dynamic tables will enable and significantly ease the 
implementation of many exciting use cases and applications. As this post focused on the semantics of relational queries on streams and dynamic tables, we did not discuss the details of how a query will be executed, which includes the internal implementation of retractions, handling of late events, support for early results, and bounding space requirements. We plan to publish a follow-up blog post on this topic at a later point in time.</p>
+
+<p>In recent months, many members of the Flink community have been discussing 
and contributing to the relational APIs. We have made great progress so far. While 
most work has focused on processing streams in append mode, the next steps on 
the agenda are to work on dynamic tables to support queries that update their 
results. If you are excited about the idea of processing streams with SQL and 
would like to contribute to this effort, please give feedback, join the 
discussions on the mailing list, or grab a JIRA issue to work on.</p>
+
+      </article>
+    </div>
+
+    <div class="row">
+      <div id="disqus_thread"></div>
+      <script type="text/javascript">
+        /* * * CONFIGURATION VARIABLES: EDIT BEFORE PASTING INTO YOUR WEBPAGE 
* * */
+        var disqus_shortname = 'stratosphere-eu'; // required: replace example 
with your forum shortname
+
+        /* * * DON'T EDIT BELOW THIS LINE * * */
+        (function() {
+            var dsq = document.createElement('script'); dsq.type = 
'text/javascript'; dsq.async = true;
+            dsq.src = '//' + disqus_shortname + '.disqus.com/embed.js';
+             (document.getElementsByTagName('head')[0] || 
document.getElementsByTagName('body')[0]).appendChild(dsq);
+        })();
+      </script>
+    </div>
+  </div>
+</div>
+      </div>
+    </div>
+
+    <hr />
+
+    <div class="row">
+      <div class="footer text-center col-sm-12">
+        <p>Copyright © 2014-2016 <a href="http://apache.org";>The Apache 
Software Foundation</a>. All Rights Reserved.</p>
+        <p>Apache Flink, Apache, and the Apache feather logo are either 
registered trademarks or trademarks of The Apache Software Foundation.</p>
+        <p><a href="/privacy-policy.html">Privacy Policy</a> &middot; <a 
href="/blog/feed.xml">RSS feed</a></p>
+      </div>
+    </div>
+    </div><!-- /.container -->
+
+    <!-- Include all compiled plugins (below), or include individual files as 
needed -->
+    <script 
src="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/js/bootstrap.min.js";></script>
+    <script src="/js/codetabs.js"></script>
+    <script src="/js/stickysidebar.js"></script>
+
+
+    <!-- Google Analytics -->
+    <script>
+      
(function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){
+      (i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new 
Date();a=s.createElement(o),
+      
m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m)
+      
})(window,document,'script','//www.google-analytics.com/analytics.js','ga');
+
+      ga('create', 'UA-52545728-1', 'auto');
+      ga('send', 'pageview');
+    </script>
+  </body>
+</html>
