Hi,

I agree with most of what Timo said.

The TEMPORAL keyword (which unfortunately might be easily confused with
TEMPORARY...) looks very intuitive, and I think using the table's only time
attribute for versioning would be a good choice.

However, TEMPORAL TABLE on retraction tables does not solve the full problem.
I believe there will also be cases where we need to derive a temporal table
from an append-only table (what TemporalTableFunctions do right now).
I think the best choice for this would be TEMPORAL VIEW but, as I explained,
it might take a while until this can be supported.
TEMPORAL VIEW would also address the problem of preprocessing.
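
To make the append-only case concrete, here is a minimal Python sketch of the
lookup that deriving a temporal table from an append-only table implies: group
the history by key, order each key's rows by the time attribute, and return the
latest row at or before the lookup time. It only models the semantics in batch
form and is not how Flink implements TemporalTableFunctions. The names
build_temporal_table and lookup, and the sample rows, are made up for
illustration.

```python
from bisect import bisect_right
from collections import defaultdict


def build_temporal_table(history, key="currency", time="rowtime"):
    """Group append-only rows by key and sort each group by the time attribute."""
    versions = defaultdict(list)
    for row in history:
        versions[row[key]].append(row)
    for rows in versions.values():
        rows.sort(key=lambda r: r[time])
    return versions


def lookup(versions, key_value, as_of, time="rowtime"):
    """Return the row version valid at `as_of`, or None if none exists yet."""
    rows = versions.get(key_value, [])
    times = [r[time] for r in rows]
    # Index of the first version that starts strictly after `as_of`.
    idx = bisect_right(times, as_of)
    return rows[idx - 1] if idx else None


# Hypothetical append-only history (rowtime is a plain integer here).
history = [
    {"currency": "EUR", "rate": 1.10, "rowtime": 1},
    {"currency": "USD", "rate": 1.00, "rowtime": 2},
    {"currency": "EUR", "rate": 1.15, "rowtime": 5},
]
table = build_temporal_table(history)
# An order at time 4 sees the EUR version that started at rowtime 1.
print(lookup(table, "EUR", as_of=4))
```

A view-based definition would have to let the optimizer infer exactly this
key / time / "latest version" structure, which is why it is harder than a
dedicated function.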

> Regarding retraction table with a primary key and a time-attribute:
> These semantics are still unclear to me. Can retractions only occur
> within watermarks? Or are they also used for representing late updates?

Time attributes and retraction streams are a challenging topic that I
haven't completely understood yet.
So far we have always treated time attributes as part of the data.
In combination with retractions, it seems that they become metadata that
specifies when a change was made.
I think this is different from treating time attributes as regular data.
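
One possible reading of "time attribute as change metadata", sketched in Python
under my own assumptions: each changelog entry carries the time of the change
next to the operation, and a version of the table is obtained by replaying all
changes up to the lookup time. The tuple layout (change_time, op, key, row) and
the function name state_as_of are hypothetical, and the sketch deliberately
ignores late updates and watermarks, which are the open questions here.

```python
def state_as_of(changelog, as_of):
    """Replay changes with change_time <= as_of and return key -> row."""
    state = {}
    # sorted() is stable, so changes with equal times keep their input order
    # (a retraction followed by an insertion at the same time stays in order).
    for change_time, op, key, row in sorted(changelog, key=lambda c: c[0]):
        if change_time > as_of:
            break
        if op == "+":      # insertion / new version of the row
            state[key] = row
        elif op == "-":    # retraction of the previous version
            state.pop(key, None)
    return state


# Hypothetical retraction stream for a single currency.
changelog = [
    (1, "+", "EUR", {"rate": 1.10}),
    (5, "-", "EUR", {"rate": 1.10}),
    (5, "+", "EUR", {"rate": 1.15}),
]
print(state_as_of(changelog, as_of=4))
```

In this reading the change time never appears inside the row itself, which is
what distinguishes it from a regular data column.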

Cheers, Fabian


On Fri, Apr 17, 2020 at 17:23, Seth Wiesman <sjwies...@gmail.com> wrote:

> I really like the TEMPORAL keyword, I find it very intuitive.
>
> The down side of this approach would be that an additional preprocessing
> > step would not be possible anymore because there is no preceding view.
> >
>
> Yes and no. My understanding is we are not talking about making any
> changes to how temporal tables are defined in the Table API. Since you
> cannot currently define temporal table functions in pure SQL applications,
> but only pre-register them in YAML, you can't do any pre-processing as it
> stands today. Preprocessing may be a generally useful feature, I'm not
> sure, but this syntax does not lose us anything in pure SQL applications.
>
> These semantics are still unclear to me. Can retractions only occur
> > within watermarks? Or are they also used for representing late updates?
> >
>
> I do not know the SQL standard well enough to give a principled response to
> this question. However, in my observation of production workloads, users of
> temporal table functions are doing so to denormalize star schemas before
> performing further transformations and aggregations and expect the output
> to be an append stream. With the ongoing work to better support changelogs,
> the need for users to understand the differences in append vs upsert in
> their query may be diminishing but everyone else on this thread can better
> speak to that.
>
> Seth
>
> On Fri, Apr 17, 2020 at 10:03 AM Timo Walther <twal...@apache.org> wrote:
>
> > Hi Fabian,
> >
> > thank you very much for this great summary!
> >
> > I wasn't aware of the Polymorphic Table Functions standard. This is a
> > very interesting topic that we should definitely consider in the future.
> > Maybe this could also help us in defining tables more dynamically within
> > a query. It could help solving problems as discussed in FLIP-113.
> >
> > Regarding joining:
> >
> > IMO we should aim for "FOR SYSTEM_TIME AS OF x" instead of the current
> > `LATERAL TABLE(rates(x))` syntax. A function that also behaves like a
> > table and needs this special `LATERAL` keyword during joining is not
> > very intuitive. The PTF could be used once they are fully supported by
> > Calcite and we have the big picture how to also use them for other
> > time-based operations (windows?, joins?).
> >
> > Regarding how to represent a temporal table:
> >
> > I think that our current DDL, current LookupTableSource and temporal
> > tables can fit nicely together.
> >
> > How about we simply introduce an additional keyword `TEMPORAL` to
> > indicate history tracking semantics? I think this is the least
> > invasive solution:
> >
> > CREATE TEMPORAL TABLE rates (
> >    currency CHAR(3) NOT NULL PRIMARY KEY,
> >    rate DOUBLE,
> >    rowtime TIMESTAMP,
> >    WATERMARK FOR rowtime AS rowtime - INTERVAL '5' MINUTE)
> > WITH (...);
> >
> > - The primary key would be defined by the DDL.
> > - The available time attribute would be defined by the DDL. Either as
> > the only time attribute of the table or we introduce a special
> > constraint similar to `PRIMARY KEY`.
> >
> > The down side of this approach would be that an additional preprocessing
> > step would not be possible anymore because there is no preceding view.
> >
> > The `TEMPORAL` semantic can be stored in the properties of the table
> > when writing to a catalog. We do the same for watermarks and computed
> > columns.
> >
> > Without a `TEMPORAL` keyword, a `FOR SYSTEM_TIME AS OF x` would only
> > work on processing time by a lookup into the external system or on
> > event-time by using the time semantics that the external system supports.
> >
> > Regarding retraction table with a primary key and a time-attribute:
> >
> > These semantics are still unclear to me. Can retractions only occur
> > within watermarks? Or are they also used for representing late updates?
> >
> > Regards,
> > Timo
> >
> >
> > On 17.04.20 14:34, Fabian Hueske wrote:
> > > Hi all,
> > >
> > > First of all, I apologize for the text wall that's following... ;-)
> > >
> > > A temporal table join joins an append-only table and a temporal table.
> > > The question about how to represent a temporal table join boils down to
> > two
> > > questions:
> > >
> > > 1) How to represent a temporal table
> > > 2) How to specify the join of an append-only table and a temporal table
> > >
> > > I'll discuss these points separately.
> > >
> > > # 1 How to represent a temporal table
> > >
> > > A temporal table is a table that can be looked up with a time parameter
> > and
> > > which returns the rows of the table at that point in time / for that
> > > version.
> > > In order to be able to (conceptually) look up previous versions, a
> > temporal
> > > table must be (conceptually) backed by a history table that tracks all
> > > previous versions (see SqlServer docs [1]).
> > > In the context of our join, we added another restriction, namely that
> the
> > > table must have a primary key, i.e., there is only one row for each
> > version
> > > for each unique key.
> > >
> > > Hence, the requirements for a temporal table are:
> > > * The temporal table has a primary key / unique attribute
> > > * The temporal table has a time-attribute that defines the start of the
> > > validity interval of a row (processing time or event time)
> > > * The system knows that the history of the table is tracked and can
> infer
> > > how to look up a version.
> > >
> > > There are two possible types of input from which we want to create
> > temporal
> > > tables (that I'm aware of):
> > >
> > > * append-only tables, i.e., tables that contain the full change history
> > > * retraction tables, i.e., tables that are updating and do not remember
> > the
> > > history.
> > >
> > > There are a few ways to do this:
> > >
> > > ## 1.1 Defining a VIEW on an append-only table with a time attribute.
> > >
> > > The following view definition results in a view that provides the
> latest
> > > rate for each currency.
> > >
> > > CREATE VIEW rates AS
> > > SELECT
> > >    currency, MAX(rate) as rate, MAX(rowtime) as rowtime
> > > FROM rates_history rh1
> > > WHERE
> > >    rh1.rowtime = (
> > >      SELECT max(rowtime)
> > >      FROM rates_history rh2
> > >      WHERE rh2.currency = rh1.currency)
> > > GROUP BY currency
> > > WITH (
> > >    'historytracking' = 'true',
> > >    'historytracking.starttime' = 'rowtime');
> > >
> > > However, we also need to tell the system to track the history of all
> > > changes of the view in order to be able to look it up.
> > > That's what the properties in the WITH clause are for (inspired by
> > > SqlServer's TEMPORAL TABLE DDL syntax).
> > > Note that this is *not* a syntax proposal but only meant to show which
> > > information is needed.
> > > This view allows looking up any version of the "rates" view.
> > >
> > > In addition to designing and implementing the DDL syntax for views that
> > > support temporal lookups, the optimizer would need to understand the
> > > semantics of the view definition in depth.
> > > Among other things it needs to understand that the MAX() aggregation on
> > the
> > > time-attribute preserves its watermark alignment.
> > > AFAIK, this is not the case at the moment (the time attribute would be
> > > converted into a regular TIMESTAMP and lose its time attribute
> > properties).
> > >
> > > ## 1.2 A retraction table with a primary key and a time-attribute.
> > >
> > > On paper it looks like such a table would automatically qualify as a
> > > time-versioned table because it completely fulfills the requirements.
> > > However, I don't think we can use it *as is* as a temporal table if we
> > want
> > > to have clean semantics.
> > > The problem here is the "lost history" of the retraction table. The
> > dynamic
> > > table that is defined on the retraction stream only stores the latest
> > > version (even though it sees all versions).
> > > Conceptually, a temporal table can look up the version of the table at any
> > > point in time because it is backed by a history table.
> > > If this information is not available, we cannot have a semantically
> clean
> > > definition of the join IMO.
> > >
> > > Therefore we should define the table in a way that the system knows
> that
> > > the history is tracked.
> > > MSSQL uses a syntax similar to this one:
> > >
> > > CREATE TABLE rates (
> > >      currency CHAR(3) NOT NULL PRIMARY KEY,
> > >      rate DOUBLE,
> > >      rowtime TIMESTAMP,
> > >      WATERMARK FOR rowtime AS rowtime - INTERVAL '5' MINUTE)
> > > WITH (
> > >    'historytracking' = 'true',
> > >    'historytracking.starttime' = 'rowtime');
> > >
> > > The 'historytracking' properties would declare that the table tracks its
> > > history and also specify the attribute (rowtime) that is used for
> > > versioning.
> > >
> > > ## 1.3 Registering a TableFunction that takes an append-only table with
> > > time attribute
> > >
> > > The TableFunction requires a few parameters:
> > > * the source table from which to derive the temporal table
> > > * the key attribute on which the versions of the source table should be
> > > computed
> > > * the time attribute that defines the versions
> > > * a lookup timestamp for the version that is returned.
> > >
> > > The reasons why we chose the TableFunction approach over the VIEW
> approach
> > > so far were:
> > > * It is easier for the optimizer to identify a built-in table function
> > than
> > > to analyze and reason about a generic VIEW.
> > > * We would need to make the optimizer a lot smarter to infer all the
> > > properties from the generic VIEW definition that we need for a temporal
> > > table join.
> > > * Passing a parameter to a function is a known thing, passing a
> parameter
> > > to a VIEW not so much.
> > > * Users would need to specify the VIEW exactly right, such that it
> can
> > be
> > > used as a temporal table. See 1.1 for why this is not trivial.
> > >
> > > There are two ways to use a TableFunction:
> > >
> > > ### 1.3.1 Built-in and pre-registered function that is parameterized in
> > the
> > > SQL query
> > >
> > > Here, we do not need to do anything to register the function. We simply
> > use
> > > it in the query (see example in 2.2 below)
> > >
> > > ### 1.3.2 Parameterize function when it is registered in the catalog
> > (with
> > > a provided Java implementation)
> > >
> > > This is the approach we've used so far. In the Table API, the function
> > is
> > > first parameterized and created and then registered.
> > > We would need a DDL syntax to parameterize UDFs on registration.
> > > I don't want to propose a syntax here, but just to get an idea it might
> > > look like this:
> > >
> > > CREATE FUNCTION rates AS
> > > 'org.apache.flink.table.udfs.TemporalTableFunction' WITH ('table' =
> > > 'rates_history', 'key' = 'cur', 'time' = 'rowtime')
> > >
> > > Right now, the Flink Catalog interface does not have the functionality
> to
> > > store such parameters and would need some hacks to properly create
> > > parameterized function instances.
> > >
> > >
> > >
> > > # 2 Defining a join of an append-only table and a temporal table
> > >
> > > The append-only table needs to have a time-attribute (processing time
> or
> > > event time, but same as the temporal table).
> > > The join then needs to specify two things:
> > > * an equality predicate that includes the primary key of the temporal
> > table
> > > * declare the time attribute of the append-only table as the time as of
> > > which to look up the temporal table, i.e., get the version of the
> temporal
> > > table that is valid for the timestamp of the current row from the
> > > append-only table
> > >
> > > The tricky part (from a syntax point of view) is to specify the lookup
> > > time.
> > >
> > > ## 2.1 the temporal table is a regular table or view (see approaches
> 1.1
> > > and 1.2 above)
> > >
> > > In this case we can use the "FOR SYSTEM_TIME AS OF x" clause as
> follows:
> > >
> > > SELECT *
> > > FROM orders o, rates r FOR SYSTEM_TIME AS OF o.ordertime
> > > WHERE o.currency = r.currency
> > >
> > > IMO, this is a great syntax and the one we should strive for.
> > > We would need to bend the rules of the SQL standard which only allows x
> > in
> > > "FOR SYSTEM_TIME AS OF x" to be a constant and the table on which it is
> > > applied usually needs to be a specific type (not sure if views are
> > > supported), but I guess this is fine.
> > > NOTE: the "FOR SYSTEM_TIME AS OF x" is already supported for
> LookupTable
> > > Joins if x is a processing time attribute [2].
> > >
> > > ## 2.2 the temporal table is a TableFunction and parameterized in the
> > query
> > > (see 1.3.1 above)
> > >
> > > SELECT *
> > > FROM orders o,
> > >    TEMPORAL_TABLE(
> > >      table => TABLE(rates_history),
> > >      key => DESCRIPTOR(currency),
> > >      time => DESCRIPTOR(rowtime)) r
> > >    WHERE o.currency = r.currency
> > >
> > > The function "TEMPORAL_TABLE" is built-in and nothing was registered in
> > the
> > > catalog (except the rates_history table).
> > > In fact this is valid SQL:2016 syntax and called Polymorphic Table
> > > Functions. Have a look here [3].
> > >
> > > ## 2.3 the temporal table is a TableFunction that was parameterized
> > during
> > > registration (see 1.3.2 above)
> > >
> > > This is what we have at the moment.
> > >
> > > SELECT *
> > > FROM orders o,
> > >    LATERAL TABLE (rates(o.ordertime)) r
> > >    WHERE o.currency = r.currency
> > >
> > > The TableFunction "rates" was registered in the catalog and
> parameterized
> > > to the "rates_history" append-only table, the key was set to
> "currency",
> > > and the time attribute was declared.
> > >
> > > # SUMMARY
> > >
> > > IMO we should in the long run aim to define temporal tables either as
> > > upsert/retraction tables or as views on append-only tables and join them
> > > using the "FOR SYSTEM_TIME AS OF x" syntax.
> > > I guess it is debatable whether we need to declare that history is tracked for
> > > these tables (which we don't actually do) or if we do it by convention
> if
> > > the table has a time attribute.
> > > It should be (relatively) easy to get this to work for retraction
> tables
> > > which will be supported soon.
> > > It will be more work for views because we need to improve the time
> > > attribute handling with MAX() aggregations.
> > > The "FOR SYSTEM_TIME AS OF x" is already supported for
> LookupTableSources
> > > and would "only" need to be adapted to work on temporal tables.
> > >
> > > Registering parameterized TableFunctions in the catalog seems like
> quite
> > a
> > > bit of work. We need new DDL syntax, extend the catalog and function
> > > instantiation. This won't be easy, IMO.
> > > If we only support them as TEMPORARY FUNCTIONs, which are not registered
> in
> > > the catalog, it will be easier. The question is whether it is worth the
> > > effort if we decide on the other approach.
> > >
> > > Using TableFunctions that are parameterized in the query will require
> > > extending the Calcite parser and framework to support Polymorphic Table
> > > Functions.
> > > However, some work might already have been done there, because AFAIK
> > Apache
> > > Beam aims to support this syntax for windowing functions as described
> in
> > > the "One SQL to rule them all" paper [4].
> > > It might be the fastest and fully SQL standard compliant way.
> > >
> > > Cheers,
> > > Fabian
> > >
> > > [1]
> > >
> >
> https://docs.microsoft.com/en-us/sql/relational-databases/tables/temporal-tables
> > > [2]
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/joins.html#usage-1
> > > [3]
> > >
> >
> https://standards.iso.org/ittf/PubliclyAvailableStandards/c069776_ISO_IEC_TR_19075-7_2017.zip
> > > [4] https://arxiv.org/abs/1905.12133
> > >
> > > On Fri, Apr 17, 2020 at 06:37, Jark Wu <imj...@gmail.com> wrote:
> > >
> > >> Hi Konstantin,
> > >>
> > >> Thanks for bringing up this discussion. I think temporal join is a very
> > >> important feature and should be exposed to pure SQL users.
> > >> I have already received many requests like this.
> > >> However, my concern is how to properly support this feature in
> SQL.
> > >> Introducing a DDL syntax for Temporal Table Function is one way, but
> > maybe
> > >> not the best one.
> > >>
> > >> The most important reason is that what underlies a temporal table
> > function
> > >> is exactly a changelog stream.
> > >> The temporal join actually joins a fact stream with the
> > >> changelog stream on processing time or event time.
> > >> We will soon support creating a changelog source using DDL once
> FLIP-95
> > >> and FLIP-105 are finished.
> > >> At that point, we can have a simple DDL to create a changelog source like
> > >> this:
> > >>
> > >> CREATE TABLE rate_changelog (
> > >>    currency STRING,
> > >>    rate DECIMAL
> > >> ) WITH (
> > >>    'connector' = 'kafka',
> > >>    'topic' = 'rate_binlog',
> > >>    'properties.bootstrap.servers' = 'localhost:9092',
> > >>    'format' = 'debezium-json'
> > >> );
> > >>
> > >> Meanwhile, we already have a SQL standard temporal join syntax
> > [1],
> > >> i.e. the "A JOIN B FOR SYSTEM_TIME AS OF ..".
> > >> It is currently used as a dimension table lookup join, but the semantics
> are
> > >> the same as the "temporal table function join" [2].
> > >> I'm in favor of "FOR SYSTEM_TIME AS OF" because it is more natural:
> > >> the definition of B is a *table*, not a *table function*,
> > >> and the syntax is included in the SQL standard.
> > >>
> > >> So once we have the ability to define "rate_changelog" table, then we
> > can
> > >> use the following query to temporal join the changelog on processing
> > time.
> > >>
> > >> SELECT *
> > >> FROM orders JOIN rate_changelog FOR SYSTEM_TIME AS OF orders.proctime
> > >> ON orders.currency = rate_changelog.currency;
> > >>
> > >> In a nutshell, once FLIP-95 and FLIP-105 are ready, we can easily
> > support
> > >> "temporal join on changelogs" without introducing new syntax.
> > >> IMO, introducing a DDL syntax for Temporal Table Function does not look like
> > >> an easy way and may involve duplicated work.
> > >>
> > >> Best,
> > >> Jark
> > >>
> > >> [1]:
> > >>
> > >>
> >
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
> > >> [2]:
> > >>
> > >>
> >
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table-function
> > >>
> > >>
> > >>
> > >>
> > >>
> > >> On Thu, 16 Apr 2020 at 23:04, Benchao Li <libenc...@gmail.com> wrote:
> > >>
> > >>> Hi Konstantin,
> > >>>
> > >>> Thanks for bringing up this discussion. +1 for the idea.
> > >>> We have run into this at our company too, and I planned to support it
> > recently
> > >>> in our internal branch.
> > >>>
> > >>> Regarding your questions:
> > >>> 1) I think it might be more a table/view than function, just like
> > >> Temporal
> > >>> Table (which is also known as
> > >>> dimension table). Maybe we need a DDL like CREATE VIEW and plus some
> > >>> additional settings.
> > >>> 2) If we design the DDL for it like a view, then maybe temporary is good
> > >>> enough.
> > >>>
> > >>> On Thu, Apr 16, 2020 at 8:16 PM, Konstantin Knauf <kna...@apache.org> wrote:
> > >>>
> > >>>> Hi everyone,
> > >>>>
> > >>>> it would be very useful if temporal tables could be created via
> DDL.
> > >>>> Currently, users either need to do this in the Table API or in the
> > >>>> environment file of the Flink CLI, which both require the user to
> > >> switch
> > >>>> the context of the SQL CLI/Editor. I recently created a ticket for
> > this
> > >>>> request [1].
> > >>>>
> > >>>> I see two main questions:
> > >>>>
> > >>>> 1) What would be the DDL syntax? A Temporal Table is on the one
> hand a
> > >>> view
> > >>>> and on the other a function depending on how you look at it.
> > >>>>
> > >>>> 2) Would this temporal table view/function be stored in the catalog
> or
> > >>> only
> > >>>> be temporary?
> > >>>>
> > >>>> I personally do not have much experience in this area of Flink, so I
> > am
> > >>>> looking forward to hearing your thoughts on this.
> > >>>>
> > >>>> Best,
> > >>>>
> > >>>> Konstantin
> > >>>>
> > >>>> [1] https://issues.apache.org/jira/browse/FLINK-16824
> > >>>>
> > >>>> --
> > >>>>
> > >>>> Konstantin Knauf
> > >>>>
> > >>>
> > >>>
> > >>> --
> > >>>
> > >>> Benchao Li
> > >>> School of Electronics Engineering and Computer Science, Peking
> > University
> > >>> Tel:+86-15650713730
> > >>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
> > >>>
> > >>
> > >
> >
> >
>
