Hi all,

First of all, I apologize for the wall of text that follows... ;-)

A temporal table join joins an append-only table and a temporal table.
The question of how to represent a temporal table join boils down to two
questions:

1) How to represent a temporal table
2) How to specify the join of an append-only table and a temporal table

I'll discuss these points separately.

# 1 How to represent a temporal table

A temporal table is a table that can be looked up with a time parameter and
which returns the rows of the table at that point in time / for that
version.
In order to be able to (conceptually) look up previous versions, a temporal
table must be (conceptually) backed by a history table that tracks all
previous versions (see SqlServer docs [1]).
In the context of our join, we added another restriction, namely that the
table must have a primary key, i.e., there is only one row per unique key in
each version.

Hence, the requirements for a temporal table are:
* The temporal table has a primary key / unique attribute
* The temporal table has a time-attribute that defines the start of the
validity interval of a row (processing time or event time)
* The system knows that the history of the table is tracked and can infer
how to look up a version.
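To make these requirements concrete, here is a minimal in-memory sketch in Java. It is only a conceptual model under simplifying assumptions (timestamps as `long`, everything in memory) and not Flink code; the class `TemporalTable` and its methods are hypothetical. Each primary key maps to its tracked history, and a lookup returns the version with the latest start time that is not after the lookup time.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;

// Conceptual model only: each primary key maps to its full version history
// (the "history table"), ordered by the start of each version's validity interval.
class TemporalTable<K, V> {
    private final Map<K, NavigableMap<Long, V>> history = new HashMap<>();

    // Records a new version of the row with primary key `key`,
    // valid from `startTime` until the next version of that key starts.
    void put(K key, long startTime, V row) {
        history.computeIfAbsent(key, k -> new TreeMap<>()).put(startTime, row);
    }

    // Returns the version of `key` that was valid at `time`, i.e. the version with
    // the latest start time <= time, or empty if the key had no version yet.
    Optional<V> lookup(K key, long time) {
        NavigableMap<Long, V> versions = history.get(key);
        if (versions == null) {
            return Optional.empty();
        }
        Map.Entry<Long, V> entry = versions.floorEntry(time);
        return entry == null ? Optional.empty() : Optional.of(entry.getValue());
    }

    public static void main(String[] args) {
        TemporalTable<String, Double> rates = new TemporalTable<>();
        rates.put("EUR", 10L, 1.10);
        rates.put("EUR", 20L, 1.12);
        System.out.println(rates.lookup("EUR", 15L)); // Optional[1.1]
        System.out.println(rates.lookup("EUR", 25L)); // Optional[1.12]
        System.out.println(rates.lookup("EUR", 5L));  // Optional.empty
    }
}
```

A sorted map per key keeps "find the version valid at time t" a single `floorEntry` call, which is the operation every temporal lookup boils down to.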

There are two types of input (that I'm aware of) from which we want to
create temporal tables:

* append-only tables, i.e., tables that contain the full change history
* retraction tables, i.e., tables that are updating and do not remember the
history.

There are a few ways to do this:

## 1.1 Defining a VIEW on an append-only table with a time attribute.

The following view definition results in a view that provides the latest
rate for each currency.

CREATE VIEW rates AS
SELECT
  currency, MAX(rate) as rate, MAX(rowtime) as rowtime
FROM rates_history rh1
WHERE
  rh1.rowtime = (
    SELECT max(rowtime)
    FROM rates_history rh2
    WHERE rh2.currency = rh1.currency)
GROUP BY currency
WITH (
  'historytracking' = 'true',
  'historytracking.starttime' = 'rowtime');

However, the query by itself only provides the latest version. We also need
to tell the system to track the history of all changes of the view in order
to be able to look up earlier versions.
That's what the properties in the WITH clause are for (inspired by
SqlServer's TEMPORAL TABLE DDL syntax).
Note that this is *not* a syntax proposal but only meant to show which
information is needed.
With these properties, any version of the "rates" view can be looked up.
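To illustrate what the query of the view computes on its own, the following Java sketch evaluates it once over an in-memory `rates_history` list (the names `LatestRatesView` and `RateRow` are made up for illustration). Per currency, it keeps the rows with the maximum rowtime and takes MAX(rate) among them; that aggregate is needed because several rows can share the latest rowtime. The result holds exactly one version per currency, which is why history tracking has to be declared separately.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Mirrors the query of the 1.1 view on an in-memory rates_history list:
// per currency, keep the rows with the maximum rowtime and take MAX(rate) among them.
class LatestRatesView {
    record RateRow(String currency, double rate, long rowtime) {}

    static Map<String, RateRow> latestRates(List<RateRow> ratesHistory) {
        Map<String, RateRow> latest = new HashMap<>();
        for (RateRow row : ratesHistory) {
            latest.merge(row.currency(), row, LatestRatesView::preferLatest);
        }
        return latest;
    }

    // The later rowtime wins; on a tie, the higher rate wins (the view's MAX(rate)).
    private static RateRow preferLatest(RateRow a, RateRow b) {
        if (a.rowtime() != b.rowtime()) {
            return a.rowtime() > b.rowtime() ? a : b;
        }
        return a.rate() >= b.rate() ? a : b;
    }
}
```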

In addition to designing and implementing the DDL syntax for views that
support temporal lookups, the optimizer would need to understand the
semantics of the view definition in depth.
Among other things, it needs to understand that the MAX() aggregation on
the time attribute preserves its watermark alignment.
AFAIK, this is not the case at the moment (the time attribute would be
converted into a regular TIMESTAMP and lose its time attribute properties).

## 1.2 A retraction table with a primary key and a time-attribute.

On paper, it looks like such a table would automatically qualify as a
time-versioned table because it fulfills all of the requirements.
However, I don't think we can use it *as is* as a temporal table if we want
to have clean semantics.
The problem here is the "lost history" of the retraction table. The dynamic
table that is defined on the retraction stream only stores the latest
version (even though it sees all versions).
Conceptually, a temporal table can look up the version of the table at any
point in time because it is backed by a history table.
If this information is not available, we cannot have a semantically clean
definition of the join IMO.
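A small Java sketch of the "lost history" problem, using a hypothetical `RetractionTable` class that models the dynamic table as a map from key to its current row. Once an update has been applied, the previous version is gone, so a join against this table depends on when a row is processed rather than on its timestamp.

```java
import java.util.HashMap;
import java.util.Map;

// A dynamic table on a retraction (upsert) stream keeps only the latest row per key:
// every update retracts the previous version, which is then gone.
class RetractionTable {
    private final Map<String, Double> latest = new HashMap<>();

    // Applies an update: the old row for `currency` is retracted and replaced.
    void upsert(String currency, double rate) {
        latest.put(currency, rate);
    }

    // The only lookup this table supports: whatever version is stored right now.
    Double current(String currency) {
        return latest.get(currency);
    }

    public static void main(String[] args) {
        RetractionTable rates = new RetractionTable();
        rates.upsert("EUR", 1.10); // version valid from rowtime 10
        rates.upsert("EUR", 1.12); // version valid from rowtime 20
        // An order with ordertime 15 should be joined with 1.10, but if it is
        // processed after the second update arrived, only 1.12 is left.
        System.out.println(rates.current("EUR")); // 1.12
    }
}
```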

Therefore, we should define the table in a way that the system knows that
its history is tracked.
MSSQL uses a syntax similar to this one:

CREATE TABLE rates (
    currency CHAR(3) NOT NULL PRIMARY KEY,
    rate DOUBLE,
    rowtime TIMESTAMP,
    WATERMARK FOR rowtime AS rowtime - INTERVAL '5' MINUTE)
WITH (
  'historytracking' = 'true',
  'historytracking.starttime' = 'rowtime');

The 'historytracking' properties would declare that the table tracks its
history and also specify the attribute (rowtime) that is used for
versioning.

## 1.3 Registering a TableFunction that takes an append-only table with a time attribute

The TableFunction requires a few parameters:
* the source table from which to derive the temporal table
* the key attribute on which the versions of the source table should be
computed
* the time attribute that defines the versions
* a lookup timestamp that determines which version is returned.
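A rough Java sketch of such a function, assuming everything is in memory: the hypothetical `temporalTable` helper (not Flink's actual `TemporalTableFunction`) receives the source table, the key attribute, and the time attribute, and returns a function from a lookup timestamp to the version of the table valid at that time. This mirrors how a function registered as `rates` is later called with a timestamp, as in `rates(o.ordertime)`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.LongFunction;
import java.util.function.ToLongFunction;

// Hypothetical helper, not Flink's TemporalTableFunction: it is parameterized with
// the source table, the key attribute, and the time attribute, and returns a
// function from a lookup timestamp to the version of the table valid at that time.
final class TemporalTableFunctions {
    private TemporalTableFunctions() {}

    static <T, K> LongFunction<Map<K, T>> temporalTable(
            List<T> source, Function<T, K> key, ToLongFunction<T> time) {
        return lookupTime -> {
            Map<K, T> version = new HashMap<>();
            for (T row : source) {
                if (time.applyAsLong(row) > lookupTime) {
                    continue; // this row only becomes valid after lookupTime
                }
                // Keep the row with the latest time attribute per key.
                version.merge(key.apply(row), row,
                        (current, candidate) ->
                                time.applyAsLong(candidate) >= time.applyAsLong(current)
                                        ? candidate : current);
            }
            return version;
        };
    }
}
```

Rescanning the whole source for every lookup is only for clarity; a real implementation would keep per-key state ordered by time.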

The reasons why we have chosen the TableFunction approach over the VIEW
approach so far are:
* It is easier for the optimizer to identify a built-in table function than
to analyze and reason about a generic VIEW.
* We would need to make the optimizer a lot smarter to infer all the
properties from the generic VIEW definition that we need for a temporal
table join.
* Passing a parameter to a function is a known thing, passing a parameter
to a VIEW not so much.
* Users would need to define the VIEW exactly right, such that it can be
used as a temporal table. See 1.1 for why this is not trivial.

There are two ways to use a TableFunction:

### 1.3.1 Built-in and pre-registered function that is parameterized in the SQL query

Here, we do not need to do anything to register the function. We simply use
it in the query (see the example in 2.2 below).

### 1.3.2 Parameterize the function when it is registered in the catalog (with a provided Java implementation)

This is the approach we've used so far. In the Table API, the function is
first parameterized and created, and then registered.
We would need a DDL syntax to parameterize UDFs on registration.
I don't want to propose a syntax here, but just to get an idea, it might
look like this:

CREATE FUNCTION rates AS 'org.apache.flink.table.udfs.TemporalTableFunction'
WITH ('table' = 'rates_history', 'key' = 'currency', 'time' = 'rowtime')

Right now, the Flink Catalog interface does not have the functionality to
store such parameters, and we would need some hacks to create properly
parameterized function instances.

# 2 Defining a join of an append-only table and a temporal table

The append-only table needs to have a time attribute (processing time or
event time, but of the same kind as that of the temporal table).
The join then needs to:
* specify an equality predicate that includes the primary key of the
temporal table
* declare the time attribute of the append-only table as the time as of
which to look up the temporal table, i.e., get the version of the temporal
table that is valid for the timestamp of the current row from the
append-only table
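The semantics of these two requirements can be sketched as an in-memory, batch-style join in Java. This is an illustration only, assuming event-time timestamps as `long` and ignoring watermarks, late data, and state cleanup; the types `Order`, `Rate`, and `Joined` are hypothetical. Each order is matched on the key and sees the rate version that was valid at its `ordertime`; orders with no valid version produce no result, as in an inner join.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Conceptual, batch-style semantics of joining an append-only table (orders) with a
// temporal table (rates). Watermarks and state cleanup are deliberately ignored.
final class TemporalJoinSketch {
    record Order(String id, String currency, long ordertime) {}
    record Rate(String currency, double rate, long rowtime) {}
    record Joined(Order order, Rate rate) {}

    private TemporalJoinSketch() {}

    static List<Joined> join(List<Order> orders, List<Rate> ratesHistory) {
        // Version history of the temporal table, per primary key (currency).
        Map<String, NavigableMap<Long, Rate>> versions = new HashMap<>();
        for (Rate r : ratesHistory) {
            versions.computeIfAbsent(r.currency(), c -> new TreeMap<>()).put(r.rowtime(), r);
        }
        List<Joined> result = new ArrayList<>();
        for (Order o : orders) {
            // Equality predicate on the primary key.
            NavigableMap<Long, Rate> history = versions.get(o.currency());
            if (history == null) {
                continue; // no rate for this currency at all
            }
            // "AS OF o.ordertime": the version with the latest rowtime <= ordertime.
            Map.Entry<Long, Rate> valid = history.floorEntry(o.ordertime());
            if (valid != null) { // inner join: no version yet means no result row
                result.add(new Joined(o, valid.getValue()));
            }
        }
        return result;
    }
}
```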

The tricky part (from a syntax point of view) is to specify the lookup
time.

## 2.1 The temporal table is a regular table or view (see approaches 1.1 and 1.2 above)

In this case we can use the "FOR SYSTEM_TIME AS OF x" clause as follows:

SELECT *
FROM orders o, rates r FOR SYSTEM_TIME AS OF o.ordertime
WHERE o.currency = r.currency

IMO, this is a great syntax and the one we should strive for.
We would need to bend the rules of the SQL standard, which only allows x in
"FOR SYSTEM_TIME AS OF x" to be a constant. Also, the table on which it is
applied usually needs to be of a specific type (not sure if views are
supported), but I guess this is fine.
NOTE: the "FOR SYSTEM_TIME AS OF x" is already supported for LookupTable
Joins if x is a processing time attribute [2].

## 2.2 The temporal table is a TableFunction that is parameterized in the query (see 1.3.1 above)

SELECT *
FROM orders o,
  TEMPORAL_TABLE(
    table => TABLE(rates_history),
    key => DESCRIPTOR(currency),
    time => DESCRIPTOR(rowtime)) r
WHERE o.currency = r.currency

The function "TEMPORAL_TABLE" is built-in and nothing was registered in the
catalog (except the rates_history table).
In fact, this is valid SQL:2016 syntax, called Polymorphic Table
Functions (see [3]).

## 2.3 The temporal table is a TableFunction that was parameterized during registration (see 1.3.2 above)

This is what we have at the moment.

SELECT *
FROM orders o,
  LATERAL TABLE (rates(o.ordertime)) AS r
WHERE o.currency = r.currency

The TableFunction "rates" was registered in the catalog and parameterized
to the "rates_history" append-only table, the key was set to "currency",
and the time attribute was declared.

# SUMMARY

IMO, in the long run we should aim to define temporal tables either as
upsert/retraction tables or as views on append-only tables, and join them
using the "FOR SYSTEM_TIME AS OF x" syntax.
I guess it is debatable whether we need to declare that history is tracked
for these tables (which we don't actually do) or whether we assume it by
convention if the table has a time attribute.
It should be (relatively) easy to get this to work for retraction tables
which will be supported soon.
It will be more work for views because we need to improve the time
attribute handling with MAX() aggregations.
The "FOR SYSTEM_TIME AS OF x" is already supported for LookupTableSources
and would "only" need to be adapted to work on temporal tables.

Registering parameterized TableFunctions in the catalog seems like quite a
bit of work. We would need a new DDL syntax and would have to extend the
catalog and function instantiation. This won't be easy, IMO.
If we only support them as TEMPORARY FUNCTIONs, which are not registered in
the catalog, it will be easier. The question is whether it is worth the
effort if we decide on the other approach.

Using TableFunctions that are parameterized in the query will require
extending the Calcite parser and framework to support Polymorphic Table
Functions.
However, some work might already have been done there, because AFAIK Apache
Beam aims to support this syntax for windowing functions as described in
the "One SQL to rule them all" paper [4].
It might be the fastest and a fully SQL-standard-compliant way.

Cheers,
Fabian

[1] https://docs.microsoft.com/en-us/sql/relational-databases/tables/temporal-tables
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/joins.html#usage-1
[3] https://standards.iso.org/ittf/PubliclyAvailableStandards/c069776_ISO_IEC_TR_19075-7_2017.zip
[4] https://arxiv.org/abs/1905.12133

On Fri, 17 Apr 2020 at 06:37, Jark Wu <imj...@gmail.com> wrote:

> Hi Konstantin,
>
> Thanks for bringing up this discussion. I think temporal join is a very
> important feature and should be exposed to pure SQL users.
> I have already received many requests like this.
> However, my concern is how to properly support this feature in SQL.
> Introducing a DDL syntax for Temporal Table Function is one way, but maybe
> not the best one.
>
> The most important reason is that the data underlying a temporal table
> function is exactly a changelog stream.
> A temporal join actually joins a fact stream with the changelog stream on
> processing time or event time.
> We will soon support creating a changelog source using DDL once FLIP-95
> and FLIP-105 are finished.
> At that time, we can have a simple DDL to create a changelog source like
> this:
>
> CREATE TABLE rate_changelog (
>   currency STRING,
>   rate DECIMAL
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'rate_binlog',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'format' = 'debezium-json'
> );
>
> Meanwhile, we already have a SQL standard temporal join syntax [1],
> i.e. the "A JOIN B FOR SYSTEM_TIME AS OF ..".
> It is currently used for dimension table lookup joins, but the semantics
> are the same as those of the "temporal table function join" [2].
> I'm in favor of "FOR SYSTEM_TIME AS OF" because it is more natural,
> since the definition of B is a *table* not a *table function*,
> and the syntax is included in the SQL standard.
>
> So once we have the ability to define "rate_changelog" table, then we can
> use the following query to temporal join the changelog on processing time.
>
> SELECT *
> FROM orders JOIN rate_changelog FOR SYSTEM_TIME AS OF orders.proctime
> ON orders.currency = rate_changelog.currency;
>
> In a nutshell, once FLIP-95 and FLIP-105 are ready, we can easily support
> "temporal join on changelogs" without introducing new syntax.
> IMO, introducing a DDL syntax for Temporal Table Function does not look
> like an easy way and may result in duplicated work.
>
> Best,
> Jark
>
> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
> [2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table-function
>
> On Thu, 16 Apr 2020 at 23:04, Benchao Li <libenc...@gmail.com> wrote:
>
> > Hi Konstantin,
> >
> > Thanks for bringing up this discussion. +1 for the idea.
> > We have run into this in our company too, and I recently planned to
> > support it in our internal branch.
> >
> > Regarding your questions:
> > 1) I think it might be more of a table/view than a function, just like a
> > Temporal Table (which is also known as a dimension table). Maybe we need
> > a DDL like CREATE VIEW plus some additional settings.
> > 2) If we design the DDL for it like a view, then maybe temporary is good
> > enough.
> >
> > On Thu, 16 Apr 2020 at 20:16, Konstantin Knauf <kna...@apache.org> wrote:
> >
> > > Hi everyone,
> > >
> > > it would be very useful if temporal tables could be created via DDL.
> > > Currently, users either need to do this in the Table API or in the
> > > environment file of the Flink CLI, both of which require the user to
> > > switch the context of the SQL CLI/Editor. I recently created a ticket
> > > for this request [1].
> > >
> > > I see two main questions:
> > >
> > > 1) What would be the DDL syntax? A Temporal Table is on the one hand a
> > > view and on the other a function, depending on how you look at it.
> > >
> > > 2) Would this temporal table view/function be stored in the catalog or
> > > only be temporary?
> > >
> > > I personally do not have much experience in this area of Flink, so I am
> > > looking forward to hearing your thoughts on this.
> > >
> > > Best,
> > >
> > > Konstantin
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-16824
> > >
> > > --
> > >
> > > Konstantin Knauf
> > >
> >
> >
> > --
> >
> > Benchao Li
> > School of Electronics Engineering and Computer Science, Peking University
> > Tel:+86-15650713730
> > Email: libenc...@gmail.com; libenc...@pku.edu.cn
> >
>
