I really like the TEMPORAL keyword, I find it very intuitive.

The down side of this approach would be that an additional preprocessing
> step would not be possible anymore because there is no preceding view.
>

 Yes and no. My understanding is we are not talking about making any
changes to how temporal tables are defined in the table api. Since you
cannot currently define temporal table functions in pure SQL applications,
but only pre-register them in YAML, you can't do any pre-processing as it
stands today. Preprocessing may be a generally useful feature, I'm not
sure, but this syntax does not lose us anything in pure SQL applications.

These semantics are still unclear to me. Can retractions only occur
> within watermarks? Or are they also used for representing late updates?
>

I do not know the SQL standard well enough to give a principled response to
this question. However, in my observation of production workloads, users of
temporal table functions are doing so to denormalize star schemas before
performing further transformations and aggregations and expect the output
to be an append stream. With the ongoing work to better support changelogs,
the need for users to understand the differences in append vs upsert in
their query may be diminishing but everyone else on this thread can better
speak to that.

Seth

On Fri, Apr 17, 2020 at 10:03 AM Timo Walther <twal...@apache.org> wrote:

> Hi Fabian,
>
> thank you very much for this great summary!
>
> I wasn't aware of the Polymorphic Table Functions standard. This is a
> very interesting topic that we should definitely consider in the future.
> Maybe this could also help us in defining tables more dynamically within
> a query. It could help solving problems as discussed in FLIP-113.
>
> Regarding joining:
>
> IMO we should aim for "FOR SYSTEM_TIME AS OF x" instead of the current
> `LATERAL TABLE(rates(x))` syntax. A function that also behaves like a
> table and needs this special `LATERAL` keyword during joining is not
> very intuitive. The PTF could be used once they are fully supported by
> Calcite and we have the big picture how to also use them for other
> time-based operations (windows?, joins?).
>
> Regarding how represent a temporal table:
>
> I think that our current DDL, current LookupTableSource and temporal
> tables can fit nicely together.
>
> How about we simply introduce an additional keyword `TEMPORAL` to
> indicate history tracking semantics? I think this is the minimal
> invasive solution:
>
> CREATE TEMPORAL TABLE rates (
>    currency CHAR(3) NOT NULL PRIMARY KEY,
>    rate DOUBLE,
>    rowtime TIMESTAMP,
>    WATERMARK FOR rowtime AS rowtime - INTERVAL '5' MINUTE)
> WITH (...);
>
> - The primary key would be defined by the DDL.
> - The available time attribute would be defined by the DDL. Either as
> the only time attribute of the table or we introduce a special
> constraint similar to `PRIMARY KEY`.
>
> The down side of this approach would be that an additional preprocessing
> step would not be possible anymore because there is no preceding view.
>
> The `TEMPORAL` semantic can be stored in the properties of the table
> when writing to a catalog. We do the same for watermarks and computed
> columns.
>
> Without a `TEMPORAL` keyword, a `FOR SYSTEM_TIME AS OF x` would only
> work on processing time by a lookup into the external system or on
> event-time by using the time semantics that the external system supports.
>
> Regarding retraction table with a primary key and a time-attribute:
>
> These semantics are still unclear to me. Can retractions only occur
> within watermarks? Or are they also used for representing late updates?
>
> Regards,
> Timo
>
>
> On 17.04.20 14:34, Fabian Hueske wrote:
> > Hi all,
> >
> > First of all, I appologize for the text wall that's following... ;-)
> >
> > A temporal table join joins an append-only table and a temporal table.
> > The question about how to represent a temporal table join boils down to
> two
> > questions:
> >
> > 1) How to represent a temporal table
> > 2) How to specify the join of an append-only table and a temporal table
> >
> > I'll discuss these points separately.
> >
> > # 1 How to represent a temporal table
> >
> > A temporal table is a table that can be looked up with a time parameter
> and
> > which returns the rows of the table at that point in time / for that
> > version.
> > In order to be able to (conceptually) look up previous versions, a
> temporal
> > table must be (conceptually) backed by a history table that tracks all
> > previous versions (see SqlServer docs [1]).
> > In the context of our join, we added another restriction namely that the
> > table must have a primary key, i.e., there is only one row for each
> version
> > for each unique key.
> >
> > Hence, the requirements for a temporal table are:
> > * The temporal table has a primary key / unique attribute
> > * The temporal table has a time-attribute that defines the start of the
> > validity interval of a row (processing time or event time)
> > * The system knows that the history of the table is tracked and can infer
> > how to look up a version.
> >
> > There are two possible types of input from which we want to create
> temporal
> > tables (that I'm aware of):
> >
> > * append-only tables, i.e., tables that contain the full change history
> > * retraction tables, i.e., tables that are updating and do not remember
> the
> > history.
> >
> > There are a few ways to do this:
> >
> > ## 1.1 Defining a VIEW on an append-only table with a time attribute.
> >
> > The following view definition results in a view that provides the latest
> > rate for each currency.
> >
> > CREATE VIEW rates AS
> > SELECT
> >    currency, MAX(rate) as rate, MAX(rowtime) as rowtime
> > FROM rates_history rh1
> > WHERE
> >    rh1.rowtime = (
> >      SELECT max(rowtime)
> >      FROM rates_history rh2
> >      WHERE rh2.curreny = rh1.currency)
> > GROUP BY currency
> > WITH (
> >    'historytracking' = 'true',
> >    'historytracking.starttime' = 'rowtime');
> >
> > However, we also need to tell the system to track the history of all
> > changes of the view in order to be able to look it up.
> > That's what the properties in the WITH clause are for (inspired by
> > SqlServer's TEMPORAL TABLE DDL syntax).
> > Note that this is *not* a syntax proposal but only meant to show which
> > information is needed.
> > This view allows to look up any version of the "rates" view.
> >
> > In addition to designing and implementing the DDL syntax for views that
> > support temporal lookups, the optimizer would need to understand the
> > semantics of the view definition in depth.
> > Among other things it needs to understand that the MAX() aggregation on
> the
> > time-attribute preserves its watermark alignment.
> > AFAIK, this is not the case at the moment (the time attribute would be
> > converted into a regular TIMESTAMP and lose it's time attribute
> properties)
> >
> > ## 1.2 A retraction table with a primary key and a time-attribute.
> >
> > On paper it looks like such a table would automatically qualify as a
> > time-versioned table because it completely fulfills the requirements.
> > However, I don't think we can use it *as is* as a temporal table if we
> want
> > to have clean semantics.
> > The problem here is the "lost history" of the retraction table. The
> dynamic
> > table that is defined on the retraction stream only stores the latest
> > version (even though it sees all versions).
> > Conceptually, a temporal table look up the version of the table at any
> > point in time because it is backed by a history table.
> > If this information is not available, we cannot have a semantically clean
> > definition of the join IMO.
> >
> > Therefore we should define the table in a way that the system knows that
> > the history is tracked.
> > In MSSQL uses a syntax similar to this one
> >
> > CREATE TABLE rates (
> >      currency CHAR(3) NOT NULL PRIMARY KEY,
> >      rate DOUBLE,
> >      rowtime TIMESTAMP,
> >      WATERMARK FOR rowtime AS rowtime - INTERVAL '5' MINUTE)
> > WITH (
> >    'historytracking' = 'true',
> >    'historytracking.starttime' = 'rowtime');
> >
> > The 'historytracking' properties would decare that the table tracks its
> > history and also specify the attribute (rowtime) that is used for
> > versioning.
> >
> > ## 1.3 Registering a TableFunction that takes an append-only table with
> > time attribute
> >
> > The TableFunction requires a few parameters:
> > * the source table from which to derive the temporal table
> > * the key attribute on which the versions of the source table should be
> > computed
> > * the time attribute that defines the versions
> > * a lookup timestamp for the version of that is returned.
> >
> > The reason why we chose the TableFunction approach over the VIEW approach
> > so far were:
> > * It is easier for the optimizer to identify a build-in table function
> than
> > to analyze and reason about a generic VIEW.
> > * We would need to make the optimizer a lot smarter to infer all the
> > properties from the generic VIEW definition that we need for a temporal
> > table join.
> > * Passing a parameter to a function is a known thing, passing a parameter
> > to a VIEW not so much.
> > * Users would need to specify the VIEW exactly correct, such that it can
> be
> > used as a temporal table. Look at 1.1 why this is not trivial.
> >
> > There is two ways to use a TableFunction:
> >
> > ### 1.3.1 Built-in and pre-registered function that is parameterized in
> the
> > SQL query
> >
> > Here, we do not need to do anything to register the function. We simply
> use
> > it in the query (see example in 2.2 below)
> >
> > ### 1.3.2 Parameterize function when it is registered in the catalog
> (with
> > a provided Java implementation)
> >
> > This is the approach, we've used so far. In the Table API, the function
> is
> > first parameterized and created and then registered:
> > We would need a DDL syntax to parameterize UDFs on registration.
> > I don't want to propose a syntax here, but just to get an idea it might
> > look like this:
> >
> > CREATE FUNCTION rates AS
> > 'org.apache.flink.table.udfs.TemporalTableFunction' WITH ('table' =
> > 'rates_history', 'key' = 'cur', 'time' = 'rowtime')
> >
> > Right now, the Flink Catalog interface does not have the functionality to
> > store such parameters and would need some hacks to properly create
> properly
> > parameterize function instances.
> >
> >
> >
> > # 2 Defining a join of an append-only table and a temporal table
> >
> > The append-only table needs to have a time-attribute (processing time or
> > event time, but same as the temporal table).
> > The join then needs to specify two things:
> > * an equality predicate that includes the primary key of the temporal
> table
> > * declare the time attribute of the append-only table as the time as of
> > which to look up the temporal table, i.e, get the version of the temporal
> > table that is valid for the timestamp of the current row from the
> > append-only table
> >
> > The tricky part (from a syntax point of view) is to specify the lookup
> > time.
> >
> > ## 2.1 the temporal table is a regular table or view (see approaches 1.1
> > and 1.2 above)
> >
> > In this case we can use the "FOR SYSTEM_TIME AS OF x" clause as follows:
> >
> > SELECT *
> > FROM orders o, rates r FOR SYSTEM_TIME AS OF o.ordertime
> > WHERE o.currency = r.currency
> >
> > IMO, this is a great syntax and the one we should strive for.
> > We would need to bend the rules of the SQL standard which only allows x
> in
> > "FOR SYSTEM_TIME AS OF x" to be a constant and the table on which it is
> > applied usually needs to be a specific type (not sure if views are
> > supported), but I guess this is fine.
> > NOTE: the "FOR SYSTEM_TIME AS OF x" is already supported for LookupTable
> > Joins if x is a processing time attribute [2].
> >
> > ## 2.2 the temporal table is a TableFunction and parameterized in the
> query
> > (see 1.3.1 above)
> >
> > SELECT *
> > FROM orders o,
> >    TEMPORAL_TABLE(
> >      table => TABLE(rates_history),
> >      key => DESCRIPTOR(currency),
> >      time => DESCRIPTOR(rowtime)) r
> >    ON o.currency = r.currency
> >
> > The function "TEMPORAL_TABLE" is built-in and nothing was registered in
> the
> > catalog (except the rates_history table).
> > In fact this is valid SQL:2016 syntax and called Polymorphic Table
> > Functions. Have a look here [3].
> >
> > ## 2.3 the temporal table is a TableFunction that was parameterized
> during
> > registration (see 1.3.2 above)
> >
> > This is what we have at the momement.
> >
> > SELECT *
> > FROM orders o,
> >    LATERAL TABLE (rates(o.ordertime))
> >    ON o.currency = r.currency
> >
> > The TableFunction "rates" was registered in the catalog and parameterized
> > to the "rates_history" append-only table, the key was set to "currency",
> > and the time attribute was declared.
> >
> > # SUMMARY
> >
> > IMO we should in the long run aim to define temporal tables either as
> > upsert retraction tables and views on append-only tables and join them
> > using the "FOR SYSTEM_TIME AS OF x" syntax.
> > I guess it is debatable whether we need to decare to track history for
> > these tables (which we don't actually do) or if we do it by convention if
> > the table has a time attribute.
> > It should be (relatively) easy to get this to work for retraction tables
> > which will be supported soon.
> > It will be more work for views because we need to improve the time
> > attribute handling with MAX() aggregations.
> > The "FOR SYSTEM_TIME AS OF x" is already supported for LookupTableSources
> > and would "only" need to be adapted to work on temporal tables.
> >
> > Registering parameterized TableFunctions in the catalog seems like quite
> a
> > bit of work. We need new DDL syntax, extend the catalog and function
> > instantiation. This won't be easy, IMO.
> > If we only support them as TEMPORARY FUNCTION which are not registered in
> > the catalog it will be easier. The question is whether it is worth the
> > effort if we decide for the other approach.
> >
> > Using TableFunctions that are parameterized in the query will require to
> > extend the Calcite parser and framework to support Polymorphic Table
> > Functions.
> > However, there might already some work be done there, because AFAIK
> Apache
> > Beam aims to support this syntax for windowing functions as described in
> > the "One SQL to rule them all" paper [4].
> > It might be the fastest and fully SQL standard compliant way.
> >
> > Cheers,
> > Fabian
> >
> > [1]
> >
> https://docs.microsoft.com/en-us/sql/relational-databases/tables/temporal-tables
> > [2]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/joins.html#usage-1
> > [3]
> >
> https://standards.iso.org/ittf/PubliclyAvailableStandards/c069776_ISO_IEC_TR_19075-7_2017.zip
> > [4] https://arxiv.org/abs/1905.12133
> >
> > Am Fr., 17. Apr. 2020 um 06:37 Uhr schrieb Jark Wu <imj...@gmail.com>:
> >
> >> Hi Konstantin,
> >>
> >> Thanks for bringing this discussion. I think temporal join is a very
> >> important feature and should be exposed to pure SQL users.
> >> And I already received many requirements like this.
> >> However, my concern is that how to properly support this feature in SQL.
> >> Introducing a DDL syntax for Temporal Table Function is one way, but
> maybe
> >> not the best one.
> >>
> >> The most important reason is that the underlying of temporal table
> function
> >> is exactly a changelog stream.
> >> The temporal join is actually temporal joining a fact stream with the
> >> changelog stream on processing time or event time.
> >> We will soon support to create a changelog source using DDL once FLIP-95
> >> and FLIP-105 is finished.
> >> At that time, we can have a simple DDL to create changelog source like
> >> this;
> >>
> >> CREATE TABLE rate_changelog (
> >>    currency STRING,
> >>    rate DECIMAL
> >> ) WITH (
> >>    'connector' = 'kafka',
> >>    'topic' = 'rate_binlog',
> >>    'properties.bootstrap.servers' = 'localhost:9092',
> >>    'format' = 'debezium-json'
> >> );
> >>
> >> In the meanwhile, we already have a SQL standard temporal join syntax
> [1],
> >> i.e. the "A JOIN B FOR SYSTEM_TIME AS OF ..".
> >> It is currently used as dimension table lookup join, but the semantic is
> >> the same to the "temporal table function join"[2].
> >> I'm in favor of "FOR SYSTEM_TIME AS OF" because it is more nature
> >> becuase the definition of B is a *table* not a *table function*,
> >> and the syntax is included in SQL standard.
> >>
> >> So once we have the ability to define "rate_changelog" table, then we
> can
> >> use the following query to temporal join the changelog on processing
> time.
> >>
> >> SELECT *
> >> FROM orders JOIN rate_changelog FOR SYSTEM_TIME AS OF orders.proctime
> >> ON orders.currency = rate_changelog.currency;
> >>
> >> In a nutshell, once FLIP-95 and FLIP-105 is ready, we can easily to
> support
> >> "temporal join on changelogs" without introducing new syntax.
> >> IMO, introducing a DDL syntax for Temporal Table Function looks like
> not an
> >> easy way and may have repetitive work.
> >>
> >> Best,
> >> Jark
> >>
> >> [1]:
> >>
> >>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
> >> [2]:
> >>
> >>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table-function
> >>
> >>
> >>
> >>
> >>
> >> On Thu, 16 Apr 2020 at 23:04, Benchao Li <libenc...@gmail.com> wrote:
> >>
> >>> Hi Konstantin,
> >>>
> >>> Thanks for bringing up this discussion. +1 for the idea.
> >>> We have met this in our company too, and I planned to support it
> recently
> >>> in our internal branch.
> >>>
> >>> regarding to your questions,
> >>> 1) I think it might be more a table/view than function, just like
> >> Temporal
> >>> Table (which is also known as
> >>> dimension table). Maybe we need a DDL like CREATE VIEW and plus some
> >>> additional settings.
> >>> 2) If we design the DDL for it like view, then maybe temporary is ok
> >>> enough.
> >>>
> >>> Konstantin Knauf <kna...@apache.org> 于2020年4月16日周四 下午8:16写道:
> >>>
> >>>> Hi everyone,
> >>>>
> >>>> it would be very useful if temporal tables could be created  via DDL.
> >>>> Currently, users either need to do this in the Table API or in the
> >>>> environment file of the Flink CLI, which both require the user to
> >> switch
> >>>> the context of the SQL CLI/Editor. I recently created a ticket for
> this
> >>>> request [1].
> >>>>
> >>>> I see two main questions:
> >>>>
> >>>> 1) What would be the DDL syntax? A Temporal Table is on the one hand a
> >>> view
> >>>> and on the other a function depending on how you look at it.
> >>>>
> >>>> 2) Would this temporal table view/function be stored in the catalog or
> >>> only
> >>>> be temporary?
> >>>>
> >>>> I personally do not have much experience in this area of Flink, so I
> am
> >>>> looking forward to hearing your thoughts on this.
> >>>>
> >>>> Best,
> >>>>
> >>>> Konstantin
> >>>>
> >>>> [1] https://issues.apache.org/jira/browse/FLINK-16824
> >>>>
> >>>> --
> >>>>
> >>>> Konstantin Knauf
> >>>>
> >>>
> >>>
> >>> --
> >>>
> >>> Benchao Li
> >>> School of Electronics Engineering and Computer Science, Peking
> University
> >>> Tel:+86-15650713730
> >>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
> >>>
> >>
> >
>
>

Reply via email to