I think Flink should behave similarly to other DBMSs.

Other DBMSs do not allow querying the history of a table, even though the
DBMS has seen all changes of the table (as transactions or directly as a
changelog if the table was replicated) and recorded them in its log.
You need to declare a table as TEMPORAL to be able to look up previous
versions.

Flink is in a very similar situation.
Even though we've seen the physical data of all changes and could also store
it for some time, I think we should only allow queries against previous
versions of a table (with FOR SYSTEM_TIME AS OF) if the table was defined
as TEMPORAL.

IMO this is not about having the data to return a previous version of a
table (other DBMSs have the data as well); it's about whether the user
should tell the system to allow access to the table's history or not.
As I said before, we could of course declare that all tables are
automatically temporal and versioned on the only event-time attribute (what
if there were more than one?), but I personally don't like such
implicit conventions.
I don't have a concrete proposal for a syntax to declare the version
attribute of a table, but I agree that the "PERIOD FOR SYSTEM_TIME" syntax
doesn't look very suitable for our purposes.
I'm sure we can come up with a better syntax for this.
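
To make the distinction concrete, here is a toy Python sketch (not a syntax
or implementation proposal). The class VersionedTable and its methods are
hypothetical names invented for this illustration; they are not Flink APIs.
The table always records every version it sees, but it only serves lookups
of previous versions if it was explicitly declared temporal.

```python
from bisect import bisect_right
from collections import defaultdict


class VersionedTable:
    """Toy model: history is always recorded, but only queryable if declared temporal."""

    def __init__(self, temporal=False):
        # Hypothetical flag standing in for an explicit TEMPORAL declaration.
        self.temporal = temporal
        # key -> version timestamps (sorted) and the row valid from each timestamp
        self._times = defaultdict(list)
        self._rows = defaultdict(list)

    def upsert(self, key, version_time, row):
        """Record a new version of `key`, valid from `version_time` onward."""
        idx = bisect_right(self._times[key], version_time)
        self._times[key].insert(idx, version_time)
        self._rows[key].insert(idx, row)

    def latest(self, key):
        """Return the current row for `key`; allowed on every table."""
        rows = self._rows.get(key)
        return rows[-1] if rows else None

    def as_of(self, key, t):
        """Return the row for `key` that was valid at time `t`."""
        if not self.temporal:
            # The data exists, but the user never asked for history access.
            raise ValueError("table was not declared temporal")
        times = self._times.get(key, [])
        idx = bisect_right(times, t)
        return self._rows[key][idx - 1] if idx else None
```

For example, after upserting a rate of 1.10 at time 10 and 1.20 at time 20
into a temporal table, as_of("EUR", 15) returns 1.10, while the same call on
a table created without temporal=True raises an error even though the data
is present.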

Best, Fabian

On Sat, May 9, 2020 at 03:57, Kurt Young <ykt...@gmail.com> wrote:

> All tables described by Flink's DDL are dynamic tables. But a dynamic
> table is a logical concept rather than a physical thing.
> Physically, a dynamic table has two different forms: one is a materialized
> table which changes over time (e.g., a database table or an HBase table);
> the other is a stream which represents a changelog and is
> typically stored in a message queue (e.g., Kafka). For the latter, I think
> the records already represent the history of the dynamic table, based on
> stream-table duality.
>
> So regarding:
> > Of course we could define that Flink implicitly tracks the (recent,
> i.e., within watermark bounds) history of all dynamic tables.
> I don't think this is Flink implicitly tracking the history of the dynamic
> table; rather, the physical data of the table is already the history itself.
> What Flink does is read the history and organize it for further
> operations.
>
> I agree that I relied on another implicit convention though, which treats
> the event time as the version of the dynamic table. Strictly speaking,
> we should use another syntax, "PERIOD FOR SYSTEM_TIME" [1], to indicate the
> version of the table. I've been thinking about this for quite a bit, and
> it turns out that its semantics are too similar to Flink's event time. It
> will be harder for users to understand what this means if
> we treat event time and "PERIOD FOR SYSTEM_TIME" differently. And I'm
> also afraid that we will introduce lots of bugs because not all
> developers will understand this easily.
> [1]
> https://docs.microsoft.com/en-us/sql/relational-databases/tables/temporal-tables?view=sql-server-ver15
>
> Best,
> Kurt
>
>
> On Sat, May 9, 2020 at 5:32 AM Fabian Hueske <fhue...@gmail.com> wrote:
>
>> I think we need the TEMPORAL TABLE syntax because temporal tables are
>> conceptually more than just regular tables.
>> In addition to being a table that always holds the latest values (and
>> can thereby serve as input to a continuous query), a temporal table requires
>> the system to track its history to be able to serve different versions of
>> the table (as requested by FOR SYSTEM_TIME AS OF).
>>
>> Of course we could define that Flink implicitly tracks the (recent, i.e.,
>> within watermark bounds) history of all dynamic tables.
>> However, there's one more thing the system needs to know to be able to
>> correctly evaluate FOR SYSTEM_TIME AS OF x, namely which time attribute to
>> use as version of the temporal table.
>> IMO it would be good to make this explicit, especially if there is a plan
>> to eventually support multiple event-time attributes / watermarks
>> on a table.
>> Just using the only event time attribute would be a bit too much
>> convention magic for my taste (others might of course have a different
>> opinion on this subject).
>>
>> So I agree with Kurt that we don't necessarily need the TEMPORAL TABLE
>> statement if we agree on a few implicit conventions (implicit history table
>> + implicit versioning attribute).
>> I'm not a big fan of such conventions and think it's better to make such
>> things explicit.
>>
>> For temporal joins with processing time semantics, we can use regular
>> dynamic tables without declaring them as TEMPORAL since we don't need a
>> history table to derive the current version.
>> AFAIK, these are already the semantics we use for LookupTableSource.
>>
>> Regarding the question of append-only tables and temporal tables, I'd
>> like to share some more thoughts.
>> As I said above, a temporal table consists of a regular dynamic table A
>> that holds the latest version and a table H that holds the history of A.
>> 1) When defining a temporal table based on a regular dynamic table (with
>> a primary key), we provide A and Flink automatically maintains H
>> (bounded by watermarks).
>> 2) When defining a temporal table based on an append-only table, Flink
>> ingests H and we use the temporal table function to turn it into a dynamic
>> table with a primary key, i.e., into A. This conversion could also be done
>> during ingestion by treating the append-only stream as an upsert changelog
>> and converting it into a dynamic table with a PK, i.e., into table A (just
>> as in case 1).
>>
>> As Jark said, "converting append-only table into changelog table" was
>> moved to future work.
>> Until then, we could only define a TEMPORAL TABLE on a table that is
>> derived from a proper changelog stream with a specific encoding.
>> The TEMPORAL VIEW would be a shortcut which would allow us to perform the
>> conversion in Flink SQL (and not within the connector) and define the
>> temporal properties on the result of the view.
>>
>> Cheers,
>> Fabian
>>
>>
>>
>> On Fri, May 8, 2020 at 08:29, Kurt Young <ykt...@gmail.com> wrote:
>>
>>> I might have missed something, but why do we need a new "TEMPORAL TABLE" syntax?
>>>
>>> According to Fabian's first mail:
>>>
>>> > Hence, the requirements for a temporal table are:
>>> > * The temporal table has a primary key / unique attribute
>>> > * The temporal table has a time-attribute that defines the start of the
>>> > validity interval of a row (processing time or event time)
>>> > * The system knows that the history of the table is tracked and can
>>> infer
>>> > how to look up a version.
>>>
>>> I think a primary key plus a proper event-time attribute is already
>>> sufficient. So a join query that looks like:
>>>
>>> "Fact join Dim FOR SYSTEM_TIME AS OF Fact.some_event_time ON Fact.id =
>>> Dim.id"
>>>
>>> would mean: for every record belonging to Fact, use Fact.some_event_time as
>>> Dim's version (which
>>> will only keep records from the Dim table with event time less than or equal
>>> to Fact.some_event_time, and
>>> keep only one record for each primary key).
>>>
>>> The temporal behavior is actually triggered by the join syntax "FOR
>>> SYSTEM_TIME AS OF Fact.some_event_time"
>>> and not by the DDL description.
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Fri, May 8, 2020 at 10:51 AM Jark Wu <imj...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I agree with what Fabian said above.
>>>> Besides, IMO, (3) has a lower priority and will involve much more
>>>> work.
>>>> It makes sense to me to do it in two phases.
>>>>
>>>> Regarding (3), the key point in converting an append-only table into a
>>>> changelog table is that the framework must know the operation type,
>>>> so we introduced a special CREATE VIEW syntax to do it in the
>>>> documentation
>>>> [1]. Here is an example:
>>>>
>>>> -- my_binlog table is registered as an append-only table
>>>> CREATE TABLE my_binlog (
>>>>   before ROW<...>,
>>>>   after ROW<...>,
>>>>   op STRING,
>>>>   op_ms TIMESTAMP(3)
>>>> ) WITH (
>>>>   'connector.type' = 'kafka',
>>>>   ...
>>>> );
>>>>
>>>> -- interpret my_binlog as a changelog on the op column and id key
>>>> CREATE VIEW my_table AS
>>>>   SELECT
>>>>     after.*
>>>>   FROM my_binlog
>>>>   CHANGELOG OPERATION BY op
>>>>   UPDATE KEY BY (id);
>>>>
>>>> -- my_table will materialize the insert/delete/update changes
>>>> -- if we have 4 records in dbz that
>>>> -- a create for 1004
>>>> -- an update for 1004
>>>> -- a create for 1005
>>>> -- a delete for 1004
>>>> > SELECT COUNT(*) FROM my_table;
>>>> +-----------+
>>>> |  COUNT(*) |
>>>> +-----------+
>>>> |     1     |
>>>> +-----------+
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>> [1]:
>>>>
>>>> https://docs.google.com/document/d/1onyIUUdWAHfr_Yd5nZOE7SOExBc6TiW5C4LiL5FrjtQ/edit#heading=h.sz656g8mb2wb
>>>>
>>>>
>>>> On Fri, 8 May 2020 at 00:24, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>
>>>> > Thanks for the summary Konstantin.
>>>> > I think you got all points right.
>>>> >
>>>> > IMO, the way forward would be to work on a FLIP to define
>>>> > * the concept of temporal tables,
>>>> > * how to feed them from retraction tables
>>>> > * how to feed them from append-only tables
>>>> > * their specification with CREATE TEMPORAL TABLE,
>>>> > * how to use temporal tables in temporal table joins
>>>> > * how (if at all) to use temporal tables in other types of queries
>>>> >
>>>> > We would keep the LATERAL TABLE syntax because it is used for regular
>>>> > table-valued functions.
>>>> > However, we would probably remove the TemporalTableFunction (which is
>>>> > a built-in table-valued function) after it has been deprecated for a
>>>> > while.
>>>> >
>>>> > Cheers, Fabian
>>>> >
>>>> > On Thu, May 7, 2020 at 18:03, Konstantin Knauf <
>>>> > kna...@apache.org> wrote:
>>>> >
>>>> >> Hi everyone,
>>>> >>
>>>> >> Thanks everyone for joining the discussion on this. Please let me
>>>> >> summarize
>>>> >> what I have understood so far.
>>>> >>
>>>> >> 1) For joining an append-only table and a temporal table, the syntax
>>>> >> "FOR SYSTEM_TIME AS OF <time-attribute>" seems to be preferred (Fabian,
>>>> >> Timo, Seth).
>>>> >>
>>>> >> 2) To define a temporal table based on a changelog stream from an
>>>> >> external system, CREATE TEMPORAL TABLE (as suggested by Timo/Fabian)
>>>> >> could be used.
>>>> >> 3) In order to also support temporal tables derived from an
>>>> >> append-only stream, we either need to support TEMPORAL VIEW (as
>>>> >> mentioned by Fabian) or need to have a way to convert an append-only
>>>> >> table into a changelog table (briefly discussed in [1]). It is not
>>>> >> completely clear to me how a temporal table based on an append-only
>>>> >> table would be defined with the syntax proposed in [1] and 2).
>>>> >> @Jark Wu <imj...@gmail.com> could you elaborate a bit on that?
>>>> >>
>>>> >> How do we move forward with this?
>>>> >>
>>>> >> * It seems that a two-phased approach (1 + 2 now, 3 later) makes
>>>> >> sense. What do you think?
>>>> >> * If we proceed like this, what would this mean for the current syntax
>>>> >> of LATERAL TABLE? Would we keep it? Would we eventually deprecate and
>>>> >> drop it? Since we would be on par with the current temporal table
>>>> >> function join only after 3), I assume we could only drop it
>>>> >> thereafter.
>>>> >>
>>>> >> Thanks, Konstantin
>>>> >>
>>>> >> [1]
>>>> >>
>>>> >>
>>>> https://docs.google.com/document/d/1onyIUUdWAHfr_Yd5nZOE7SOExBc6TiW5C4LiL5FrjtQ/edit#heading=h.kduaw9moein6
>>>> >>
>>>> >>
>>>> >> On Sat, Apr 18, 2020 at 3:07 PM Jark Wu <imj...@gmail.com> wrote:
>>>> >>
>>>> >> > Hi Fabian,
>>>> >> >
>>>> >> > Just to clarify a little bit, we decided to move the "converting
>>>> >> > append-only table into changelog table" into future work.
>>>> >> > So FLIP-105 only introduced some CDC formats (debezium) and new
>>>> >> TableSource
>>>> >> > interfaces proposed in FLIP-95.
>>>> >> > I should have started a new FLIP for the new CDC formats and kept
>>>> >> FLIP-105
>>>> >> > as it is to avoid the confusion, sorry about that.
>>>> >> >
>>>> >> > Best,
>>>> >> > Jark
>>>> >> >
>>>> >> >
>>>> >> > On Sat, 18 Apr 2020 at 00:35, Fabian Hueske <fhue...@gmail.com>
>>>> wrote:
>>>> >> >
>>>> >> > > Thanks Jark!
>>>> >> > >
>>>> >> > > I certainly need to read up on FLIP-105 (and I'll try to adjust
>>>> my
>>>> >> > > terminology to changelog table from now on ;-) )
>>>> >> > > If FLIP-105 addresses the issue of converting an append-only
>>>> table
>>>> >> into a
>>>> >> > > changelog table that upserts on primary key (basically what the
>>>> VIEW
>>>> >> > > definition in my first email did),
>>>> >> > > TEMPORAL VIEWs become much less important.
>>>> >> > > In that case, we would be well served with TEMPORAL TABLE and
>>>> TEMPORAL
>>>> >> > VIEW
>>>> >> > > would be a nice-to-have feature for some later time.
>>>> >> > >
>>>> >> > > Cheers, Fabian
>>>> >> > >
>>>> >> > >
>>>> >> > >
>>>> >> > >
>>>> >> > >
>>>> >> > >
>>>> >> > > On Fri, Apr 17, 2020 at 18:13, Jark Wu <imj...@gmail.com> wrote:
>>>> >> > >
>>>> >> > > > Hi Fabian,
>>>> >> > > >
>>>> >> > > > I think converting an append-only table into a temporal table
>>>> >> > > > involves two things:
>>>> >> > > > (1) converting the append-only table into a changelog table (or
>>>> >> > > > retraction table, as you said)
>>>> >> > > > (2) defining the converted changelog table (maybe a view now) as
>>>> >> > > > temporal (or history tracked).
>>>> >> > > >
>>>> >> > > > The first thing is also mentioned and discussed in FLIP-105
>>>> design
>>>> >> > draft
>>>> >> > > > [1] which proposed a syntax
>>>> >> > > > to convert the append-only table into a changelog table.
>>>> >> > > >
>>>> >> > > > I think TEMPORAL TABLE is quite straightforward and simple, and
>>>> >> > > > can satisfy most existing changelog
>>>> >> > > > data with popular CDC formats. TEMPORAL VIEW is flexible but will
>>>> >> > > > involve more SQL code. I think
>>>> >> > > > we can support them both.
>>>> >> > > >
>>>> >> > > > Best,
>>>> >> > > > Jark
>>>> >> > > >
>>>> >> > > > [1]:
>>>> >> > > >
>>>> >> > > >
>>>> >> > >
>>>> >> >
>>>> >>
>>>> https://docs.google.com/document/d/1onyIUUdWAHfr_Yd5nZOE7SOExBc6TiW5C4LiL5FrjtQ/edit#heading=h.sz656g8mb2wb
>>>> >> > > >
>>>> >> > > > On Fri, 17 Apr 2020 at 23:52, Fabian Hueske <fhue...@gmail.com
>>>> >
>>>> >> wrote:
>>>> >> > > >
>>>> >> > > > > Hi,
>>>> >> > > > >
>>>> >> > > > > I agree with most of what Timo said.
>>>> >> > > > >
>>>> >> > > > > The TEMPORAL keyword (which unfortunately might be easily
>>>> confused
>>>> >> > with
>>>> >> > > > > TEMPORARY...) looks very intuitive and I think using the
>>>> only time
>>>> >> > > > > attribute for versioning would be a good choice.
>>>> >> > > > >
>>>> >> > > > > However, TEMPORAL TABLE on retraction tables does not solve
>>>> >> > > > > the full problem.
>>>> >> > > > > I believe there will be also cases where we need to derive a
>>>> >> temporal
>>>> >> > > > table
>>>> >> > > > > from an append only table (what TemporalTableFunctions do
>>>> right
>>>> >> now).
>>>> >> > > > > I think the best choice for this would be TEMPORAL VIEW but
>>>> as I
>>>> >> > > > explained,
>>>> >> > > > > it might be a longer way until this can be supported.
>>>> >> > > > > TEMPORAL VIEW would also address the problem of
>>>> preprocessing.
>>>> >> > > > >
>>>> >> > > > > > Regarding retraction table with a primary key and a
>>>> >> time-attribute:
>>>> >> > > > > > These semantics are still unclear to me. Can retractions
>>>> only
>>>> >> occur
>>>> >> > > > > > within watermarks? Or are they also used for representing
>>>> late
>>>> >> > > updates?
>>>> >> > > > >
>>>> >> > > > > Time attributes and retraction streams are a challenging
>>>> topic
>>>> >> that I
>>>> >> > > > > haven't completely understood yet.
>>>> >> > > > > So far we treated time attributes always as part of the data.
>>>> >> > > > > In combination with retractions, it seems that they become
>>>> >> metadata
>>>> >> > > that
>>>> >> > > > > specifies when a change was done.
>>>> >> > > > > I think this is different from treating time attributes as
>>>> regular
>>>> >> > > data.
>>>> >> > > > >
>>>> >> > > > > Cheers, Fabian
>>>> >> > > > >
>>>> >> > > > >
>>>> >> > > > > On Fri, Apr 17, 2020 at 17:23, Seth Wiesman <sjwies...@gmail.com> wrote:
>>>> >> > > > >
>>>> >> > > > > > I really like the TEMPORAL keyword, I find it very
>>>> intuitive.
>>>> >> > > > > >
>>>> >> > > > > > The down side of this approach would be that an additional
>>>> >> > > > preprocessing
>>>> >> > > > > > > step would not be possible anymore because there is no
>>>> >> preceding
>>>> >> > > > view.
>>>> >> > > > > > >
>>>> >> > > > > >
>>>> >> > > > > >  Yes and no. My understanding is we are not talking about
>>>> making
>>>> >> > any
>>>> >> > > > > > changes to how temporal tables are defined in the table
>>>> api.
>>>> >> Since
>>>> >> > > you
>>>> >> > > > > > cannot currently define temporal table functions in pure
>>>> SQL
>>>> >> > > > > applications,
>>>> >> > > > > > but only pre-register them in YAML, you can't do any
>>>> >> pre-processing
>>>> >> > > as
>>>> >> > > > it
>>>> >> > > > > > stands today. Preprocessing may be a generally useful
>>>> feature,
>>>> >> I'm
>>>> >> > > not
>>>> >> > > > > > sure, but this syntax does not lose us anything in pure SQL
>>>> >> > > > applications.
>>>> >> > > > > >
>>>> >> > > > > > These semantics are still unclear to me. Can retractions
>>>> only
>>>> >> occur
>>>> >> > > > > > > within watermarks? Or are they also used for
>>>> representing late
>>>> >> > > > updates?
>>>> >> > > > > > >
>>>> >> > > > > >
>>>> >> > > > > > I do not know the SQL standard well enough to give a
>>>> principled
>>>> >> > > > response
>>>> >> > > > > to
>>>> >> > > > > > this question. However, in my observation of production
>>>> >> workloads,
>>>> >> > > > users
>>>> >> > > > > of
>>>> >> > > > > > temporal table functions are doing so to denormalize star
>>>> >> schemas
>>>> >> > > > before
>>>> >> > > > > > performing further transformations and aggregations and
>>>> expect
>>>> >> the
>>>> >> > > > output
>>>> >> > > > > > to be an append stream. With the ongoing work to better
>>>> support
>>>> >> > > > > changelogs,
>>>> >> > > > > > the need for users to understand the differences in append
>>>> vs
>>>> >> > upsert
>>>> >> > > in
>>>> >> > > > > > their query may be diminishing but everyone else on this
>>>> thread
>>>> >> can
>>>> >> > > > > better
>>>> >> > > > > > speak to that.
>>>> >> > > > > >
>>>> >> > > > > > Seth
>>>> >> > > > > >
>>>> >> > > > > > On Fri, Apr 17, 2020 at 10:03 AM Timo Walther <
>>>> >> twal...@apache.org>
>>>> >> > > > > wrote:
>>>> >> > > > > >
>>>> >> > > > > > > Hi Fabian,
>>>> >> > > > > > >
>>>> >> > > > > > > thank you very much for this great summary!
>>>> >> > > > > > >
>>>> >> > > > > > > I wasn't aware of the Polymorphic Table Functions
>>>> standard.
>>>> >> This
>>>> >> > > is a
>>>> >> > > > > > > very interesting topic that we should definitely
>>>> consider in
>>>> >> the
>>>> >> > > > > future.
>>>> >> > > > > > > Maybe this could also help us in defining tables more
>>>> >> dynamically
>>>> >> > > > > within
>>>> >> > > > > > > a query. It could help solving problems as discussed in
>>>> >> FLIP-113.
>>>> >> > > > > > >
>>>> >> > > > > > > Regarding joining:
>>>> >> > > > > > >
>>>> >> > > > > > > IMO we should aim for "FOR SYSTEM_TIME AS OF x" instead
>>>> of the
>>>> >> > > > current
>>>> >> > > > > > > `LATERAL TABLE(rates(x))` syntax. A function that also
>>>> behaves
>>>> >> > > like a
>>>> >> > > > > > > table and needs this special `LATERAL` keyword during
>>>> joining
>>>> >> is
>>>> >> > > not
>>>> >> > > > > > > very intuitive. The PTF could be used once they are fully
>>>> >> > supported
>>>> >> > > > by
>>>> >> > > > > > > Calcite and we have the big picture how to also use them
>>>> for
>>>> >> > other
>>>> >> > > > > > > time-based operations (windows?, joins?).
>>>> >> > > > > > >
>>>> >> > > > > > > Regarding how to represent a temporal table:
>>>> >> > > > > > >
>>>> >> > > > > > > I think that our current DDL, current LookupTableSource
>>>> and
>>>> >> > > temporal
>>>> >> > > > > > > tables can fit nicely together.
>>>> >> > > > > > >
>>>> >> > > > > > > How about we simply introduce an additional keyword
>>>> >> > > > > > > `TEMPORAL` to indicate history tracking semantics? I think
>>>> >> > > > > > > this is the least invasive solution:
>>>> >> > > > > > >
>>>> >> > > > > > > CREATE TEMPORAL TABLE rates (
>>>> >> > > > > > >    currency CHAR(3) NOT NULL PRIMARY KEY,
>>>> >> > > > > > >    rate DOUBLE,
>>>> >> > > > > > >    rowtime TIMESTAMP,
>>>> >> > > > > > >    WATERMARK FOR rowtime AS rowtime - INTERVAL '5' MINUTE)
>>>> >> > > > > > > WITH (...);
>>>> >> > > > > > >
>>>> >> > > > > > > - The primary key would be defined by the DDL.
>>>> >> > > > > > > - The available time attribute would be defined by the
>>>> DDL.
>>>> >> > Either
>>>> >> > > as
>>>> >> > > > > > > the only time attribute of the table or we introduce a
>>>> special
>>>> >> > > > > > > constraint similar to `PRIMARY KEY`.
>>>> >> > > > > > >
>>>> >> > > > > > > The down side of this approach would be that an
>>>> additional
>>>> >> > > > > preprocessing
>>>> >> > > > > > > step would not be possible anymore because there is no
>>>> >> preceding
>>>> >> > > > view.
>>>> >> > > > > > >
>>>> >> > > > > > > The `TEMPORAL` semantic can be stored in the properties
>>>> of the
>>>> >> > > table
>>>> >> > > > > > > when writing to a catalog. We do the same for watermarks
>>>> and
>>>> >> > > computed
>>>> >> > > > > > > columns.
>>>> >> > > > > > >
>>>> >> > > > > > > Without a `TEMPORAL` keyword, a `FOR SYSTEM_TIME AS OF x`
>>>> >> would
>>>> >> > > only
>>>> >> > > > > > > work on processing time by a lookup into the external
>>>> system
>>>> >> or
>>>> >> > on
>>>> >> > > > > > > event-time by using the time semantics that the external
>>>> >> system
>>>> >> > > > > supports.
>>>> >> > > > > > >
>>>> >> > > > > > > Regarding retraction table with a primary key and a
>>>> >> > time-attribute:
>>>> >> > > > > > >
>>>> >> > > > > > > These semantics are still unclear to me. Can retractions
>>>> only
>>>> >> > occur
>>>> >> > > > > > > within watermarks? Or are they also used for
>>>> representing late
>>>> >> > > > updates?
>>>> >> > > > > > >
>>>> >> > > > > > > Regards,
>>>> >> > > > > > > Timo
>>>> >> > > > > > >
>>>> >> > > > > > >
>>>> >> > > > > > > On 17.04.20 14:34, Fabian Hueske wrote:
>>>> >> > > > > > > > Hi all,
>>>> >> > > > > > > >
>>>> >> > > > > > > > First of all, I apologize for the text wall that's
>>>> >> > following...
>>>> >> > > > ;-)
>>>> >> > > > > > > >
>>>> >> > > > > > > > A temporal table join joins an append-only table and a
>>>> >> temporal
>>>> >> > > > > table.
>>>> >> > > > > > > > The question about how to represent a temporal table
>>>> join
>>>> >> boils
>>>> >> > > > down
>>>> >> > > > > to
>>>> >> > > > > > > two
>>>> >> > > > > > > > questions:
>>>> >> > > > > > > >
>>>> >> > > > > > > > 1) How to represent a temporal table
>>>> >> > > > > > > > 2) How to specify the join of an append-only table and
>>>> a
>>>> >> > temporal
>>>> >> > > > > table
>>>> >> > > > > > > >
>>>> >> > > > > > > > I'll discuss these points separately.
>>>> >> > > > > > > >
>>>> >> > > > > > > > # 1 How to represent a temporal table
>>>> >> > > > > > > >
>>>> >> > > > > > > > A temporal table is a table that can be looked up with
>>>> a
>>>> >> time
>>>> >> > > > > parameter
>>>> >> > > > > > > and
>>>> >> > > > > > > > which returns the rows of the table at that point in
>>>> time /
>>>> >> for
>>>> >> > > > that
>>>> >> > > > > > > > version.
>>>> >> > > > > > > > In order to be able to (conceptually) look up previous
>>>> >> > versions,
>>>> >> > > a
>>>> >> > > > > > > temporal
>>>> >> > > > > > > > table must be (conceptually) backed by a history table
>>>> that
>>>> >> > > tracks
>>>> >> > > > > all
>>>> >> > > > > > > > previous versions (see SqlServer docs [1]).
>>>> >> > > > > > > > In the context of our join, we added another
>>>> restriction
>>>> >> namely
>>>> >> > > > that
>>>> >> > > > > > the
>>>> >> > > > > > > > table must have a primary key, i.e., there is only one
>>>> row
>>>> >> for
>>>> >> > > each
>>>> >> > > > > > > version
>>>> >> > > > > > > > for each unique key.
>>>> >> > > > > > > >
>>>> >> > > > > > > > Hence, the requirements for a temporal table are:
>>>> >> > > > > > > > * The temporal table has a primary key / unique
>>>> attribute
>>>> >> > > > > > > > * The temporal table has a time-attribute that defines
>>>> the
>>>> >> > start
>>>> >> > > of
>>>> >> > > > > the
>>>> >> > > > > > > > validity interval of a row (processing time or event
>>>> time)
>>>> >> > > > > > > > * The system knows that the history of the table is
>>>> tracked
>>>> >> and
>>>> >> > > can
>>>> >> > > > > > infer
>>>> >> > > > > > > > how to look up a version.
>>>> >> > > > > > > >
>>>> >> > > > > > > > There are two possible types of input from which we
>>>> want to
>>>> >> > > create
>>>> >> > > > > > > temporal
>>>> >> > > > > > > > tables (that I'm aware of):
>>>> >> > > > > > > >
>>>> >> > > > > > > > * append-only tables, i.e., tables that contain the
>>>> full
>>>> >> change
>>>> >> > > > > history
>>>> >> > > > > > > > * retraction tables, i.e., tables that are updating
>>>> and do
>>>> >> not
>>>> >> > > > > remember
>>>> >> > > > > > > the
>>>> >> > > > > > > > history.
>>>> >> > > > > > > >
>>>> >> > > > > > > > There are a few ways to do this:
>>>> >> > > > > > > >
>>>> >> > > > > > > > ## 1.1 Defining a VIEW on an append-only table with a
>>>> time
>>>> >> > > > attribute.
>>>> >> > > > > > > >
>>>> >> > > > > > > > The following view definition results in a view that
>>>> >> provides
>>>> >> > the
>>>> >> > > > > > latest
>>>> >> > > > > > > > rate for each currency.
>>>> >> > > > > > > >
>>>> >> > > > > > > > CREATE VIEW rates AS
>>>> >> > > > > > > > SELECT
>>>> >> > > > > > > >    currency, MAX(rate) as rate, MAX(rowtime) as rowtime
>>>> >> > > > > > > > FROM rates_history rh1
>>>> >> > > > > > > > WHERE
>>>> >> > > > > > > >    rh1.rowtime = (
>>>> >> > > > > > > >      SELECT max(rowtime)
>>>> >> > > > > > > >      FROM rates_history rh2
>>>> >> > > > > > > >      WHERE rh2.currency = rh1.currency)
>>>> >> > > > > > > > GROUP BY currency
>>>> >> > > > > > > > WITH (
>>>> >> > > > > > > >    'historytracking' = 'true',
>>>> >> > > > > > > >    'historytracking.starttime' = 'rowtime');
>>>> >> > > > > > > >
>>>> >> > > > > > > > However, we also need to tell the system to track the
>>>> >> history
>>>> >> > of
>>>> >> > > > all
>>>> >> > > > > > > > changes of the view in order to be able to look it up.
>>>> >> > > > > > > > That's what the properties in the WITH clause are for
>>>> >> (inspired
>>>> >> > > by
>>>> >> > > > > > > > SqlServer's TEMPORAL TABLE DDL syntax).
>>>> >> > > > > > > > Note that this is *not* a syntax proposal but only
>>>> meant to
>>>> >> > show
>>>> >> > > > > which
>>>> >> > > > > > > > information is needed.
>>>> >> > > > > > > > This view allows looking up any version of the "rates"
>>>> >> > > > > > > > view.
>>>> >> > > > > > > >
>>>> >> > > > > > > > In addition to designing and implementing the DDL
>>>> syntax for
>>>> >> > > views
>>>> >> > > > > that
>>>> >> > > > > > > > support temporal lookups, the optimizer would need to
>>>> >> > understand
>>>> >> > > > the
>>>> >> > > > > > > > semantics of the view definition in depth.
>>>> >> > > > > > > > Among other things it needs to understand that the
>>>> MAX()
>>>> >> > > > aggregation
>>>> >> > > > > on
>>>> >> > > > > > > the
>>>> >> > > > > > > > time-attribute preserves its watermark alignment.
>>>> >> > > > > > > > AFAIK, this is not the case at the moment (the time
>>>> >> > > > > > > > attribute would be converted into a regular TIMESTAMP and
>>>> >> > > > > > > > lose its time attribute properties).
>>>> >> > > > > > > >
>>>> >> > > > > > > > ## 1.2 A retraction table with a primary key and a
>>>> >> > > time-attribute.
>>>> >> > > > > > > >
>>>> >> > > > > > > > On paper it looks like such a table would automatically
>>>> >> qualify
>>>> >> > > as
>>>> >> > > > a
>>>> >> > > > > > > > time-versioned table because it completely fulfills the
>>>> >> > > > requirements.
>>>> >> > > > > > > > However, I don't think we can use it *as is* as a
>>>> temporal
>>>> >> > table
>>>> >> > > if
>>>> >> > > > > we
>>>> >> > > > > > > want
>>>> >> > > > > > > > to have clean semantics.
>>>> >> > > > > > > > The problem here is the "lost history" of the
>>>> retraction
>>>> >> table.
>>>> >> > > The
>>>> >> > > > > > > dynamic
>>>> >> > > > > > > > table that is defined on the retraction stream only
>>>> stores
>>>> >> the
>>>> >> > > > latest
>>>> >> > > > > > > > version (even though it sees all versions).
>>>> >> > > > > > > > Conceptually, a temporal table can look up the version of
>>>> >> > > > > > > > the table at any point in time because it is backed by a
>>>> >> > > > > > > > history table.
>>>> >> > > > > > > > If this information is not available, we cannot have a
>>>> >> > > semantically
>>>> >> > > > > > clean
>>>> >> > > > > > > > definition of the join IMO.
>>>> >> > > > > > > >
>>>> >> > > > > > > > Therefore we should define the table in a way that the
>>>> >> system
>>>> >> > > knows
>>>> >> > > > > > that
>>>> >> > > > > > > > the history is tracked.
>>>> >> > > > > > > > MSSQL uses a syntax similar to this one:
>>>> >> > > > > > > >
>>>> >> > > > > > > > CREATE TABLE rates (
>>>> >> > > > > > > >      currency CHAR(3) NOT NULL PRIMARY KEY,
>>>> >> > > > > > > >      rate DOUBLE,
>>>> >> > > > > > > >      rowtime TIMESTAMP,
>>>> >> > > > > > > >      WATERMARK FOR rowtime AS rowtime - INTERVAL '5' MINUTE)
>>>> >> > > > > > > > WITH (
>>>> >> > > > > > > >    'historytracking' = 'true',
>>>> >> > > > > > > >    'historytracking.starttime' = 'rowtime');
>>>> >> > > > > > > >
>>>> >> > > > > > > > The 'historytracking' properties would declare that the
>>>> table
>>>> >> > > tracks
>>>> >> > > > > its
>>>> >> > > > > > > > history and also specify the attribute (rowtime) that
>>>> is
>>>> >> used
>>>> >> > for
>>>> >> > > > > > > > versioning.
>>>> >> > > > > > > >
>>>> >> > > > > > > > ## 1.3 Registering a TableFunction that takes an
>>>> append-only
>>>> >> > > table
>>>> >> > > > > with
>>>> >> > > > > > > > time attribute
>>>> >> > > > > > > >
>>>> >> > > > > > > > The TableFunction requires a few parameters:
>>>> >> > > > > > > > * the source table from which to derive the temporal
>>>> table
>>>> >> > > > > > > > * the key attribute on which the versions of the source
>>>> >> table
>>>> >> > > > should
>>>> >> > > > > be
>>>> >> > > > > > > > computed
>>>> >> > > > > > > > * the time attribute that defines the versions
>>>> >> > > > > > > > * a lookup timestamp for the version that is returned.
>>>> >> > > > > > > >
>>>> >> > > > > > > > The reasons why we chose the TableFunction approach over
>>>> >> > > > > > > > the VIEW approach so far were:
>>>> >> > > > > > > > * It is easier for the optimizer to identify a built-in
>>>> >> > > > > > > > table function than to analyze and reason about a generic
>>>> >> > > > > > > > VIEW.
>>>> >> > > > > > > > * We would need to make the optimizer a lot smarter to
>>>> infer
>>>> >> > all
>>>> >> > > > the
>>>> >> > > > > > > > properties from the generic VIEW definition that we
>>>> need
>>>> >> for a
>>>> >> > > > > temporal
>>>> >> > > > > > > > table join.
>>>> >> > > > > > > > * Passing a parameter to a function is a known thing,
>>>> >> passing a
>>>> >> > > > > > parameter
>>>> >> > > > > > > > to a VIEW not so much.
>>>> >> > > > > > > > * Users would need to specify the VIEW exactly
>>>> correct, such
>>>> >> > that
>>>> >> > > > it
>>>> >> > > > > > can
>>>> >> > > > > > > be
>>>> >> > > > > > > > used as a temporal table. Look at 1.1 why this is not
>>>> >> trivial.
>>>> >> > > > > > > >
>>>> >> > > > > > > > There are two ways to use a TableFunction:
>>>> >> > > > > > > >
>>>> >> > > > > > > > ### 1.3.1 Built-in and pre-registered function that is parameterized in the SQL query
>>>> >> > > > > > > >
>>>> >> > > > > > > > Here, we do not need to do anything to register the function.
>>>> >> > > > > > > > We simply use it in the query (see example in 2.2 below).
>>>> >> > > > > > > >
>>>> >> > > > > > > > ### 1.3.2 Parameterize function when it is registered in the catalog (with a provided Java implementation)
>>>> >> > > > > > > >
>>>> >> > > > > > > > This is the approach we've used so far. In the Table API, the
>>>> >> > > > > > > > function is first parameterized and created, and then
>>>> >> > > > > > > > registered.
>>>> >> > > > > > > > We would need a DDL syntax to parameterize UDFs on
>>>> >> > > > > > > > registration.
>>>> >> > > > > > > > I don't want to propose a syntax here, but just to give an
>>>> >> > > > > > > > idea, it might look like this:
>>>> >> > > > > > > >
>>>> >> > > > > > > > CREATE FUNCTION rates AS
>>>> >> > > > > > > > 'org.apache.flink.table.udfs.TemporalTableFunction' WITH
>>>> >> > > > > > > > ('table' = 'rates_history', 'key' = 'cur', 'time' = 'rowtime')
>>>> >> > > > > > > >
>>>> >> > > > > > > > Right now, the Flink Catalog interface does not have the
>>>> >> > > > > > > > functionality to store such parameters and would need some
>>>> >> > > > > > > > hacks to properly create parameterized function instances.
>>>> >> > > > > > > >
>>>> >> > > > > > > >
>>>> >> > > > > > > >
>>>> >> > > > > > > > # 2 Defining a join of an append-only table and a temporal table
>>>> >> > > > > > > >
>>>> >> > > > > > > > The append-only table needs to have a time attribute
>>>> >> > > > > > > > (processing time or event time, but the same kind as the
>>>> >> > > > > > > > temporal table).
>>>> >> > > > > > > > The join then needs to specify two things:
>>>> >> > > > > > > > * an equality predicate that includes the primary key of the
>>>> >> > > > > > > > temporal table
>>>> >> > > > > > > > * the time attribute of the append-only table as the time as
>>>> >> > > > > > > > of which to look up the temporal table, i.e., get the version
>>>> >> > > > > > > > of the temporal table that is valid for the timestamp of the
>>>> >> > > > > > > > current row from the append-only table
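
The two ingredients of the join above (equality on the key, lookup as of
the row's own timestamp) can be sketched in Python as follows. This is an
illustration of the semantics only, not Flink code. The helper names and
the sample rows are hypothetical, and the sketch assumes inner-join
behavior, so rows without a valid version are dropped.

```python
def version_as_of(temporal_rows, key, time, key_value, ts):
    """Latest row for key_value whose time attribute is <= ts, or None."""
    candidates = [r for r in temporal_rows
                  if r[key] == key_value and r[time] <= ts]
    return max(candidates, key=lambda r: r[time]) if candidates else None


def temporal_join(append_rows, append_time, temporal_rows, key, time):
    """Pair each append-only row with the version valid at its timestamp."""
    result = []
    for row in append_rows:
        match = version_as_of(temporal_rows, key, time,
                              row[key], row[append_time])
        if match is not None:  # inner join: skip rows without a version
            result.append((row, match))
    return result


# Hypothetical sample data.
rates_history = [
    {"currency": "EUR", "rate": 114, "rowtime": 1},
    {"currency": "USD", "rate": 102, "rowtime": 1},
    {"currency": "EUR", "rate": 116, "rowtime": 5},
]
orders = [
    {"currency": "EUR", "amount": 2, "ordertime": 2},
    {"currency": "EUR", "amount": 3, "ordertime": 6},
    {"currency": "GBP", "amount": 1, "ordertime": 6},
]

joined = temporal_join(orders, "ordertime", rates_history,
                       "currency", "rowtime")
```

Each order is matched against the rate that was valid at its own
ordertime, not against the latest rate.
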
>>>> >> > > > > > > >
>>>> >> > > > > > > > The tricky part (from a syntax point of view) is to specify
>>>> >> > > > > > > > the lookup time.
>>>> >> > > > > > > >
>>>> >> > > > > > > > ## 2.1 the temporal table is a regular table or view (see approaches 1.1 and 1.2 above)
>>>> >> > > > > > > >
>>>> >> > > > > > > > In this case we can use the "FOR SYSTEM_TIME AS OF x" clause
>>>> >> > > > > > > > as follows:
>>>> >> > > > > > > >
>>>> >> > > > > > > > SELECT *
>>>> >> > > > > > > > FROM orders o, rates r FOR SYSTEM_TIME AS OF o.ordertime
>>>> >> > > > > > > > WHERE o.currency = r.currency
>>>> >> > > > > > > >
>>>> >> > > > > > > > IMO, this is a great syntax and the one we should strive for.
>>>> >> > > > > > > > We would need to bend the rules of the SQL standard, which
>>>> >> > > > > > > > only allows x in "FOR SYSTEM_TIME AS OF x" to be a constant,
>>>> >> > > > > > > > and the table on which it is applied usually needs to be of a
>>>> >> > > > > > > > specific type (not sure if views are supported), but I guess
>>>> >> > > > > > > > this is fine.
>>>> >> > > > > > > > NOTE: "FOR SYSTEM_TIME AS OF x" is already supported for
>>>> >> > > > > > > > LookupTable joins if x is a processing time attribute [2].
>>>> >> > > > > > > >
>>>> >> > > > > > > > ## 2.2 the temporal table is a TableFunction and parameterized in the query (see 1.3.1 above)
>>>> >> > > > > > > >
>>>> >> > > > > > > > SELECT *
>>>> >> > > > > > > > FROM orders o,
>>>> >> > > > > > > >    TEMPORAL_TABLE(
>>>> >> > > > > > > >      table => TABLE(rates_history),
>>>> >> > > > > > > >      key => DESCRIPTOR(currency),
>>>> >> > > > > > > >      time => DESCRIPTOR(rowtime)) r
>>>> >> > > > > > > > WHERE o.currency = r.currency
>>>> >> > > > > > > >
>>>> >> > > > > > > > The function "TEMPORAL_TABLE" is built-in and nothing was
>>>> >> > > > > > > > registered in the catalog (except the rates_history table).
>>>> >> > > > > > > > In fact, this is valid SQL:2016 syntax, called Polymorphic
>>>> >> > > > > > > > Table Functions. Have a look here [3].
>>>> >> > > > > > > >
>>>> >> > > > > > > > ## 2.3 the temporal table is a TableFunction that was parameterized during registration (see 1.3.2 above)
>>>> >> > > > > > > >
>>>> >> > > > > > > > This is what we have at the moment.
>>>> >> > > > > > > >
>>>> >> > > > > > > > SELECT *
>>>> >> > > > > > > > FROM orders o,
>>>> >> > > > > > > >    LATERAL TABLE (rates(o.ordertime)) AS r
>>>> >> > > > > > > > WHERE o.currency = r.currency
>>>> >> > > > > > > >
>>>> >> > > > > > > > The TableFunction "rates" was registered in the catalog and
>>>> >> > > > > > > > parameterized to the "rates_history" append-only table, the
>>>> >> > > > > > > > key was set to "currency", and the time attribute was
>>>> >> > > > > > > > declared.
>>>> >> > > > > > > >
>>>> >> > > > > > > > # SUMMARY
>>>> >> > > > > > > >
>>>> >> > > > > > > > IMO we should in the long run aim to define temporal tables
>>>> >> > > > > > > > either as upsert retraction tables or as views on append-only
>>>> >> > > > > > > > tables, and join them using the "FOR SYSTEM_TIME AS OF x"
>>>> >> > > > > > > > syntax.
>>>> >> > > > > > > > I guess it is debatable whether we need to declare that
>>>> >> > > > > > > > history is tracked for these tables (which we don't actually
>>>> >> > > > > > > > do) or whether we do it by convention if the table has a time
>>>> >> > > > > > > > attribute.
>>>> >> > > > > > > > It should be (relatively) easy to get this to work for
>>>> >> > > > > > > > retraction tables, which will be supported soon.
>>>> >> > > > > > > > It will be more work for views because we need to improve the
>>>> >> > > > > > > > time attribute handling with MAX() aggregations.
>>>> >> > > > > > > > "FOR SYSTEM_TIME AS OF x" is already supported for
>>>> >> > > > > > > > LookupTableSources and would "only" need to be adapted to
>>>> >> > > > > > > > work on temporal tables.
>>>> >> > > > > > > >
>>>> >> > > > > > > > Registering parameterized TableFunctions in the catalog seems
>>>> >> > > > > > > > like quite a bit of work. We need new DDL syntax and have to
>>>> >> > > > > > > > extend the catalog and function instantiation. This won't be
>>>> >> > > > > > > > easy, IMO.
>>>> >> > > > > > > > If we only support them as TEMPORARY FUNCTIONs, which are not
>>>> >> > > > > > > > registered in the catalog, it will be easier. The question is
>>>> >> > > > > > > > whether it is worth the effort if we decide on the other
>>>> >> > > > > > > > approach.
>>>> >> > > > > > > >
>>>> >> > > > > > > > Using TableFunctions that are parameterized in the query will
>>>> >> > > > > > > > require extending the Calcite parser and framework to support
>>>> >> > > > > > > > Polymorphic Table Functions.
>>>> >> > > > > > > > However, some work might already have been done there,
>>>> >> > > > > > > > because AFAIK Apache Beam aims to support this syntax for
>>>> >> > > > > > > > windowing functions as described in the "One SQL to rule them
>>>> >> > > > > > > > all" paper [4].
>>>> >> > > > > > > > It might be the fastest and a fully SQL-standard-compliant
>>>> >> > > > > > > > way.
>>>> >> > > > > > > >
>>>> >> > > > > > > > Cheers,
>>>> >> > > > > > > > Fabian
>>>> >> > > > > > > >
>>>> >> > > > > > > > [1] https://docs.microsoft.com/en-us/sql/relational-databases/tables/temporal-tables
>>>> >> > > > > > > > [2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/joins.html#usage-1
>>>> >> > > > > > > > [3] https://standards.iso.org/ittf/PubliclyAvailableStandards/c069776_ISO_IEC_TR_19075-7_2017.zip
>>>> >> > > > > > > > [4] https://arxiv.org/abs/1905.12133
>>>> >> > > > > > > >
>>>> >> > > > > > > > On Fri, Apr 17, 2020 at 06:37, Jark Wu <imj...@gmail.com> wrote:
>>>> >> > > > > > > >
>>>> >> > > > > > > >> Hi Konstantin,
>>>> >> > > > > > > >>
>>>> >> > > > > > > >> Thanks for bringing up this discussion. I think temporal join
>>>> >> > > > > > > >> is a very important feature and should be exposed to pure SQL
>>>> >> > > > > > > >> users.
>>>> >> > > > > > > >> I have already received many requests like this.
>>>> >> > > > > > > >> However, my concern is how to properly support this feature
>>>> >> > > > > > > >> in SQL.
>>>> >> > > > > > > >> Introducing a DDL syntax for Temporal Table Function is one
>>>> >> > > > > > > >> way, but maybe not the best one.
>>>> >> > > > > > > >>
>>>> >> > > > > > > >> The most important reason is that what underlies a temporal
>>>> >> > > > > > > >> table function is exactly a changelog stream.
>>>> >> > > > > > > >> The temporal join actually joins a fact stream with the
>>>> >> > > > > > > >> changelog stream on processing time or event time.
>>>> >> > > > > > > >> We will soon support creating a changelog source using DDL,
>>>> >> > > > > > > >> once FLIP-95 and FLIP-105 are finished.
>>>> >> > > > > > > >> At that time, we can have a simple DDL to create a changelog
>>>> >> > > > > > > >> source like this:
>>>> >> > > > > > > >>
>>>> >> > > > > > > >> CREATE TABLE rate_changelog (
>>>> >> > > > > > > >>    currency STRING,
>>>> >> > > > > > > >>    rate DECIMAL
>>>> >> > > > > > > >> ) WITH (
>>>> >> > > > > > > >>    'connector' = 'kafka',
>>>> >> > > > > > > >>    'topic' = 'rate_binlog',
>>>> >> > > > > > > >>    'properties.bootstrap.servers' = 'localhost:9092',
>>>> >> > > > > > > >>    'format' = 'debezium-json'
>>>> >> > > > > > > >> );
>>>> >> > > > > > > >>
>>>> >> > > > > > > >> In the meantime, we already have a SQL standard temporal join
>>>> >> > > > > > > >> syntax [1], i.e. "A JOIN B FOR SYSTEM_TIME AS OF ..".
>>>> >> > > > > > > >> It is currently used for dimension table lookup joins, but
>>>> >> > > > > > > >> the semantics are the same as for the "temporal table
>>>> >> > > > > > > >> function join" [2].
>>>> >> > > > > > > >> I'm in favor of "FOR SYSTEM_TIME AS OF" because it is more
>>>> >> > > > > > > >> natural, since the definition of B is a *table* and not a
>>>> >> > > > > > > >> *table function*, and the syntax is included in the SQL
>>>> >> > > > > > > >> standard.
>>>> >> > > > > > > >>
>>>> >> > > > > > > >> So once we have the ability to define the "rate_changelog"
>>>> >> > > > > > > >> table, we can use the following query to temporally join the
>>>> >> > > > > > > >> changelog on processing time.
>>>> >> > > > > > > >>
>>>> >> > > > > > > >> SELECT *
>>>> >> > > > > > > >> FROM orders JOIN rate_changelog FOR SYSTEM_TIME AS OF orders.proctime
>>>> >> > > > > > > >> ON orders.currency = rate_changelog.currency;
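
As a rough illustration of what a processing-time join against a changelog
means, the following Python sketch (not Flink code) keeps a materialized
copy of the table up to date and joins each order with whatever version is
current when the order arrives. The event layout and the operation names
"insert", "update" and "delete" are assumptions made for this example and
do not reflect the Debezium format.

```python
def apply_change(table, change):
    """Apply one changelog record to the table, keyed by currency."""
    op, row = change
    if op in ("insert", "update"):
        table[row["currency"]] = row
    elif op == "delete":
        table.pop(row["currency"], None)


def processing_time_join(events):
    """Join orders with the table state at the moment each order arrives.

    events -- ("change", (op, row)) and ("order", row) in arrival order
    """
    table, out = {}, []
    for kind, payload in events:
        if kind == "change":
            apply_change(table, payload)
        else:
            rate = table.get(payload["currency"])
            if rate is not None:  # inner join: no current version, no output
                out.append((payload["id"], rate["rate"]))
    return out


# Hypothetical interleaving of changelog records and orders.
events = [
    ("change", ("insert", {"currency": "EUR", "rate": 114})),
    ("order", {"id": 1, "currency": "EUR"}),
    ("change", ("update", {"currency": "EUR", "rate": 116})),
    ("order", {"id": 2, "currency": "EUR"}),
    ("change", ("delete", {"currency": "EUR"})),
    ("order", {"id": 3, "currency": "EUR"}),
]

result = processing_time_join(events)
```

Order 3 produces no output in this sketch because the EUR row has been
deleted by the time it arrives.
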
>>>> >> > > > > > > >>
>>>> >> > > > > > > >> In a nutshell, once FLIP-95 and FLIP-105 are ready, we can
>>>> >> > > > > > > >> easily support "temporal join on changelogs" without
>>>> >> > > > > > > >> introducing new syntax.
>>>> >> > > > > > > >> IMO, introducing a DDL syntax for Temporal Table Function
>>>> >> > > > > > > >> does not look like an easy way and may involve duplicated
>>>> >> > > > > > > >> work.
>>>> >> > > > > > > >>
>>>> >> > > > > > > >> Best,
>>>> >> > > > > > > >> Jark
>>>> >> > > > > > > >>
>>>> >> > > > > > > >> [1]: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
>>>> >> > > > > > > >> [2]: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table-function
>>>> >> > > > > > > >>
>>>> >> > > > > > > >>
>>>> >> > > > > > > >>
>>>> >> > > > > > > >>
>>>> >> > > > > > > >>
>>>> >> > > > > > > >> On Thu, 16 Apr 2020 at 23:04, Benchao Li <libenc...@gmail.com> wrote:
>>>> >> > > > > > > >>
>>>> >> > > > > > > >>> Hi Konstantin,
>>>> >> > > > > > > >>>
>>>> >> > > > > > > >>> Thanks for bringing up this discussion. +1 for the idea.
>>>> >> > > > > > > >>> We have run into this at our company too, and I planned to
>>>> >> > > > > > > >>> support it in our internal branch soon.
>>>> >> > > > > > > >>>
>>>> >> > > > > > > >>> Regarding your questions:
>>>> >> > > > > > > >>> 1) I think it might be more of a table/view than a function,
>>>> >> > > > > > > >>> just like a Temporal Table (which is also known as a
>>>> >> > > > > > > >>> dimension table). Maybe we need a DDL like CREATE VIEW plus
>>>> >> > > > > > > >>> some additional settings.
>>>> >> > > > > > > >>> 2) If we design the DDL for it like a view, then maybe
>>>> >> > > > > > > >>> temporary is good enough.
>>>> >> > > > > > > >>>
>>>> >> > > > > > > >>> Konstantin Knauf <kna...@apache.org> wrote on Thu, Apr 16, 2020 at 8:16 PM:
>>>> >> > > > > > > >>>
>>>> >> > > > > > > >>>> Hi everyone,
>>>> >> > > > > > > >>>>
>>>> >> > > > > > > >>>> it would be very useful if temporal tables could be created
>>>> >> > > > > > > >>>> via DDL.
>>>> >> > > > > > > >>>> Currently, users need to do this either in the Table API or
>>>> >> > > > > > > >>>> in the environment file of the Flink CLI, both of which
>>>> >> > > > > > > >>>> require the user to switch the context of the SQL
>>>> >> > > > > > > >>>> CLI/Editor. I recently created a ticket for this request
>>>> >> > > > > > > >>>> [1].
>>>> >> > > > > > > >>>>
>>>> >> > > > > > > >>>> I see two main questions:
>>>> >> > > > > > > >>>>
>>>> >> > > > > > > >>>> 1) What would be the DDL syntax? A Temporal Table is on the
>>>> >> > > > > > > >>>> one hand a view and on the other a function, depending on
>>>> >> > > > > > > >>>> how you look at it.
>>>> >> > > > > > > >>>>
>>>> >> > > > > > > >>>> 2) Would this temporal table view/function be stored in the
>>>> >> > > > > > > >>>> catalog or only be temporary?
>>>> >> > > > > > > >>>>
>>>> >> > > > > > > >>>> I personally do not have much experience in this area of
>>>> >> > > > > > > >>>> Flink, so I am looking forward to hearing your thoughts on
>>>> >> > > > > > > >>>> this.
>>>> >> > > > > > > >>>>
>>>> >> > > > > > > >>>> Best,
>>>> >> > > > > > > >>>>
>>>> >> > > > > > > >>>> Konstantin
>>>> >> > > > > > > >>>>
>>>> >> > > > > > > >>>> [1] https://issues.apache.org/jira/browse/FLINK-16824
>>>> >> > > > > > > >>>>
>>>> >> > > > > > > >>>> --
>>>> >> > > > > > > >>>>
>>>> >> > > > > > > >>>> Konstantin Knauf
>>>> >> > > > > > > >>>>
>>>> >> > > > > > > >>>
>>>> >> > > > > > > >>>
>>>> >> > > > > > > >>> --
>>>> >> > > > > > > >>>
>>>> >> > > > > > > >>> Benchao Li
>>>> >> > > > > > > >>> School of Electronics Engineering and Computer Science,
>>>> >> > > > > > > >>> Peking University
>>>> >> > > > > > > >>> Tel:+86-15650713730
>>>> >> > > > > > > >>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>>> >> > > > > > > >>>
>>>> >> > > > > > > >>
>>>> >> > > > > > > >
>>>> >> > > > > > >
>>>> >> > > > > > >
>>>> >> > > > > >
>>>> >> > > > >
>>>> >> > > >
>>>> >> > >
>>>> >> >
>>>> >>
>>>> >>
>>>> >> --
>>>> >>
>>>> >> Konstantin Knauf
>>>> >>
>>>> >> https://twitter.com/snntrable
>>>> >>
>>>> >> https://github.com/knaufk
>>>> >>
>>>> >
>>>>
>>>
