This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 830742a  [FLINK-13355][docs] Add documentation for Temporal Table Join 
in blink planner
830742a is described below

commit 830742a4b1a88c6ea569689c8dd2bd1f0f97a6d9
Author: Jark Wu <imj...@gmail.com>
AuthorDate: Wed Aug 28 00:11:09 2019 +0800

    [FLINK-13355][docs] Add documentation for Temporal Table Join in blink 
planner
    
    This closes #9545
---
 docs/dev/table/sourceSinks.md                  |  42 ++++++++
 docs/dev/table/sourceSinks.zh.md               |  42 ++++++++
 docs/dev/table/sql.md                          |  27 ++++-
 docs/dev/table/sql.zh.md                       |  27 ++++-
 docs/dev/table/streaming/joins.md              | 133 ++++++++++++++++++++++++-
 docs/dev/table/streaming/joins.zh.md           | 133 ++++++++++++++++++++++++-
 docs/dev/table/streaming/temporal_tables.md    | 113 ++++++++++++++++++++-
 docs/dev/table/streaming/temporal_tables.zh.md | 113 ++++++++++++++++++++-
 docs/dev/table/tableApi.md                     |  12 +--
 docs/dev/table/tableApi.zh.md                  |  12 +--
 10 files changed, 626 insertions(+), 28 deletions(-)

diff --git a/docs/dev/table/sourceSinks.md b/docs/dev/table/sourceSinks.md
index b27d42d..12b4ca7 100644
--- a/docs/dev/table/sourceSinks.md
+++ b/docs/dev/table/sourceSinks.md
@@ -324,6 +324,48 @@ FilterableTableSource[T] {
 
 {% top %}
 
+### Defining a TableSource for Lookups
+
+<span class="label label-danger">Attention</span> This is an experimental 
feature. The interface may be changed in future versions. It's only supported 
in Blink planner.
+
+The `LookupableTableSource` interface adds support for the table to be 
accessed via key column(s) in a lookup fashion. This is useful when joining 
with a dimension table to enrich records with additional information. To use 
the `TableSource` in lookup mode, reference it with the [temporal table join 
syntax](streaming/joins.html).
+
+The interface looks as follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+LookupableTableSource<T> extends TableSource<T> {
+
+  public TableFunction<T> getLookupFunction(String[] lookupkeys);
+
+  public AsyncTableFunction<T> getAsyncLookupFunction(String[] lookupkeys);
+
+  public boolean isAsyncEnabled();
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+LookupableTableSource[T] extends TableSource[T] {
+
+  def getLookupFunction(lookupKeys: Array[String]): TableFunction[T]
+
+  def getAsyncLookupFunction(lookupKeys: Array[String]): AsyncTableFunction[T]
+
+  def isAsyncEnabled: Boolean
+}
+{% endhighlight %}
+</div>
+</div>
+
+* `getLookupFunction(lookupkeys)`: Returns a `TableFunction` which is used to 
look up the matched row(s) via lookup keys. The lookupkeys are the field names 
of `LookupableTableSource` in the join equality conditions. The parameters of 
the eval method of the returned `TableFunction` should be in the order in which 
`lookupkeys` are defined. It is recommended to define the parameters as varargs 
(e.g. `eval(Object... lookupkeys)`) to match all the cases. The return type of 
the `TableFunction` must be identical [...]
+* `getAsyncLookupFunction(lookupkeys)`: Optional. Similar to 
`getLookupFunction`, but the `AsyncTableFunction` looks up the matched row(s) 
asynchronously. Under the hood, the `AsyncTableFunction` is called via 
[Async I/O](/dev/stream/operators/asyncio.html). The first argument of the eval 
method of the returned `AsyncTableFunction` should be defined as 
`java.util.concurrent.CompletableFuture` to collect results asynchronously 
(e.g. `eval(CompletableFuture<Collection<String>> result,  [...]
+* `isAsyncEnabled()`: Returns true if async lookup is enabled. If 
`isAsyncEnabled` returns true, `getAsyncLookupFunction(lookupkeys)` must be 
implemented.
+
+{% top %}
+
 Define a TableSink
 ------------------
 
diff --git a/docs/dev/table/sourceSinks.zh.md b/docs/dev/table/sourceSinks.zh.md
index b34967f..e37e7be 100644
--- a/docs/dev/table/sourceSinks.zh.md
+++ b/docs/dev/table/sourceSinks.zh.md
@@ -324,6 +324,48 @@ FilterableTableSource[T] {
 
 {% top %}
 
+### Defining a TableSource for Lookups
+
+<span class="label label-danger">Attention</span> This is an experimental 
feature. The interface may be changed in future versions. It's only supported 
in Blink planner.
+
+The `LookupableTableSource` interface adds support for the table to be 
accessed via key column(s) in a lookup fashion. This is useful when joining 
with a dimension table to enrich records with additional information. To use 
the `TableSource` in lookup mode, reference it with the [temporal table join 
syntax](streaming/joins.html).
+
+The interface looks as follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+LookupableTableSource<T> extends TableSource<T> {
+
+  public TableFunction<T> getLookupFunction(String[] lookupkeys);
+
+  public AsyncTableFunction<T> getAsyncLookupFunction(String[] lookupkeys);
+
+  public boolean isAsyncEnabled();
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+LookupableTableSource[T] extends TableSource[T] {
+
+  def getLookupFunction(lookupKeys: Array[String]): TableFunction[T]
+
+  def getAsyncLookupFunction(lookupKeys: Array[String]): AsyncTableFunction[T]
+
+  def isAsyncEnabled: Boolean
+}
+{% endhighlight %}
+</div>
+</div>
+
+* `getLookupFunction(lookupkeys)`: Returns a `TableFunction` which is used to 
look up the matched row(s) via lookup keys. The lookupkeys are the field names 
of `LookupableTableSource` in the join equality conditions. The parameters of 
the eval method of the returned `TableFunction` should be in the order in which 
`lookupkeys` are defined. It is recommended to define the parameters as varargs 
(e.g. `eval(Object... lookupkeys)`) to match all the cases. The return type of 
the `TableFunction` must be identical [...]
+* `getAsyncLookupFunction(lookupkeys)`: Optional. Similar to 
`getLookupFunction`, but the `AsyncTableFunction` looks up the matched row(s) 
asynchronously. Under the hood, the `AsyncTableFunction` is called via 
[Async I/O](/dev/stream/operators/asyncio.html). The first argument of the eval 
method of the returned `AsyncTableFunction` should be defined as 
`java.util.concurrent.CompletableFuture` to collect results asynchronously 
(e.g. `eval(CompletableFuture<Collection<String>> result,  [...]
+* `isAsyncEnabled()`: Returns true if async lookup is enabled. If 
`isAsyncEnabled` returns true, `getAsyncLookupFunction(lookupkeys)` must be 
implemented.
+
+{% top %}
+
 Define a TableSink
 ------------------
 
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 79f0b41..0cfbdda 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -598,7 +598,7 @@ FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)
     </tr>
     <tr>
        <td>
-        <strong>Join with Table Function</strong><br>
+        <strong>Join with Table Function (UDTF)</strong><br>
         <span class="label label-primary">Batch</span> <span class="label 
label-primary">Streaming</span>
       </td>
        <td>
@@ -624,7 +624,7 @@ FROM Orders LEFT JOIN LATERAL TABLE(unnest_udtf(tags)) t AS 
tag ON TRUE
     </tr>
     <tr>
       <td>
-        <strong>Join with Temporal Table</strong><br>
+        <strong>Join with Temporal Table Function</strong><br>
         <span class="label label-primary">Streaming</span>
       </td>
       <td>
@@ -647,6 +647,29 @@ WHERE
         <p>For more information please check the more detailed <a 
href="streaming/temporal_tables.html">temporal tables concept 
description</a>.</p>
       </td>
     </tr>
+    <tr>
+      <td>
+        <strong>Join with Temporal Table</strong><br>
+        <span class="label label-primary">Batch</span> <span class="label 
label-primary">Streaming</span>
+      </td>
+      <td>
+        <p><a href="streaming/temporal_tables.html">Temporal Tables</a> are 
tables that track changes over time.
+        A <a href="streaming/temporal_tables.html#temporal-table">Temporal 
Table</a> provides access to the versions of a temporal table at a specific 
point in time.</p>
+
+        <p>Only inner and left joins with processing-time temporal tables are 
supported.</p>
+        <p>The following example assumes that <strong>LatestRates</strong> is 
a <a href="streaming/temporal_tables.html#temporal-table">Temporal Table</a> 
which is materialized with the latest rate.</p>
+{% highlight sql %}
+SELECT
+  o.amount, o.currency, r.rate, o.amount * r.rate
+FROM
+  Orders AS o
+  JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
+  ON r.currency = o.currency
+{% endhighlight %}
+        <p>For more information please check the more detailed <a 
href="streaming/temporal_tables.html">Temporal Tables</a> concept 
description.</p>
+        <p>Only supported in Blink planner.</p>
+      </td>
+    </tr>
 
   </tbody>
 </table>
diff --git a/docs/dev/table/sql.zh.md b/docs/dev/table/sql.zh.md
index 86e9d42..65bc520 100644
--- a/docs/dev/table/sql.zh.md
+++ b/docs/dev/table/sql.zh.md
@@ -623,9 +623,9 @@ FROM Orders LEFT JOIN LATERAL TABLE(unnest_udtf(tags)) t AS 
tag ON TRUE
         <p><b>Note:</b> Currently, only literal <code>TRUE</code> is supported 
as predicate for a left outer join against a lateral table.</p>
       </td>
     </tr>
-    <tr>
+   <tr>
       <td>
-        <strong>Join with Temporal Table</strong><br>
+        <strong>Join with Temporal Table Function</strong><br>
         <span class="label label-primary">Streaming</span>
       </td>
       <td>
@@ -648,6 +648,29 @@ WHERE
         <p>For more information please check the more detailed <a 
href="streaming/temporal_tables.html">temporal tables concept 
description</a>.</p>
       </td>
     </tr>
+    <tr>
+      <td>
+        <strong>Join with Temporal Table</strong><br>
+        <span class="label label-primary">Batch</span> <span class="label 
label-primary">Streaming</span>
+      </td>
+      <td>
+        <p><a href="streaming/temporal_tables.html">Temporal Tables</a> are 
tables that track changes over time.
+        A <a href="streaming/temporal_tables.html#temporal-table">Temporal 
Table</a> provides access to the versions of a temporal table at a specific 
point in time.</p>
+
+        <p>Only inner and left joins with processing-time temporal tables are 
supported.</p>
+        <p>The following example assumes that <strong>LatestRates</strong> is 
a <a href="streaming/temporal_tables.html#temporal-table">Temporal Table</a> 
which is materialized with the latest rate.</p>
+    {% highlight sql %}
+    SELECT
+    o.amount, o.currency, r.rate, o.amount * r.rate
+    FROM
+    Orders AS o
+    JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
+    ON r.currency = o.currency
+    {% endhighlight %}
+        <p>For more information please check the more detailed <a 
href="streaming/temporal_tables.html">Temporal Tables</a> concept 
description.</p>
+        <p>Only supported in Blink planner.</p>
+      </td>
+    </tr>
 
   </tbody>
 </table>
diff --git a/docs/dev/table/streaming/joins.md 
b/docs/dev/table/streaming/joins.md
index 0e5e0c3..476e121 100644
--- a/docs/dev/table/streaming/joins.md
+++ b/docs/dev/table/streaming/joins.md
@@ -65,10 +65,10 @@ WHERE o.id = s.orderId AND
 
 Compared to a regular join operation, this kind of join only supports 
append-only tables with time attributes. Since time attributes are 
quasi-monontic increasing, Flink can remove old values from its state without 
affecting the correctness of the result.
 
-Join with a Temporal Table
+Join with a Temporal Table Function
 --------------------------
 
-A join with a temporal table joins an append-only table (left input/probe 
side) with a temporal table (right input/build side),
+A join with a temporal table function joins an append-only table (left 
input/probe side) with a temporal table (right input/build side),
 i.e., a table that changes over time and tracks its changes. Please check the 
corresponding page for more information about [temporal 
tables](temporal_tables.html).
 
 The following example shows an append-only table `Orders` that should be 
joined with the continuously changing currency rates table `RatesHistory`.
@@ -222,4 +222,133 @@ By definition of event time, [watermarks]({{ site.baseurl 
}}/dev/event_time.html
 forward in time and discard versions of the build table that are no longer 
necessary because no incoming row with
 lower or equal timestamp is expected.
 
+Join with a Temporal Table
+--------------------------
+
+A join with a temporal table joins an arbitrary table (left input/probe side) 
with a temporal table (right input/build side),
+i.e., an external dimension table that changes over time. Please check the 
corresponding page for more information about [temporal 
tables](temporal_tables.html#temporal-table).
+
+<span class="label label-danger">Attention</span> Users cannot use arbitrary 
tables as a temporal table, but need to use a table backed by a 
`LookupableTableSource`. A `LookupableTableSource` can only be used as a 
temporal table in a temporal join. See the page for more details about [how to 
define 
LookupableTableSource](../sourceSinks.html#defining-a-tablesource-for-lookups).
+
+The following example shows an `Orders` stream that should be joined with the 
continuously changing currency rates table `LatestRates`.
+
+`LatestRates` is a dimension table that is materialized with the latest rate. 
At time `10:15`, `10:30`, `10:52`, the content of `LatestRates` looks as 
follows:
+
+{% highlight sql %}
+10:15> SELECT * FROM LatestRates;
+
+currency   rate
+======== ======
+US Dollar   102
+Euro        114
+Yen           1
+
+10:30> SELECT * FROM LatestRates;
+
+currency   rate
+======== ======
+US Dollar   102
+Euro        114
+Yen           1
+
+
+10:52> SELECT * FROM LatestRates;
+
+currency   rate
+======== ======
+US Dollar   102
+Euro        116     <==== changed from 114 to 116
+Yen           1
+{% endhighlight %}
+
+The content of `LatestRates` at time `10:15` and `10:30` is the same. The Euro 
rate has changed from 114 to 116 at `10:52`.
+
+`Orders` is an append-only table that represents payments for the given 
`amount` and the given `currency`.
+For example at `10:15` there was an order for an amount of `2 Euro`.
+
+{% highlight sql %}
+SELECT * FROM Orders;
+
+amount currency
+====== =========
+     2 Euro             <== arrived at time 10:15
+     1 US Dollar        <== arrived at time 10:30
+     2 Euro             <== arrived at time 10:52
+{% endhighlight %}
+
+Suppose that we would like to calculate the amount of all `Orders` converted 
to a common currency (`Yen`).
+
+For example, we would like to convert the following orders using the latest 
rate in `LatestRates`. The result would be:
+
+{% highlight text %}
+amount currency     rate  amount*rate
+====== ========= ======= ============
+     2 Euro          114          228    <== arrived at time 10:15
+     1 US Dollar     102          102    <== arrived at time 10:30
+     2 Euro          116          232    <== arrived at time 10:52
+{% endhighlight %}
+
+
+With the help of temporal table join, we can express such a query in SQL as:
+
+{% highlight sql %}
+SELECT
+  o.amount, o.currency, r.rate, o.amount * r.rate
+FROM
+  Orders AS o
+  JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
+  ON r.currency = o.currency
+{% endhighlight %}
+
+Each record from the probe side will be joined with the current version of the 
build side table. In our example, the query is using the processing-time 
notion, so a newly appended order would always be joined with the most recent 
version of `LatestRates` when executing the operation. Note that the result is 
not deterministic for processing-time.
+
+In contrast to [regular joins](#regular-joins), the previous results of the 
temporal table join will not be affected despite the changes on the build side. 
Also, the temporal table join operator is very lightweight and does not keep 
any state.
+
+Compared to [time-windowed joins](#time-windowed-joins), temporal table joins 
do not define a time window within which the records will be joined.
+Records from the probe side are always joined with the build side's latest 
version at processing time. Thus, records on the build side might be 
arbitrarily old.
+
+Both [temporal table function join](#join-with-a-temporal-table-function) and 
temporal table join come from the same motivation but have different SQL syntax 
and runtime implementations:
+* The SQL syntax of the temporal table function join is a join UDTF, while the 
temporal table join uses the standard temporal table syntax introduced in 
SQL:2011.
+* The implementation of temporal table function joins actually joins two 
streams and keeps them in state, while temporal table joins receive only one 
input stream and look up the external database according to the key in the 
record.
+* The temporal table function join is usually used to join a changelog stream, 
while the temporal table join is usually used to join an external table (i.e. 
dimension table).
+
+Such behaviour makes a temporal table join a good candidate to express stream 
enrichment in relational terms.
+
+In the future, the temporal table join will support the features of temporal 
table function joins, i.e. support temporal joins with a changelog stream.
+
+### Usage
+
+The syntax of temporal table join is as follows:
+
+{% highlight sql %}
+SELECT [column_list]
+FROM table1 [AS <alias1>]
+[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.proctime [AS <alias2>]
+ON table1.column-name1 = table2.column-name1
+{% endhighlight %}
+
+Currently, only INNER JOIN and LEFT JOIN are supported. The `FOR SYSTEM_TIME 
AS OF table1.proctime` clause must follow the temporal table. `proctime` is a 
[processing time attribute](time_attributes.html#processing-time) of `table1`.
+This means that a snapshot of the temporal table is taken at processing time 
when joining each record from the left table.
+
+For example, after [defining temporal 
table](temporal_tables.html#defining-temporal-table), we can use it as 
follows.
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+SELECT
+  SUM(o_amount * r_rate) AS amount
+FROM
+  Orders
+  JOIN LatestRates FOR SYSTEM_TIME AS OF o_proctime
+  ON r_currency = o_currency
+{% endhighlight %}
+</div>
+</div>
+
+<span class="label label-danger">Attention</span> It is only supported in 
Blink planner.
+
+<span class="label label-danger">Attention</span> It is only supported in SQL, 
and not supported in Table API yet.
+
+<span class="label label-danger">Attention</span> Flink does not support event 
time temporal table joins currently.
+
 {% top %}
diff --git a/docs/dev/table/streaming/joins.zh.md 
b/docs/dev/table/streaming/joins.zh.md
index 0e5e0c3..476e121 100644
--- a/docs/dev/table/streaming/joins.zh.md
+++ b/docs/dev/table/streaming/joins.zh.md
@@ -65,10 +65,10 @@ WHERE o.id = s.orderId AND
 
 Compared to a regular join operation, this kind of join only supports 
append-only tables with time attributes. Since time attributes are 
quasi-monontic increasing, Flink can remove old values from its state without 
affecting the correctness of the result.
 
-Join with a Temporal Table
+Join with a Temporal Table Function
 --------------------------
 
-A join with a temporal table joins an append-only table (left input/probe 
side) with a temporal table (right input/build side),
+A join with a temporal table function joins an append-only table (left 
input/probe side) with a temporal table (right input/build side),
 i.e., a table that changes over time and tracks its changes. Please check the 
corresponding page for more information about [temporal 
tables](temporal_tables.html).
 
 The following example shows an append-only table `Orders` that should be 
joined with the continuously changing currency rates table `RatesHistory`.
@@ -222,4 +222,133 @@ By definition of event time, [watermarks]({{ site.baseurl 
}}/dev/event_time.html
 forward in time and discard versions of the build table that are no longer 
necessary because no incoming row with
 lower or equal timestamp is expected.
 
+Join with a Temporal Table
+--------------------------
+
+A join with a temporal table joins an arbitrary table (left input/probe side) 
with a temporal table (right input/build side),
+i.e., an external dimension table that changes over time. Please check the 
corresponding page for more information about [temporal 
tables](temporal_tables.html#temporal-table).
+
+<span class="label label-danger">Attention</span> Users cannot use arbitrary 
tables as a temporal table, but need to use a table backed by a 
`LookupableTableSource`. A `LookupableTableSource` can only be used as a 
temporal table in a temporal join. See the page for more details about [how to 
define 
LookupableTableSource](../sourceSinks.html#defining-a-tablesource-for-lookups).
+
+The following example shows an `Orders` stream that should be joined with the 
continuously changing currency rates table `LatestRates`.
+
+`LatestRates` is a dimension table that is materialized with the latest rate. 
At time `10:15`, `10:30`, `10:52`, the content of `LatestRates` looks as 
follows:
+
+{% highlight sql %}
+10:15> SELECT * FROM LatestRates;
+
+currency   rate
+======== ======
+US Dollar   102
+Euro        114
+Yen           1
+
+10:30> SELECT * FROM LatestRates;
+
+currency   rate
+======== ======
+US Dollar   102
+Euro        114
+Yen           1
+
+
+10:52> SELECT * FROM LatestRates;
+
+currency   rate
+======== ======
+US Dollar   102
+Euro        116     <==== changed from 114 to 116
+Yen           1
+{% endhighlight %}
+
+The content of `LatestRates` at time `10:15` and `10:30` is the same. The Euro 
rate has changed from 114 to 116 at `10:52`.
+
+`Orders` is an append-only table that represents payments for the given 
`amount` and the given `currency`.
+For example at `10:15` there was an order for an amount of `2 Euro`.
+
+{% highlight sql %}
+SELECT * FROM Orders;
+
+amount currency
+====== =========
+     2 Euro             <== arrived at time 10:15
+     1 US Dollar        <== arrived at time 10:30
+     2 Euro             <== arrived at time 10:52
+{% endhighlight %}
+
+Suppose that we would like to calculate the amount of all `Orders` converted 
to a common currency (`Yen`).
+
+For example, we would like to convert the following orders using the latest 
rate in `LatestRates`. The result would be:
+
+{% highlight text %}
+amount currency     rate  amount*rate
+====== ========= ======= ============
+     2 Euro          114          228    <== arrived at time 10:15
+     1 US Dollar     102          102    <== arrived at time 10:30
+     2 Euro          116          232    <== arrived at time 10:52
+{% endhighlight %}
+
+
+With the help of temporal table join, we can express such a query in SQL as:
+
+{% highlight sql %}
+SELECT
+  o.amount, o.currency, r.rate, o.amount * r.rate
+FROM
+  Orders AS o
+  JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
+  ON r.currency = o.currency
+{% endhighlight %}
+
+Each record from the probe side will be joined with the current version of the 
build side table. In our example, the query is using the processing-time 
notion, so a newly appended order would always be joined with the most recent 
version of `LatestRates` when executing the operation. Note that the result is 
not deterministic for processing-time.
+
+In contrast to [regular joins](#regular-joins), the previous results of the 
temporal table join will not be affected despite the changes on the build side. 
Also, the temporal table join operator is very lightweight and does not keep 
any state.
+
+Compared to [time-windowed joins](#time-windowed-joins), temporal table joins 
do not define a time window within which the records will be joined.
+Records from the probe side are always joined with the build side's latest 
version at processing time. Thus, records on the build side might be 
arbitrarily old.
+
+Both [temporal table function join](#join-with-a-temporal-table-function) and 
temporal table join come from the same motivation but have different SQL syntax 
and runtime implementations:
+* The SQL syntax of the temporal table function join is a join UDTF, while the 
temporal table join uses the standard temporal table syntax introduced in 
SQL:2011.
+* The implementation of temporal table function joins actually joins two 
streams and keeps them in state, while temporal table joins receive only one 
input stream and look up the external database according to the key in the 
record.
+* The temporal table function join is usually used to join a changelog stream, 
while the temporal table join is usually used to join an external table (i.e. 
dimension table).
+
+Such behaviour makes a temporal table join a good candidate to express stream 
enrichment in relational terms.
+
+In the future, the temporal table join will support the features of temporal 
table function joins, i.e. support temporal joins with a changelog stream.
+
+### Usage
+
+The syntax of temporal table join is as follows:
+
+{% highlight sql %}
+SELECT [column_list]
+FROM table1 [AS <alias1>]
+[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.proctime [AS <alias2>]
+ON table1.column-name1 = table2.column-name1
+{% endhighlight %}
+
+Currently, only INNER JOIN and LEFT JOIN are supported. The `FOR SYSTEM_TIME 
AS OF table1.proctime` clause must follow the temporal table. `proctime` is a 
[processing time attribute](time_attributes.html#processing-time) of `table1`.
+This means that a snapshot of the temporal table is taken at processing time 
when joining each record from the left table.
+
+For example, after [defining temporal 
table](temporal_tables.html#defining-temporal-table), we can use it as 
follows.
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+SELECT
+  SUM(o_amount * r_rate) AS amount
+FROM
+  Orders
+  JOIN LatestRates FOR SYSTEM_TIME AS OF o_proctime
+  ON r_currency = o_currency
+{% endhighlight %}
+</div>
+</div>
+
+<span class="label label-danger">Attention</span> It is only supported in 
Blink planner.
+
+<span class="label label-danger">Attention</span> It is only supported in SQL, 
and not supported in Table API yet.
+
+<span class="label label-danger">Attention</span> Flink does not support event 
time temporal table joins currently.
+
 {% top %}
diff --git a/docs/dev/table/streaming/temporal_tables.md 
b/docs/dev/table/streaming/temporal_tables.md
index 5d69197..baecbd0 100644
--- a/docs/dev/table/streaming/temporal_tables.md
+++ b/docs/dev/table/streaming/temporal_tables.md
@@ -22,9 +22,13 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-Temporal Tables represent a concept of a (parameterized) view on a changing 
history table that returns the content of a table at a specific point in time.
+Temporal Tables represent a concept of a (parameterized) view on a changing 
table that returns the content of a table at a specific point in time.
 
-Flink can keep track of the changes applied to an underlying append-only table 
and allows for accessing the table's content at a certain point in time within 
a query.
+The changing table can either be a changing history table which tracks the 
changes (e.g. database changelogs) or a changing dimension table which 
materializes the changes (e.g. database tables).
+
+For the changing history table, Flink can keep track of the changes and allows 
for accessing the content of the table at a certain point in time within a 
query. In Flink, this kind of table is represented by a *Temporal Table 
Function*.
+
+For the changing dimension table, Flink allows for accessing the content of 
the table at processing time within a query. In Flink, this kind of table is 
represented by a *Temporal Table*.
 
 * This will be replaced by the TOC
 {:toc}
@@ -32,6 +36,8 @@ Flink can keep track of the changes applied to an underlying 
append-only table a
 Motivation
 ----------
 
+### Correlate with a changing history table
+
 Let's assume that we have the following table `RatesHistory`.
 
 {% highlight sql %}
@@ -78,9 +84,38 @@ The concept of *Temporal Tables* aims to simplify such 
queries, speed up their e
 
 In the above example `currency` would be a primary key for `RatesHistory` 
table and `rowtime` would be the timestamp attribute.
 
-In Flink, a temporal table is represented by a *Temporal Table Function*.
+In Flink, this is represented by a [*Temporal Table 
Function*](#temporal-table-function).
+
+### Correlate with a changing dimension table
+
+On the other hand, some use cases require joining a changing dimension table 
which is an external database table.
+
+Let's assume that `LatestRates` is a table (e.g. stored in) which is 
materialized with the latest rate. `LatestRates` is the materialization of the 
`RatesHistory` history table. Then the content of the `LatestRates` table at 
time `10:58` will be:
+
+{% highlight text %}
+10:58> SELECT * FROM LatestRates;
+currency   rate
+======== ======
+US Dollar   102
+Yen           1
+Euro        116
+{% endhighlight %}
+
+The content of `LatestRates` table at time `12:00` will be:
+
+{% highlight text %}
+12:00> SELECT * FROM LatestRates;
+currency   rate
+======== ======
+US Dollar   102
+Yen           1
+Euro        119
+Pounds      108
+{% endhighlight %}
+
+In Flink, this is represented by a [*Temporal Table*](#temporal-table).
 
-Temporal Table Functions
+Temporal Table Function
 ------------------------
 
 In order to access the data in a temporal table, one must pass a [time 
attribute](time_attributes.html) that determines the version of the table that 
will be returned.
@@ -187,4 +222,74 @@ which allows us to use the function `rates` in the [Table 
API](../tableApi.html#
 Line `(2)` registers this function under the name `Rates` in our table 
environment,
 which allows us to use the `Rates` function in [SQL](../sql.html#joins).
 
+## Temporal Table
+
+<span class="label label-danger">Attention</span> This is only supported in 
Blink planner.
+
+In order to access data in a temporal table, currently one must define a 
`TableSource` that implements `LookupableTableSource`. Flink uses the SQL 
syntax of `FOR SYSTEM_TIME AS OF` to query a temporal table, which was proposed 
in SQL:2011.
+
+Assuming that we defined a temporal table called `LatestRates`, we can query 
such a table in the following way:
+
+{% highlight sql %}
+SELECT * FROM LatestRates FOR SYSTEM_TIME AS OF TIME '10:15';
+
+currency   rate
+======== ======
+US Dollar   102
+Euro        114
+Yen           1
+
+SELECT * FROM LatestRates FOR SYSTEM_TIME AS OF TIME '11:00';
+
+currency   rate
+======== ======
+US Dollar   102
+Euro        116
+Yen           1
+{% endhighlight %}
+
+**Note**: Currently, Flink doesn't support directly querying the temporal 
table with a constant time. At the moment, a temporal table can only be used in 
joins. The example above is used to provide an intuition about what the 
temporal table `LatestRates` returns.
+
+See also the page about [joins for continuous queries](joins.html) for more 
information about how to join with a temporal table.
+
+### Defining Temporal Table
+
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+// Get the stream and table environments.
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
+
+// Create an HBaseTableSource as a temporal table which implements 
LookupableTableSource
+// In the real setup, you should replace this with your own table.
+HBaseTableSource rates = new HBaseTableSource(conf, "Rates");
+rates.setRowKey("currency", String.class);   // currency as the primary key
+rates.addColumn("fam1", "rate", Double.class);
+
+// register the temporal table into environment, then we can query it in sql
+tEnv.registerTableSource("Rates", rates);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// Get the stream and table environments.
+val env = StreamExecutionEnvironment.getExecutionEnvironment
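+// Temporal tables are only supported in the Blink planner, so select it.
+val settings = EnvironmentSettings.newInstance()
+  .useBlinkPlanner().inStreamingMode().build()
+val tEnv = StreamTableEnvironment.create(env, settings)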
+
+// Create an HBaseTableSource, which implements LookupableTableSource,
+// as a temporal table.
+// In the real setup, you should replace this with your own table.
+val rates = new HBaseTableSource(conf, "Rates")
+rates.setRowKey("currency", classOf[String])   // currency as the primary key
+rates.addColumn("fam1", "rate", classOf[java.lang.Double])
+
+// register the temporal table into environment, then we can query it in sql
+tEnv.registerTableSource("Rates", rates)
+{% endhighlight %}
+</div>
+</div>
+
+See also the page about [how to define 
LookupableTableSource](../sourceSinks.html#defining-a-tablesource-for-lookups).
+
 {% top %}
diff --git a/docs/dev/table/streaming/temporal_tables.zh.md 
b/docs/dev/table/streaming/temporal_tables.zh.md
index 5d69197..baecbd0 100644
--- a/docs/dev/table/streaming/temporal_tables.zh.md
+++ b/docs/dev/table/streaming/temporal_tables.zh.md
@@ -22,9 +22,13 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-Temporal Tables represent a concept of a (parameterized) view on a changing 
history table that returns the content of a table at a specific point in time.
+Temporal Tables represent a concept of a (parameterized) view on a changing 
table that returns the content of a table at a specific point in time.
 
-Flink can keep track of the changes applied to an underlying append-only table 
and allows for accessing the table's content at a certain point in time within 
a query.
+The changing table can either be a changing history table which tracks the 
changes (e.g. database changelogs) or a changing dimension table which 
materializes the changes (e.g. database tables).
+
+For the changing history table, Flink can keep track of the changes and allows 
for accessing the content of the table at a certain point in time within a 
query. In Flink, this kind of table is represented by a *Temporal Table 
Function*.
+
+For the changing dimension table, Flink allows for accessing the content of 
the table at processing time within a query. In Flink, this kind of table is 
represented by a *Temporal Table*.
 
 * This will be replaced by the TOC
 {:toc}
@@ -32,6 +36,8 @@ Flink can keep track of the changes applied to an underlying 
append-only table a
 Motivation
 ----------
 
+### Correlate with a changing history table
+
 Let's assume that we have the following table `RatesHistory`.
 
 {% highlight sql %}
@@ -78,9 +84,38 @@ The concept of *Temporal Tables* aims to simplify such 
queries, speed up their e
 
 In the above example `currency` would be a primary key for `RatesHistory` 
table and `rowtime` would be the timestamp attribute.
 
-In Flink, a temporal table is represented by a *Temporal Table Function*.
+In Flink, this is represented by a [*Temporal Table 
Function*](#temporal-table-function).
+
+### Correlate with a changing dimension table
+
+On the other hand, some use cases require joining a changing dimension table, 
which is an external database table.
+
+Let's assume that `LatestRates` is a table (e.g. stored in) which is 
materialized with the latest rates. `LatestRates` is the materialization of the 
`RatesHistory` history. The content of the `LatestRates` table at time `10:58` 
will then be:
+
+{% highlight text %}
+10:58> SELECT * FROM LatestRates;
+currency   rate
+======== ======
+US Dollar   102
+Yen           1
+Euro        116
+{% endhighlight %}
+
+The content of the `LatestRates` table at time `12:00` will be:
+
+{% highlight text %}
+12:00> SELECT * FROM LatestRates;
+currency   rate
+======== ======
+US Dollar   102
+Yen           1
+Euro        119
+Pounds      108
+{% endhighlight %}
+
+In Flink, this is represented by a [*Temporal Table*](#temporal-table).
 
-Temporal Table Functions
+Temporal Table Function
 ------------------------
 
 In order to access the data in a temporal table, one must pass a [time 
attribute](time_attributes.html) that determines the version of the table that 
will be returned.
@@ -187,4 +222,74 @@ which allows us to use the function `rates` in the [Table 
API](../tableApi.html#
 Line `(2)` registers this function under the name `Rates` in our table 
environment,
 which allows us to use the `Rates` function in [SQL](../sql.html#joins).
 
+## Temporal Table
+
+<span class="label label-danger">Attention</span> This is only supported in 
Blink planner.
+
+To access data in a temporal table, one must currently define a 
`TableSource` that implements `LookupableTableSource`. Flink uses the SQL syntax 
`FOR SYSTEM_TIME AS OF`, which is part of SQL:2011, to query a temporal table.
+
+Assuming that we defined a temporal table called `LatestRates`, we can query 
such a table in the following way:
+
+{% highlight sql %}
+SELECT * FROM LatestRates FOR SYSTEM_TIME AS OF TIME '10:15';
+
+currency   rate
+======== ======
+US Dollar   102
+Euro        114
+Yen           1
+
+SELECT * FROM LatestRates FOR SYSTEM_TIME AS OF TIME '11:00';
+
+currency   rate
+======== ======
+US Dollar   102
+Euro        116
+Yen           1
+{% endhighlight %}
+
+**Note**: Flink currently doesn't support querying a temporal table directly 
with a constant time. At the moment, a temporal table can only be used in 
joins. The example above is only meant to provide an intuition about what the 
temporal table `LatestRates` returns.
+
+See also the page about [joins for continuous queries](joins.html) for more 
information about how to join with a temporal table.
+
+### Defining Temporal Table
+
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+// Get the stream and table environments.
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
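+// Temporal tables are only supported in the Blink planner, so select it.
+EnvironmentSettings settings = EnvironmentSettings.newInstance()
+    .useBlinkPlanner().inStreamingMode().build();
+StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);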
+
+// Create an HBaseTableSource, which implements LookupableTableSource,
+// as a temporal table.
+// In the real setup, you should replace this with your own table.
+HBaseTableSource rates = new HBaseTableSource(conf, "Rates");
+rates.setRowKey("currency", String.class);   // currency as the primary key
+rates.addColumn("fam1", "rate", Double.class);
+
+// register the temporal table into environment, then we can query it in sql
+tEnv.registerTableSource("Rates", rates);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// Get the stream and table environments.
+val env = StreamExecutionEnvironment.getExecutionEnvironment
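+// Temporal tables are only supported in the Blink planner, so select it.
+val settings = EnvironmentSettings.newInstance()
+  .useBlinkPlanner().inStreamingMode().build()
+val tEnv = StreamTableEnvironment.create(env, settings)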
+
+// Create an HBaseTableSource, which implements LookupableTableSource,
+// as a temporal table.
+// In the real setup, you should replace this with your own table.
+val rates = new HBaseTableSource(conf, "Rates")
+rates.setRowKey("currency", classOf[String])   // currency as the primary key
+rates.addColumn("fam1", "rate", classOf[java.lang.Double])
+
+// register the temporal table into environment, then we can query it in sql
+tEnv.registerTableSource("Rates", rates)
+{% endhighlight %}
+</div>
+</div>
+
+See also the page about [how to define 
LookupableTableSource](../sourceSinks.html#defining-a-tablesource-for-lookups).
+
 {% top %}
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index 36a9942..c677de6 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -1061,7 +1061,7 @@ Table result = left.join(right)
     </tr>
     <tr>
        <td>
-        <strong>Inner Join with Table Function</strong><br>
+        <strong>Inner Join with Table Function (UDTF)</strong><br>
         <span class="label label-primary">Batch</span> <span class="label 
label-primary">Streaming</span>
       </td>
        <td>
@@ -1082,7 +1082,7 @@ Table result = orders
     </tr>
     <tr>
        <td>
-        <strong>Left Outer Join with Table Function</strong><br>
+        <strong>Left Outer Join with Table Function (UDTF)</strong><br>
         <span class="label label-primary">Batch</span> <span class="label 
label-primary">Streaming</span>
       </td>
       <td>
@@ -1209,7 +1209,7 @@ val result = left.join(right)
     </tr>
     <tr>
        <td>
-        <strong>Inner Join with Table Function</strong><br>
+        <strong>Inner Join with Table Function (UDTF)</strong><br>
         <span class="label label-primary">Batch</span> <span class="label 
label-primary">Streaming</span>
       </td>
        <td>
@@ -1228,7 +1228,7 @@ val result: Table = table
     </tr>
     <tr>
        <td>
-        <strong>Left Outer Join with Table Function</strong><br>
+        <strong>Left Outer Join with Table Function (UDTF)</strong><br>
         <span class="label label-primary">Batch</span> <span class="label 
label-primary">Streaming</span></td>
        <td>
         <p>Joins a table with the results of a table function. Each row of the 
left (outer) table is joined with all rows produced by the corresponding call 
of the table function. If a table function call returns an empty result, the 
corresponding outer row is preserved and the result padded with null values.</p>
@@ -1332,7 +1332,7 @@ full_outer_result = left.full_outer_join(right, "a = 
d").select("a, b, e")
     </tr>
     <tr>
        <td>
-        <strong>Inner Join with Table Function</strong><br>
+        <strong>Inner Join with Table Function (UDTF)</strong><br>
         <span class="label label-primary">Batch</span> <span class="label 
label-primary">Streaming</span>
       </td>
        <td>
@@ -1350,7 +1350,7 @@ result = orders.join_lateral("split(c).as(s, t, 
v)").select("a, b, s, t, v")
     </tr>
     <tr>
        <td>
-        <strong>Left Outer Join with Table Function</strong><br>
+        <strong>Left Outer Join with Table Function (UDTF)</strong><br>
         <span class="label label-primary">Batch</span> <span class="label 
label-primary">Streaming</span>
       </td>
       <td>
diff --git a/docs/dev/table/tableApi.zh.md b/docs/dev/table/tableApi.zh.md
index 28802a6..eec1edb 100644
--- a/docs/dev/table/tableApi.zh.md
+++ b/docs/dev/table/tableApi.zh.md
@@ -1060,7 +1060,7 @@ Table result = left.join(right)
     </tr>
     <tr>
        <td>
-        <strong>Inner Join with Table Function</strong><br>
+        <strong>Inner Join with Table Function (UDTF)</strong><br>
         <span class="label label-primary">Batch</span> <span class="label 
label-primary">Streaming</span>
       </td>
        <td>
@@ -1081,7 +1081,7 @@ Table result = orders
     </tr>
     <tr>
        <td>
-        <strong>Left Outer Join with Table Function</strong><br>
+        <strong>Left Outer Join with Table Function (UDTF)</strong><br>
         <span class="label label-primary">Batch</span> <span class="label 
label-primary">Streaming</span>
       </td>
       <td>
@@ -1208,7 +1208,7 @@ val result = left.join(right)
     </tr>
     <tr>
        <td>
-        <strong>Inner Join with Table Function</strong><br>
+        <strong>Inner Join with Table Function (UDTF)</strong><br>
         <span class="label label-primary">Batch</span> <span class="label 
label-primary">Streaming</span>
       </td>
        <td>
@@ -1227,7 +1227,7 @@ val result: Table = table
     </tr>
     <tr>
        <td>
-        <strong>Left Outer Join with Table Function</strong><br>
+        <strong>Left Outer Join with Table Function (UDTF)</strong><br>
         <span class="label label-primary">Batch</span> <span class="label 
label-primary">Streaming</span></td>
        <td>
         <p>Joins a table with the results of a table function. Each row of the 
left (outer) table is joined with all rows produced by the corresponding call 
of the table function. If a table function call returns an empty result, the 
corresponding outer row is preserved and the result padded with null values.</p>
@@ -1331,7 +1331,7 @@ full_outer_result = left.full_outer_join(right, "a = 
d").select("a, b, e")
     </tr>
     <tr>
        <td>
-        <strong>Inner Join with Table Function</strong><br>
+        <strong>Inner Join with Table Function (UDTF)</strong><br>
         <span class="label label-primary">批处理</span> <span class="label 
label-primary">流处理</span>
       </td>
        <td>
@@ -1349,7 +1349,7 @@ result = orders.join_lateral("split(c).as(s, t, 
v)").select("a, b, s, t, v")
     </tr>
     <tr>
        <td>
-        <strong>Left Outer Join with Table Function</strong><br>
+        <strong>Left Outer Join with Table Function (UDTF)</strong><br>
         <span class="label label-primary">批处理</span> <span class="label 
label-primary">流处理</span>
       </td>
       <td>
