I might have missed something, but why do we need a new "TEMPORAL TABLE" syntax?

According to Fabian's first mail:

> Hence, the requirements for a temporal table are:
> * The temporal table has a primary key / unique attribute
> * The temporal table has a time-attribute that defines the start of the
> validity interval of a row (processing time or event time)
> * The system knows that the history of the table is tracked and can infer
> how to look up a version.

I think a primary key plus a proper event-time attribute is already sufficient.
So a join query like:

"Fact join Dim FOR SYSTEM_TIME AS OF Fact.some_event_time ON Fact.id =
Dim.id"

would mean that, for every record belonging to Fact, Fact.some_event_time is
used as Dim's version (which keeps only the records from the Dim table with an
event time less than or equal to Fact.some_event_time, and keeps only one
record for each primary key).
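
To make that reading concrete, here is a minimal Python sketch of the lookup.
It is only an illustration of the semantics, not how Flink implements the join.
The names DimRow, dim_version_as_of and temporal_join are made up for this
example, and I am assuming that the one record kept per primary key is the one
with the greatest event time that is not after Fact.some_event_time.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, List, Tuple


@dataclass(frozen=True)
class DimRow:
    id: int          # primary key of Dim (hypothetical schema)
    event_time: int  # event-time attribute: start of the row's validity
    value: str


def dim_version_as_of(dim: Iterable[DimRow], as_of: int) -> Dict[int, DimRow]:
    """Version of Dim at `as_of`: per key, the latest row with event_time <= as_of."""
    version: Dict[int, DimRow] = {}
    for row in dim:
        if row.event_time > as_of:
            continue  # row is not yet valid at the lookup time
        current = version.get(row.id)
        if current is None or row.event_time >= current.event_time:
            version[row.id] = row  # keep only one (the newest) row per key
    return version


def temporal_join(
    facts: Iterable[Tuple[int, int]], dim: List[DimRow]
) -> List[Tuple[int, int, str]]:
    """Fact JOIN Dim FOR SYSTEM_TIME AS OF Fact.some_event_time ON Fact.id = Dim.id"""
    out = []
    for fact_id, some_event_time in facts:
        match = dim_version_as_of(dim, some_event_time).get(fact_id)
        if match is not None:  # inner join: drop facts without a valid version
            out.append((fact_id, some_event_time, match.value))
    return out


dim_rows = [DimRow(1, 10, "a"), DimRow(1, 20, "b"), DimRow(2, 15, "x")]
fact_rows = [(1, 12), (1, 25), (2, 14), (2, 15)]  # (id, some_event_time)
result = temporal_join(fact_rows, dim_rows)
```

With this sample data the fact (2, 14) finds no match, because Dim's only row
for key 2 becomes valid at time 15. Nothing in the sketch depends on how Dim
was declared, only on its primary key and its event-time attribute.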

The temporal behavior is actually triggered by the join syntax "FOR
SYSTEM_TIME AS OF Fact.some_event_time",
not by the DDL description.

Best,
Kurt


On Fri, May 8, 2020 at 10:51 AM Jark Wu <imj...@gmail.com> wrote:

> Hi,
>
> I agree with what Fabian said above.
> Besides, IMO, (3) has a lower priority and will involve many more things.
> It makes sense to me to do it in two phases.
>
> Regarding (3), the key point in converting an append-only table into a
> changelog table is that the framework should know the operation type,
> so we introduced a special CREATE VIEW syntax to do it in the documentation
> [1]. Here is an example:
>
> -- my_binlog table is registered as an append-only table
> CREATE TABLE my_binlog (
>   before ROW<...>,
>   after ROW<...>,
>   op STRING,
>   op_ms TIMESTAMP(3)
> ) WITH (
>   'connector.type' = 'kafka',
>   ...
> );
>
> -- interpret my_binlog as a changelog on the op column and id key
> CREATE VIEW my_table AS
>   SELECT
>     after.*
>   FROM my_binlog
>   CHANGELOG OPERATION BY op
>   UPDATE KEY BY (id);
>
> -- my_table will materialize the insert/delete/update changes
> -- if we have the following 4 records in dbz:
> -- a create for 1004
> -- an update for 1004
> -- a create for 1005
> -- a delete for 1004
> > SELECT COUNT(*) FROM my_table;
> +-----------+
> |  COUNT(*) |
> +-----------+
> |     1     |
> +-----------+
>
> Best,
> Jark
>
> [1]:
>
> https://docs.google.com/document/d/1onyIUUdWAHfr_Yd5nZOE7SOExBc6TiW5C4LiL5FrjtQ/edit#heading=h.sz656g8mb2wb
>
>
> On Fri, 8 May 2020 at 00:24, Fabian Hueske <fhue...@gmail.com> wrote:
>
> > Thanks for the summary Konstantin.
> > I think you got all points right.
> >
> > IMO, the way forward would be to work on a FLIP to define
> > * the concept of temporal tables,
> > * how to feed them from retraction tables
> > * how to feed them from append-only tables
> > * their specification with CREATE TEMPORAL TABLE,
> > * how to use temporal tables in temporal table joins
> > * how (if at all) to use temporal tables in other types of queries
> >
> > We would keep the LATERAL TABLE syntax because it is used for regular
> > table-valued functions.
> > However, we would probably remove the TemporalTableFunction (which is a
> > built-in table-valued function) after it has been deprecated for a while.
> >
> > Cheers, Fabian
> >
> > On Thu, 7 May 2020 at 18:03, Konstantin Knauf <kna...@apache.org> wrote:
> >
> >> Hi everyone,
> >>
> >> Thanks everyone for joining the discussion on this. Please let me
> >> summarize what I have understood so far.
> >>
> >> 1) For joining an append-only table and a temporal table, the "FOR
> >> SYSTEM_TIME AS OF <time-attribute>" syntax seems to be preferred (Fabian,
> >> Timo, Seth).
> >>
> >> 2) To define a temporal table based on a changelog stream from an external
> >> system, CREATE TEMPORAL TABLE (as suggested by Timo/Fabian) could be used.
> >>
> >> 3) In order to also support temporal tables derived from an append-only
> >> stream, we either need to support TEMPORAL VIEW (as mentioned by Fabian) or
> >> need to have a way to convert an append-only table into a changelog table
> >> (briefly discussed in [1]). It is not completely clear to me what a temporal
> >> table based on an append-only table would look like with the syntax proposed
> >> in [1] and 2). @Jark Wu <imj...@gmail.com> could you elaborate a bit on
> >> that?
> >>
> >> How do we move forward with this?
> >>
> >> * It seems that a two-phased approach (1 + 2 now, 3 later) makes sense.
> >> What do you think?
> >> * If we proceed like this, what would this mean for the current syntax of
> >> LATERAL TABLE? Would we keep it? Would we eventually deprecate and drop it?
> >> Since only after 3) we would be on par with the current temporal table
> >> function join, I assume we could only drop it thereafter.
> >>
> >> Thanks, Konstantin
> >>
> >> [1]
> >> https://docs.google.com/document/d/1onyIUUdWAHfr_Yd5nZOE7SOExBc6TiW5C4LiL5FrjtQ/edit#heading=h.kduaw9moein6
> >>
> >>
> >> On Sat, Apr 18, 2020 at 3:07 PM Jark Wu <imj...@gmail.com> wrote:
> >>
> >> > Hi Fabian,
> >> >
> >> > Just to clarify a little bit, we decided to move the "converting
> >> > append-only table into changelog table" part into future work.
> >> > So FLIP-105 only introduced some CDC formats (Debezium) and the new
> >> > TableSource interfaces proposed in FLIP-95.
> >> > I should have started a new FLIP for the new CDC formats and kept
> >> > FLIP-105 as it was to avoid the confusion, sorry about that.
> >> >
> >> > Best,
> >> > Jark
> >> >
> >> >
> >> > On Sat, 18 Apr 2020 at 00:35, Fabian Hueske <fhue...@gmail.com> wrote:
> >> >
> >> > > Thanks Jark!
> >> > >
> >> > > I certainly need to read up on FLIP-105 (and I'll try to adjust my
> >> > > terminology to changelog table from now on ;-) )
> >> > > If FLIP-105 addresses the issue of converting an append-only table into
> >> > > a changelog table that upserts on primary key (basically what the VIEW
> >> > > definition in my first email did),
> >> > > TEMPORAL VIEWs become much less important.
> >> > > In that case, we would be well served with TEMPORAL TABLE, and TEMPORAL
> >> > > VIEW would be a nice-to-have feature for some later time.
> >> > >
> >> > > Cheers, Fabian
> >> > >
> >> > >
> >> > >
> >> > >
> >> > >
> >> > >
> >> > > On Fri, 17 Apr 2020 at 18:13, Jark Wu <imj...@gmail.com> wrote:
> >> > >
> >> > > > Hi Fabian,
> >> > > >
> >> > > > I think converting an append-only table into a temporal table involves
> >> > > > two things:
> >> > > > (1) converting the append-only table into a changelog table (or
> >> > > > retraction table as you said)
> >> > > > (2) defining the converted changelog table (maybe it is a view now) as
> >> > > > temporal (or history tracked).
> >> > > >
> >> > > > The first thing is also mentioned and discussed in the FLIP-105 design
> >> > > > draft [1], which proposed a syntax
> >> > > > to convert the append-only table into a changelog table.
> >> > > >
> >> > > > I think TEMPORAL TABLE is quite straightforward and simple, and can
> >> > > > satisfy most existing changelog
> >> > > > data with popular CDC formats. TEMPORAL VIEW is flexible but will
> >> > > > involve more SQL code. I think
> >> > > > we can support them both.
> >> > > >
> >> > > > Best,
> >> > > > Jark
> >> > > >
> >> > > > [1]:
> >> > > > https://docs.google.com/document/d/1onyIUUdWAHfr_Yd5nZOE7SOExBc6TiW5C4LiL5FrjtQ/edit#heading=h.sz656g8mb2wb
> >> > > >
> >> > > > On Fri, 17 Apr 2020 at 23:52, Fabian Hueske <fhue...@gmail.com> wrote:
> >> > > >
> >> > > > > Hi,
> >> > > > >
> >> > > > > I agree with most of what Timo said.
> >> > > > >
> >> > > > > The TEMPORAL keyword (which unfortunately might be easily confused
> >> > > > > with TEMPORARY...) looks very intuitive and I think using the only
> >> > > > > time attribute for versioning would be a good choice.
> >> > > > >
> >> > > > > However, TEMPORAL TABLE on retraction tables does not solve the full
> >> > > > > problem.
> >> > > > > I believe there will also be cases where we need to derive a temporal
> >> > > > > table from an append-only table (what TemporalTableFunctions do right
> >> > > > > now).
> >> > > > > I think the best choice for this would be TEMPORAL VIEW but, as I
> >> > > > > explained, it might be a long way until this can be supported.
> >> > > > > TEMPORAL VIEW would also address the problem of preprocessing.
> >> > > > >
> >> > > > > > Regarding retraction table with a primary key and a
> >> > > > > > time-attribute:
> >> > > > > > These semantics are still unclear to me. Can retractions only
> >> > > > > > occur within watermarks? Or are they also used for representing
> >> > > > > > late updates?
> >> > > > >
> >> > > > > Time attributes and retraction streams are a challenging topic that I
> >> > > > > haven't completely understood yet.
> >> > > > > So far we treated time attributes always as part of the data.
> >> > > > > In combination with retractions, it seems that they become metadata
> >> > > > > that specifies when a change was done.
> >> > > > > I think this is different from treating time attributes as regular
> >> > > > > data.
> >> > > > >
> >> > > > > Cheers, Fabian
> >> > > > >
> >> > > > >
> >> > > > > On Fri, 17 Apr 2020 at 17:23, Seth Wiesman <sjwies...@gmail.com> wrote:
> >> > > > >
> >> > > > > > I really like the TEMPORAL keyword, I find it very intuitive.
> >> > > > > >
> >> > > > > > > The down side of this approach would be that an additional
> >> > > > > > > preprocessing step would not be possible anymore because there
> >> > > > > > > is no preceding view.
> >> > > > > >
> >> > > > > > Yes and no. My understanding is we are not talking about making any
> >> > > > > > changes to how temporal tables are defined in the Table API. Since
> >> > > > > > you cannot currently define temporal table functions in pure SQL
> >> > > > > > applications, but only pre-register them in YAML, you can't do any
> >> > > > > > pre-processing as it stands today. Preprocessing may be a generally
> >> > > > > > useful feature, I'm not sure, but this syntax does not lose us
> >> > > > > > anything in pure SQL applications.
> >> > > > > >
> >> > > > > > > These semantics are still unclear to me. Can retractions only
> >> > > > > > > occur within watermarks? Or are they also used for representing
> >> > > > > > > late updates?
> >> > > > > >
> >> > > > > > I do not know the SQL standard well enough to give a principled
> >> > > > > > response to this question. However, in my observation of production
> >> > > > > > workloads, users of temporal table functions use them to
> >> > > > > > denormalize star schemas before performing further transformations
> >> > > > > > and aggregations and expect the output to be an append stream. With
> >> > > > > > the ongoing work to better support changelogs, the need for users to
> >> > > > > > understand the differences in append vs upsert in their query may be
> >> > > > > > diminishing but everyone else on this thread can better speak to
> >> > > > > > that.
> >> > > > > >
> >> > > > > > Seth
> >> > > > > >
> >> > > > > > On Fri, Apr 17, 2020 at 10:03 AM Timo Walther <twal...@apache.org> wrote:
> >> > > > > >
> >> > > > > > > Hi Fabian,
> >> > > > > > >
> >> > > > > > > thank you very much for this great summary!
> >> > > > > > >
> >> > > > > > > I wasn't aware of the Polymorphic Table Functions standard. This
> >> > > > > > > is a very interesting topic that we should definitely consider in
> >> > > > > > > the future.
> >> > > > > > > Maybe this could also help us in defining tables more dynamically
> >> > > > > > > within a query. It could help solve problems as discussed in
> >> > > > > > > FLIP-113.
> >> > > > > > >
> >> > > > > > > Regarding joining:
> >> > > > > > >
> >> > > > > > > IMO we should aim for "FOR SYSTEM_TIME AS OF x" instead of the
> >> > > > > > > current `LATERAL TABLE(rates(x))` syntax. A function that also
> >> > > > > > > behaves like a table and needs this special `LATERAL` keyword
> >> > > > > > > during joining is not very intuitive. PTFs could be used once
> >> > > > > > > they are fully supported by Calcite and we have the big picture of
> >> > > > > > > how to also use them for other time-based operations (windows?,
> >> > > > > > > joins?).
> >> > > > > > >
> >> > > > > > > Regarding how to represent a temporal table:
> >> > > > > > >
> >> > > > > > > I think that our current DDL, current LookupTableSource and
> >> > > > > > > temporal tables can fit nicely together.
> >> > > > > > >
> >> > > > > > > How about we simply introduce an additional keyword `TEMPORAL` to
> >> > > > > > > indicate history tracking semantics? I think this is the minimally
> >> > > > > > > invasive solution:
> >> > > > > > >
> >> > > > > > > CREATE TEMPORAL TABLE rates (
> >> > > > > > >    currency CHAR(3) NOT NULL PRIMARY KEY,
> >> > > > > > >    rate DOUBLE,
> >> > > > > > >    rowtime TIMESTAMP,
> >> > > > > > >    WATERMARK FOR rowtime AS rowtime - INTERVAL '5' MINUTE)
> >> > > > > > > WITH (...);
> >> > > > > > >
> >> > > > > > > - The primary key would be defined by the DDL.
> >> > > > > > > - The available time attribute would be defined by the DDL, either
> >> > > > > > > as the only time attribute of the table, or we introduce a special
> >> > > > > > > constraint similar to `PRIMARY KEY`.
> >> > > > > > >
> >> > > > > > > The down side of this approach would be that an additional
> >> > > > > > > preprocessing step would not be possible anymore because there is
> >> > > > > > > no preceding view.
> >> > > > > > >
> >> > > > > > > The `TEMPORAL` semantic can be stored in the properties of the
> >> > > > > > > table when writing to a catalog. We do the same for watermarks and
> >> > > > > > > computed columns.
> >> > > > > > >
> >> > > > > > > Without a `TEMPORAL` keyword, a `FOR SYSTEM_TIME AS OF x` would
> >> > > > > > > only work on processing time by a lookup into the external system
> >> > > > > > > or on event-time by using the time semantics that the external
> >> > > > > > > system supports.
> >> > > > > > >
> >> > > > > > > Regarding retraction table with a primary key and a
> >> > > > > > > time-attribute:
> >> > > > > > >
> >> > > > > > > These semantics are still unclear to me. Can retractions only
> >> > > > > > > occur within watermarks? Or are they also used for representing
> >> > > > > > > late updates?
> >> > > > > > >
> >> > > > > > > Regards,
> >> > > > > > > Timo
> >> > > > > > >
> >> > > > > > >
> >> > > > > > > On 17.04.20 14:34, Fabian Hueske wrote:
> >> > > > > > > > Hi all,
> >> > > > > > > >
> >> > > > > > > > First of all, I apologize for the text wall that's following... ;-)
> >> > > > > > > >
> >> > > > > > > > A temporal table join joins an append-only table and a temporal
> >> > > > > > > > table.
> >> > > > > > > > The question about how to represent a temporal table join boils
> >> > > > > > > > down to two questions:
> >> > > > > > > >
> >> > > > > > > > 1) How to represent a temporal table
> >> > > > > > > > 2) How to specify the join of an append-only table and a temporal
> >> > > > > > > > table
> >> > > > > > > >
> >> > > > > > > > I'll discuss these points separately.
> >> > > > > > > >
> >> > > > > > > > # 1 How to represent a temporal table
> >> > > > > > > >
> >> > > > > > > > A temporal table is a table that can be looked up with a time
> >> > > > > > > > parameter and which returns the rows of the table at that point in
> >> > > > > > > > time / for that version.
> >> > > > > > > > In order to be able to (conceptually) look up previous versions, a
> >> > > > > > > > temporal table must be (conceptually) backed by a history table
> >> > > > > > > > that tracks all previous versions (see SqlServer docs [1]).
> >> > > > > > > > In the context of our join, we added another restriction, namely
> >> > > > > > > > that the table must have a primary key, i.e., there is only one row
> >> > > > > > > > for each version for each unique key.
> >> > > > > > > >
> >> > > > > > > > Hence, the requirements for a temporal table are:
> >> > > > > > > > * The temporal table has a primary key / unique attribute
> >> > > > > > > > * The temporal table has a time-attribute that defines the start
> >> > > > > > > > of the validity interval of a row (processing time or event time)
> >> > > > > > > > * The system knows that the history of the table is tracked and can
> >> > > > > > > > infer how to look up a version.
> >> > > > > > > >
> >> > > > > > > > There are two possible types of input from which we want to create
> >> > > > > > > > temporal tables (that I'm aware of):
> >> > > > > > > >
> >> > > > > > > > * append-only tables, i.e., tables that contain the full change
> >> > > > > > > > history
> >> > > > > > > > * retraction tables, i.e., tables that are updating and do not
> >> > > > > > > > remember the history.
> >> > > > > > > >
> >> > > > > > > > There are a few ways to do this:
> >> > > > > > > >
> >> > > > > > > > ## 1.1 Defining a VIEW on an append-only table with a time
> >> > > > > > > > attribute.
> >> > > > > > > >
> >> > > > > > > > The following view definition results in a view that provides the
> >> > > > > > > > latest rate for each currency.
> >> > > > > > > >
> >> > > > > > > > CREATE VIEW rates AS
> >> > > > > > > > SELECT
> >> > > > > > > >    currency, MAX(rate) as rate, MAX(rowtime) as rowtime
> >> > > > > > > > FROM rates_history rh1
> >> > > > > > > > WHERE
> >> > > > > > > >    rh1.rowtime = (
> >> > > > > > > >      SELECT max(rowtime)
> >> > > > > > > >      FROM rates_history rh2
> >> > > > > > > >      WHERE rh2.currency = rh1.currency)
> >> > > > > > > > GROUP BY currency
> >> > > > > > > > WITH (
> >> > > > > > > >    'historytracking' = 'true',
> >> > > > > > > >    'historytracking.starttime' = 'rowtime');
> >> > > > > > > >
> >> > > > > > > > However, we also need to tell the system to track the history of
> >> > > > > > > > all changes of the view in order to be able to look it up.
> >> > > > > > > > That's what the properties in the WITH clause are for (inspired by
> >> > > > > > > > SqlServer's TEMPORAL TABLE DDL syntax).
> >> > > > > > > > Note that this is *not* a syntax proposal but only meant to show
> >> > > > > > > > which information is needed.
> >> > > > > > > > This view allows looking up any version of the "rates" view.
> >> > > > > > > >
> >> > > > > > > > In addition to designing and implementing the DDL syntax for views
> >> > > > > > > > that support temporal lookups, the optimizer would need to
> >> > > > > > > > understand the semantics of the view definition in depth.
> >> > > > > > > > Among other things it needs to understand that the MAX()
> >> > > > > > > > aggregation on the time-attribute preserves its watermark alignment.
> >> > > > > > > > AFAIK, this is not the case at the moment (the time attribute would
> >> > > > > > > > be converted into a regular TIMESTAMP and lose its time attribute
> >> > > > > > > > properties).
> >> > > > > > > >
> >> > > > > > > > ## 1.2 A retraction table with a primary key and a time-attribute.
> >> > > > > > > >
> >> > > > > > > > On paper it looks like such a table would automatically qualify as
> >> > > > > > > > a time-versioned table because it completely fulfills the
> >> > > > > > > > requirements.
> >> > > > > > > > However, I don't think we can use it *as is* as a temporal table if
> >> > > > > > > > we want to have clean semantics.
> >> > > > > > > > The problem here is the "lost history" of the retraction table. The
> >> > > > > > > > dynamic table that is defined on the retraction stream only stores
> >> > > > > > > > the latest version (even though it sees all versions).
> >> > > > > > > > Conceptually, a temporal table can look up the version of the table
> >> > > > > > > > at any point in time because it is backed by a history table.
> >> > > > > > > > If this information is not available, we cannot have a semantically
> >> > > > > > > > clean definition of the join IMO.
> >> > > > > > > >
> >> > > > > > > > Therefore we should define the table in a way that the system knows
> >> > > > > > > > that the history is tracked.
> >> > > > > > > > MSSQL uses a syntax similar to this one:
> >> > > > > > > >
> >> > > > > > > > CREATE TABLE rates (
> >> > > > > > > >      currency CHAR(3) NOT NULL PRIMARY KEY,
> >> > > > > > > >      rate DOUBLE,
> >> > > > > > > >      rowtime TIMESTAMP,
> >> > > > > > > >      WATERMARK FOR rowtime AS rowtime - INTERVAL '5' MINUTE)
> >> > > > > > > > WITH (
> >> > > > > > > >    'historytracking' = 'true',
> >> > > > > > > >    'historytracking.starttime' = 'rowtime');
> >> > > > > > > >
> >> > > > > > > > The 'historytracking' properties would declare that the table
> >> > > > > > > > tracks its history and also specify the attribute (rowtime) that is
> >> > > > > > > > used for versioning.
> >> > > > > > > >
> >> > > > > > > > ## 1.3 Registering a TableFunction that takes an append-only table
> >> > > > > > > > with time attribute
> >> > > > > > > >
> >> > > > > > > > The TableFunction requires a few parameters:
> >> > > > > > > > * the source table from which to derive the temporal table
> >> > > > > > > > * the key attribute on which the versions of the source table
> >> > > > > > > > should be computed
> >> > > > > > > > * the time attribute that defines the versions
> >> > > > > > > > * a lookup timestamp for the version that is returned.
> >> > > > > > > >
> >> > > > > > > > The reasons why we chose the TableFunction approach over the VIEW
> >> > > > > > > > approach so far were:
> >> > > > > > > > * It is easier for the optimizer to identify a built-in table
> >> > > > > > > > function than to analyze and reason about a generic VIEW.
> >> > > > > > > > * We would need to make the optimizer a lot smarter to infer all
> >> > > > > > > > the properties from the generic VIEW definition that we need for a
> >> > > > > > > > temporal table join.
> >> > > > > > > > * Passing a parameter to a function is a known thing, passing a
> >> > > > > > > > parameter to a VIEW not so much.
> >> > > > > > > > * Users would need to specify the VIEW exactly right, such that it
> >> > > > > > > > can be used as a temporal table. Look at 1.1 to see why this is not
> >> > > > > > > > trivial.
> >> > > > > > > >
> >> > > > > > > > There are two ways to use a TableFunction:
> >> > > > > > > >
> >> > > > > > > > ### 1.3.1 Built-in and pre-registered function that is
> >> > > > > > > > parameterized in the SQL query
> >> > > > > > > >
> >> > > > > > > > Here, we do not need to do anything to register the function. We
> >> > > > > > > > simply use it in the query (see example in 2.2 below)
> >> > > > > > > >
> >> > > > > > > > ### 1.3.2 Parameterize function when it is registered in the
> >> > > > > > > > catalog (with a provided Java implementation)
> >> > > > > > > >
> >> > > > > > > > This is the approach we've used so far. In the Table API, the
> >> > > > > > > > function is first parameterized and created and then registered.
> >> > > > > > > > We would need a DDL syntax to parameterize UDFs on registration.
> >> > > > > > > > I don't want to propose a syntax here, but just to get an idea it
> >> > > > > > > > might look like this:
> >> > > > > > > >
> >> > > > > > > > CREATE FUNCTION rates AS
> >> > > > > > > > 'org.apache.flink.table.udfs.TemporalTableFunction' WITH
> >> > > > > > > > ('table' = 'rates_history', 'key' = 'cur', 'time' = 'rowtime')
> >> > > > > > > >
> >> > > > > > > > Right now, the Flink Catalog interface does not have the
> >> > > > > > > > functionality to store such parameters and would need some hacks to
> >> > > > > > > > properly create parameterized function instances.
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > > > # 2 Defining a join of an append-only table and a temporal table
> >> > > > > > > >
> >> > > > > > > > The append-only table needs to have a time-attribute (processing
> >> > > > > > > > time or event time, but same as the temporal table).
> >> > > > > > > > The join then needs to specify two things:
> >> > > > > > > > * an equality predicate that includes the primary key of the
> >> > > > > > > > temporal table
> >> > > > > > > > * a declaration of the time attribute of the append-only table as
> >> > > > > > > > the time as of which to look up the temporal table, i.e., get the
> >> > > > > > > > version of the temporal table that is valid for the timestamp of
> >> > > > > > > > the current row from the append-only table
> >> > > > > > > >
> >> > > > > > > > The tricky part (from a syntax point of view) is to specify the
> >> > > > > > > > lookup time.
> >> > > > > > > >
> >> > > > > > > > ## 2.1 the temporal table is a regular table or view (see
> >> > > > > > > > approaches 1.1 and 1.2 above)
> >> > > > > > > >
> >> > > > > > > > In this case we can use the "FOR SYSTEM_TIME AS OF x" clause as
> >> > > > > > > > follows:
> >> > > > > > > >
> >> > > > > > > > SELECT *
> >> > > > > > > > FROM orders o, rates r FOR SYSTEM_TIME AS OF o.ordertime
> >> > > > > > > > WHERE o.currency = r.currency
> >> > > > > > > >
> >> > > > > > > > IMO, this is a great syntax and the one we should strive for.
> >> > > > > > > > We would need to bend the rules of the SQL standard which only
> >> > > > > > > > allows x in "FOR SYSTEM_TIME AS OF x" to be a constant and the
> >> > > > > > > > table on which it is applied usually needs to be a specific type
> >> > > > > > > > (not sure if views are supported), but I guess this is fine.
> >> > > > > > > > NOTE: the "FOR SYSTEM_TIME AS OF x" is already supported for
> >> > > > > > > > LookupTable Joins if x is a processing time attribute [2].
> >> > > > > > > >
> >> > > > > > > > ## 2.2 the temporal table is a TableFunction and parameterized in
> >> > > > > > > > the query (see 1.3.1 above)
> >> > > > > > > >
> >> > > > > > > > SELECT *
> >> > > > > > > > FROM orders o,
> >> > > > > > > >    TEMPORAL_TABLE(
> >> > > > > > > >      table => TABLE(rates_history),
> >> > > > > > > >      key => DESCRIPTOR(currency),
> >> > > > > > > >      time => DESCRIPTOR(rowtime)) r
> >> > > > > > > > WHERE o.currency = r.currency
> >> > > > > > > >
> >> > > > > > > > The function "TEMPORAL_TABLE" is built-in and nothing was
> >> > > > > > > > registered in the catalog (except the rates_history table).
> >> > > > > > > > In fact this is valid SQL:2016 syntax and called Polymorphic Table
> >> > > > > > > > Functions. Have a look here [3].
> >> > > > > > > >
> >> > > > > > > > ## 2.3 the temporal table is a TableFunction that was parameterized
> >> > > > > > > > during registration (see 1.3.2 above)
> >> > > > > > > >
> >> > > > > > > > This is what we have at the moment.
> >> > > > > > > >
> >> > > > > > > > SELECT *
> >> > > > > > > > FROM orders o,
> >> > > > > > > >    LATERAL TABLE (rates(o.ordertime)) AS r
> >> > > > > > > > WHERE o.currency = r.currency
> >> > > > > > > >
> >> > > > > > > > The TableFunction "rates" was registered in the catalog and
> >> > > > > > > > parameterized to the "rates_history" append-only table, the key was
> >> > > > > > > > set to "currency", and the time attribute was declared.
> >> > > > > > > >
> >> > > > > > > > # SUMMARY
> >> > > > > > > >
> >> > > > > > > > IMO we should in the long run aim to define temporal tables either
> >> > > > > > > > as upsert retraction tables or as views on append-only tables and
> >> > > > > > > > join them using the "FOR SYSTEM_TIME AS OF x" syntax.
> >> > > > > > > > I guess it is debatable whether we need to declare that history is
> >> > > > > > > > tracked for these tables (which we don't actually do) or if we do
> >> > > > > > > > it by convention if the table has a time attribute.
> >> > > > > > > > It should be (relatively) easy to get this to work for retraction
> >> > > > > > > > tables which will be supported soon.
> >> > > > > > > > It will be more work for views because we need to improve the time
> >> > > > > > > > attribute handling with MAX() aggregations.
> >> > > > > > > > The "FOR SYSTEM_TIME AS OF x" is already supported for
> >> > > > > > > > LookupTableSources and would "only" need to be adapted to work on
> >> > > > > > > > temporal tables.
> >> > > > > > > >
> >> > > > > > > > Registering parameterized TableFunctions in the catalog seems like
> >> > > > > > > > quite a bit of work. We need new DDL syntax, and need to extend the
> >> > > > > > > > catalog and function instantiation. This won't be easy, IMO.
> >> > > > > > > > If we only support them as TEMPORARY FUNCTION which are not
> >> > > > > > > > registered in the catalog it will be easier. The question is
> >> > > > > > > > whether it is worth the effort if we decide for the other approach.
> >> > > > > > > >
> >> > > > > > > > Using TableFunctions that are parameterized in the query will
> >> > > > > > > > require extending the Calcite parser and framework to support
> >> > > > > > > > Polymorphic Table Functions.
> >> > > > > > > > However, there might already be some work done there, because AFAIK
> >> > > > > > > > Apache Beam aims to support this syntax for windowing functions as
> >> > > > > > > > described in the "One SQL to rule them all" paper [4].
> >> > > > > > > > It might be the fastest and fully SQL standard compliant way.
> >> > > > > > > >
> >> > > > > > > > Cheers,
> >> > > > > > > > Fabian
> >> > > > > > > >
> >> > > > > > > > [1]
> >> > > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> https://docs.microsoft.com/en-us/sql/relational-databases/tables/temporal-tables
> >> > > > > > > > [2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/joins.html#usage-1
> >> > > > > > > > [3] https://standards.iso.org/ittf/PubliclyAvailableStandards/c069776_ISO_IEC_TR_19075-7_2017.zip
> >> > > > > > > > [4] https://arxiv.org/abs/1905.12133
> >> > > > > > > >
> >> > > > > > > > On Fri, 17 Apr 2020 at 06:37, Jark Wu <imj...@gmail.com> wrote:
> >> > > > > > > >
> >> > > > > > > >> Hi Konstantin,
> >> > > > > > > >>
> >> > > > > > > >> Thanks for bringing up this discussion. I think temporal join
> >> > > > > > > >> is a very important feature and should be exposed to pure SQL
> >> > > > > > > >> users.
> >> > > > > > > >> I have already received many requests like this.
> >> > > > > > > >> However, my concern is how to properly support this feature
> >> > > > > > > >> in SQL.
> >> > > > > > > >> Introducing a DDL syntax for Temporal Table Function is one
> >> > > > > > > >> way, but maybe not the best one.
> >> > > > > > > >>
> >> > > > > > > >> The most important reason is that what underlies a temporal
> >> > > > > > > >> table function is exactly a changelog stream.
> >> > > > > > > >> The temporal join actually joins a fact stream with the
> >> > > > > > > >> changelog stream on processing time or event time.
> >> > > > > > > >> We will soon support creating a changelog source using DDL,
> >> > > > > > > >> once FLIP-95 and FLIP-105 are finished.
> >> > > > > > > >> At that time, we can have a simple DDL to create a changelog
> >> > > > > > > >> source like this:
> >> > > > > > > >>
> >> > > > > > > >> CREATE TABLE rate_changelog (
> >> > > > > > > >>    currency STRING,
> >> > > > > > > >>    rate DECIMAL
> >> > > > > > > >> ) WITH (
> >> > > > > > > >>    'connector' = 'kafka',
> >> > > > > > > >>    'topic' = 'rate_binlog',
> >> > > > > > > >>    'properties.bootstrap.servers' = 'localhost:9092',
> >> > > > > > > >>    'format' = 'debezium-json'
> >> > > > > > > >> );
> >> > > > > > > >>
> >> > > > > > > >> In the meantime, we already have a SQL standard temporal join
> >> > > > > > > >> syntax [1], i.e. the "A JOIN B FOR SYSTEM_TIME AS OF ..".
> >> > > > > > > >> It is currently used for dimension table lookup joins, but
> >> > > > > > > >> the semantics are the same as those of the "temporal table
> >> > > > > > > >> function join" [2].
> >> > > > > > > >> I'm in favor of "FOR SYSTEM_TIME AS OF" because it is more
> >> > > > > > > >> natural: the definition of B is a *table*, not a *table
> >> > > > > > > >> function*, and the syntax is included in the SQL standard.
> >> > > > > > > >>
> >> > > > > > > >> So once we have the ability to define the "rate_changelog"
> >> > > > > > > >> table, we can use the following query to temporal join the
> >> > > > > > > >> changelog on processing time.
> >> > > > > > > >>
> >> > > > > > > >> SELECT *
> >> > > > > > > >> FROM orders JOIN rate_changelog FOR SYSTEM_TIME AS OF orders.proctime
> >> > > > > > > >> ON orders.currency = rate_changelog.currency;
> >> > > > > > > >>
> >> > > > > > > >> In a nutshell, once FLIP-95 and FLIP-105 are ready, we can
> >> > > > > > > >> easily support "temporal join on changelogs" without
> >> > > > > > > >> introducing new syntax.
> >> > > > > > > >> IMO, introducing a DDL syntax for Temporal Table Function
> >> > > > > > > >> does not look like an easy way and may involve repetitive
> >> > > > > > > >> work.
> >> > > > > > > >>
> >> > > > > > > >> Best,
> >> > > > > > > >> Jark
> >> > > > > > > >>
> >> > > > > > > >> [1]: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
> >> > > > > > > >> [2]: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table-function
> >> > > > > > > >>
> >> > > > > > > >>
> >> > > > > > > >>
> >> > > > > > > >>
> >> > > > > > > >>
> >> > > > > > > >> On Thu, 16 Apr 2020 at 23:04, Benchao Li <libenc...@gmail.com> wrote:
> >> > > > > > > >>
> >> > > > > > > >>> Hi Konstantin,
> >> > > > > > > >>>
> >> > > > > > > >>> Thanks for bringing up this discussion. +1 for the idea.
> >> > > > > > > >>> We have met this in our company too, and I recently planned
> >> > > > > > > >>> to support it in our internal branch.
> >> > > > > > > >>>
> >> > > > > > > >>> Regarding your questions:
> >> > > > > > > >>> 1) I think it might be more of a table/view than a function,
> >> > > > > > > >>> just like a Temporal Table (which is also known as a
> >> > > > > > > >>> dimension table). Maybe we need a DDL like CREATE VIEW plus
> >> > > > > > > >>> some additional settings.
> >> > > > > > > >>> 2) If we design the DDL for it like a view, then maybe
> >> > > > > > > >>> temporary is good enough.
> >> > > > > > > >>>
> >> > > > > > > >>> On Thu, Apr 16, 2020 at 8:16 PM, Konstantin Knauf <kna...@apache.org> wrote:
> >> > > > > > > >>>
> >> > > > > > > >>>> Hi everyone,
> >> > > > > > > >>>>
> >> > > > > > > >>>> it would be very useful if temporal tables could be created
> >> > > > > > > >>>> via DDL.
> >> > > > > > > >>>> Currently, users need to do this either in the Table API or
> >> > > > > > > >>>> in the environment file of the Flink CLI, both of which
> >> > > > > > > >>>> require the user to switch context away from the SQL
> >> > > > > > > >>>> CLI/Editor. I recently created a ticket for this request [1].
> >> > > > > > > >>>>
> >> > > > > > > >>>> I see two main questions:
> >> > > > > > > >>>>
> >> > > > > > > >>>> 1) What would be the DDL syntax? A Temporal Table is on the
> >> > > > > > > >>>> one hand a view and on the other a function, depending on how
> >> > > > > > > >>>> you look at it.
> >> > > > > > > >>>>
> >> > > > > > > >>>> 2) Would this temporal table view/function be stored in the
> >> > > > > > > >>>> catalog or only be temporary?
> >> > > > > > > >>>>
> >> > > > > > > >>>> I personally do not have much experience in this area of
> >> > > > > > > >>>> Flink, so I am looking forward to hearing your thoughts on
> >> > > > > > > >>>> this.
> >> > > > > > > >>>>
> >> > > > > > > >>>> Best,
> >> > > > > > > >>>>
> >> > > > > > > >>>> Konstantin
> >> > > > > > > >>>>
> >> > > > > > > >>>> [1] https://issues.apache.org/jira/browse/FLINK-16824
> >> > > > > > > >>>>
> >> > > > > > > >>>> --
> >> > > > > > > >>>>
> >> > > > > > > >>>> Konstantin Knauf
> >> > > > > > > >>>>
> >> > > > > > > >>>
> >> > > > > > > >>>
> >> > > > > > > >>> --
> >> > > > > > > >>>
> >> > > > > > > >>> Benchao Li
> >> > > > > > > >>> School of Electronics Engineering and Computer Science,
> >> > > > > > > >>> Peking University
> >> > > > > > > >>> Tel:+86-15650713730
> >> > > > > > > >>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
> >> > > > > > > >>>
> >> > > > > > > >>
> >> > > > > > > >
> >> > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> >>
> >> --
> >>
> >> Konstantin Knauf
> >>
> >> https://twitter.com/snntrable
> >>
> >> https://github.com/knaufk
> >>
> >
>
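
For readers following the quoted thread: the "FOR SYSTEM_TIME AS OF" join on a changelog discussed above can be pictured, very roughly, as a per-key version lookup. The sketch below is plain Python, not Flink code. The function names (build_versions, temporal_join), the tuple layouts, and the sample rates and orders are all hypothetical and chosen only for illustration. It models an inner join against upsert-style versions keyed by a time attribute and ignores deletes, watermarks, and state cleanup.

```python
from bisect import bisect_right
from collections import defaultdict


def build_versions(changelog):
    """Group (key, valid_from, value) rows by key, ordered by valid_from."""
    versions = defaultdict(list)
    for key, valid_from, value in sorted(changelog, key=lambda row: row[1]):
        versions[key].append((valid_from, value))
    return versions


def temporal_join(facts, versions):
    """Inner-join each (key, ts, payload) fact with the version valid at ts.

    The version valid at ts is the latest one whose valid_from <= ts.
    Facts without any such version are dropped, as in an inner join.
    """
    joined = []
    for key, ts, payload in facts:
        history = versions.get(key, [])
        starts = [start for start, _ in history]
        idx = bisect_right(starts, ts) - 1  # index of the latest start <= ts
        if idx >= 0:
            joined.append((key, ts, payload, history[idx][1]))
    return joined


# Hypothetical sample data: (currency, valid_from, rate) and
# (currency, order_time, amount).
rates = [("EUR", 1, 1.10), ("USD", 1, 1.00), ("EUR", 5, 1.20)]
orders = [("EUR", 3, 100), ("EUR", 5, 200), ("USD", 9, 50), ("GBP", 4, 10)]

result = temporal_join(orders, build_versions(rates))
for row in result:
    print(row)
```

The GBP order is dropped because no rate version exists for that key, and the EUR order at time 5 picks up the version that starts at time 5, since the lookup uses "less than or equal".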
