Hi,

I agree with what Fabian said above.
Besides, IMO, (3) has lower priority and will involve much more work.
It makes sense to me to do it in two phases.

Regarding (3), the key point in converting an append-only table into a
changelog table is that the framework must know the operation type of each
record, so we introduced a special CREATE VIEW syntax for this in the design
document [1]. Here is an example:

-- my_binlog table is registered as an append-only table
CREATE TABLE my_binlog (
  before ROW<...>,
  after ROW<...>,
  op STRING,
  op_ms TIMESTAMP(3)
) WITH (
  'connector.type' = 'kafka',
  ...
);

-- interpret my_binlog as a changelog using the op column and the id key
CREATE VIEW my_table AS
  SELECT
    after.*
  FROM my_binlog
  CHANGELOG OPERATION BY op
  UPDATE KEY BY (id);

-- my_table will materialize the insert/delete/update changes.
-- If we have 4 records in dbz (Debezium) format:
--   a create for 1004
--   an update for 1004
--   a create for 1005
--   a delete for 1004
-- then only 1005 remains:
> SELECT COUNT(*) FROM my_table;
+-----------+
|  COUNT(*) |
+-----------+
|     1     |
+-----------+
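
To make the intended semantics concrete, here is a minimal Python sketch of
how the four records above would be materialized. It is only an illustration
and not how the planner would implement it. The record layout, the op codes
('c', 'u', 'd', as in Debezium events) and the materialize() helper are
assumptions for this example.

```python
# Hypothetical simulation of materializing an append-only binlog into a
# keyed table. The record layout and op codes are assumptions:
# 'c' = create, 'u' = update, 'd' = delete.

def materialize(binlog, key="id"):
    """Apply each change record, in order, to a table keyed by `key`."""
    table = {}
    for record in binlog:
        op = record["op"]
        if op in ("c", "u"):
            # Creates and updates upsert the "after" image of the row.
            row = record["after"]
            table[row[key]] = row
        elif op == "d":
            # Deletes remove the row identified by the "before" image.
            row = record["before"]
            table.pop(row[key], None)
        else:
            raise ValueError(f"unknown operation type: {op!r}")
    return table


binlog = [
    {"op": "c", "before": None, "after": {"id": 1004, "name": "a"}},
    {"op": "u", "before": {"id": 1004, "name": "a"},
     "after": {"id": 1004, "name": "b"}},
    {"op": "c", "before": None, "after": {"id": 1005, "name": "c"}},
    {"op": "d", "before": {"id": 1004, "name": "b"}, "after": None},
]

my_table = materialize(binlog)
print(len(my_table))  # 1, only key 1005 is left
```

Without knowing the operation type, the same four records would simply count
as four appended rows, which is why the view has to declare it.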

Best,
Jark

[1]:
https://docs.google.com/document/d/1onyIUUdWAHfr_Yd5nZOE7SOExBc6TiW5C4LiL5FrjtQ/edit#heading=h.sz656g8mb2wb


On Fri, 8 May 2020 at 00:24, Fabian Hueske <fhue...@gmail.com> wrote:

> Thanks for the summary Konstantin.
> I think you got all points right.
>
> IMO, the way forward would be to work on a FLIP to define
> * the concept of temporal tables,
> * how to feed them from retraction tables
> * how to feed them from append-only tables
> * their specification with CREATE TEMPORAL TABLE,
> * how to use temporal tables in temporal table joins
> * how (if at all) to use temporal tables in other types of queries
>
> We would keep the LATERAL TABLE syntax because it is used for regular
> table-valued functions.
> However, we would probably remove the TemporalTableFunction (which is a
> built-in table-valued function) after we deprecated it for a while.
>
> Cheers, Fabian
>
> Am Do., 7. Mai 2020 um 18:03 Uhr schrieb Konstantin Knauf <
> kna...@apache.org>:
>
>> Hi everyone,
>>
>> Thanks everyone for joining the discussion on this. Please let me
>> summarize
>> what I have understood so far.
>>
>> 1) For joining an append-only table and a temporal table, the syntax
>> "FOR SYSTEM_TIME AS OF <time-attribute>" seems to be preferred (Fabian,
>> Timo, Seth).
>>
>> 2) To define a temporal table based on a changelog stream from an external
>> system CREATE TEMPORAL TABLE (as suggested by Timo/Fabian) could be used.
>> 3) In order to also support temporal tables derived from an append-only
>> stream, we either need to support TEMPORAL VIEW (as mentioned by Fabian)
>> or
>> need to have a way to convert an append-only table into a changelog table
>> (briefly discussed in [1]). It is not completely clear to me how a
>> temporal
>> table based on an append-only table would be with the syntax proposed in
>> [1] and 2). @Jark Wu <imj...@gmail.com> could you elaborate a bit on
>> that?
>>
>> How do we move forward with this?
>>
>> * It seems that a two-phased approach (1 + 2 now, 3 later) makes sense.
>> What do you think?
>> * If we proceed like this, what would this mean for the current syntax of
>> LATERAL TABLE? Would we keep it? Would we eventually deprecate and drop it?
>> Since only after 3) we would be on par with the current temporal table
>> function join, I assume, we could only drop it thereafter.
>>
>> Thanks, Konstantin
>>
>> [1]
>>
>> https://docs.google.com/document/d/1onyIUUdWAHfr_Yd5nZOE7SOExBc6TiW5C4LiL5FrjtQ/edit#heading=h.kduaw9moein6
>>
>>
>> On Sat, Apr 18, 2020 at 3:07 PM Jark Wu <imj...@gmail.com> wrote:
>>
>> > Hi Fabian,
>> >
>> > Just to clarify a little bit, we decided to move the "converting
>> > append-only table into changelog table" into future work.
>> > So FLIP-105 only introduced some CDC formats (debezium) and new
>> TableSource
>> > interfaces proposed in FLIP-95.
>> > I should have started a new FLIP for the new CDC formats and keep
>> FLIP-105
>> > as it is to avoid the confusion, sorry about that.
>> >
>> > Best,
>> > Jark
>> >
>> >
>> > On Sat, 18 Apr 2020 at 00:35, Fabian Hueske <fhue...@gmail.com> wrote:
>> >
>> > > Thanks Jark!
>> > >
>> > > I certainly need to read up on FLIP-105 (and I'll try to adjust my
>> > > terminology to changelog table from now on ;-) )
>> > > If FLIP-105 addresses the issue of converting an append-only table
>> into a
>> > > changelog table that upserts on primary key (basically what the VIEW
>> > > definition in my first email did),
>> > > TEMPORAL VIEWs become much less important.
>> > > In that case, we would be well served with TEMPORAL TABLE and TEMPORAL
>> > VIEW
>> > > would be a nice-to-have feature for some later time.
>> > >
>> > > Cheers, Fabian
>> > >
>> > >
>> > >
>> > >
>> > >
>> > >
>> > > Am Fr., 17. Apr. 2020 um 18:13 Uhr schrieb Jark Wu <imj...@gmail.com
>> >:
>> > >
>> > > > Hi Fabian,
>> > > >
>> > > > I think converting an append-only table into temporal table contains
>> > two
>> > > > things:
>> > > > (1) converting append-only table into changelog table (or retraction
>> > > table
>> > > > as you said)
>> > > > (2) define the converted changelog table (maybe is a view now) as
>> > > temporal
>> > > > (or history tracked).
>> > > >
>> > > > The first thing is also mentioned and discussed in FLIP-105 design
>> > draft
>> > > > [1] which proposed a syntax
>> > > > to convert the append-only table into a changelog table.
>> > > >
>> > > > I think TEMPORAL TABLE is quite straightforward and simple, and can
>> > > satisfy
>> > > > most existing changelog
>> > > > data with popular CDC formats. TEMPORAL VIEW is flexible but will involve
>> > > > more SQL code. I think we can support them both.
>> > > >
>> > > > Best,
>> > > > Jark
>> > > >
>> > > > [1]:
>> > > >
>> > > >
>> > >
>> >
>> https://docs.google.com/document/d/1onyIUUdWAHfr_Yd5nZOE7SOExBc6TiW5C4LiL5FrjtQ/edit#heading=h.sz656g8mb2wb
>> > > >
>> > > > On Fri, 17 Apr 2020 at 23:52, Fabian Hueske <fhue...@gmail.com>
>> wrote:
>> > > >
>> > > > > Hi,
>> > > > >
>> > > > > I agree with most of what Timo said.
>> > > > >
>> > > > > The TEMPORAL keyword (which unfortunately might be easily confused
>> > with
>> > > > > TEMPORARY...) looks very intuitive and I think using the only time
>> > > > > attribute for versioning would be a good choice.
>> > > > >
>> > > > > However, TEMPORAL TABLE on retraction tables does not solve the full
>> > > > > problem.
>> > > > > I believe there will be also cases where we need to derive a
>> temporal
>> > > > table
>> > > > > from an append only table (what TemporalTableFunctions do right
>> now).
>> > > > > I think the best choice for this would be TEMPORAL VIEW but as I
>> > > > explained,
>> > > > > it might be a longer way until this can be supported.
>> > > > > TEMPORAL VIEW would also address the problem of preprocessing.
>> > > > >
>> > > > > > Regarding retraction table with a primary key and a
>> time-attribute:
>> > > > > > These semantics are still unclear to me. Can retractions only
>> occur
>> > > > > > within watermarks? Or are they also used for representing late
>> > > updates?
>> > > > >
>> > > > > Time attributes and retraction streams are a challenging topic
>> that I
>> > > > > haven't completely understood yet.
>> > > > > So far we treated time attributes always as part of the data.
>> > > > > In combination with retractions, it seems that they become
>> metadata
>> > > that
>> > > > > specifies when a change was done.
>> > > > > I think this is different from treating time attributes as regular
>> > > data.
>> > > > >
>> > > > > Cheers, Fabian
>> > > > >
>> > > > >
>> > > > > Am Fr., 17. Apr. 2020 um 17:23 Uhr schrieb Seth Wiesman <
>> > > > > sjwies...@gmail.com
>> > > > > >:
>> > > > >
>> > > > > > I really like the TEMPORAL keyword, I find it very intuitive.
>> > > > > >
>> > > > > > The down side of this approach would be that an additional
>> > > > preprocessing
>> > > > > > > step would not be possible anymore because there is no
>> preceding
>> > > > view.
>> > > > > > >
>> > > > > >
>> > > > > >  Yes and no. My understanding is we are not talking about making
>> > any
>> > > > > > changes to how temporal tables are defined in the table api.
>> Since
>> > > you
>> > > > > > cannot currently define temporal table functions in pure SQL
>> > > > > applications,
>> > > > > > but only pre-register them in YAML, you can't do any
>> pre-processing
>> > > as
>> > > > it
>> > > > > > stands today. Preprocessing may be a generally useful feature,
>> I'm
>> > > not
>> > > > > > sure, but this syntax does not lose us anything in pure SQL
>> > > > applications.
>> > > > > >
>> > > > > > These semantics are still unclear to me. Can retractions only
>> occur
>> > > > > > > within watermarks? Or are they also used for representing late
>> > > > updates?
>> > > > > > >
>> > > > > >
>> > > > > > I do not know the SQL standard well enough to give a principled
>> > > > response
>> > > > > to
>> > > > > > this question. However, in my observation of production
>> workloads,
>> > > > users
>> > > > > of
>> > > > > > temporal table functions are doing so to denormalize star
>> schemas
>> > > > before
>> > > > > > performing further transformations and aggregations and expect
>> the
>> > > > output
>> > > > > > to be an append stream. With the ongoing work to better support
>> > > > > changelogs,
>> > > > > > the need for users to understand the differences in append vs
>> > upsert
>> > > in
>> > > > > > their query may be diminishing but everyone else on this thread
>> can
>> > > > > better
>> > > > > > speak to that.
>> > > > > >
>> > > > > > Seth
>> > > > > >
>> > > > > > On Fri, Apr 17, 2020 at 10:03 AM Timo Walther <
>> twal...@apache.org>
>> > > > > wrote:
>> > > > > >
>> > > > > > > Hi Fabian,
>> > > > > > >
>> > > > > > > thank you very much for this great summary!
>> > > > > > >
>> > > > > > > I wasn't aware of the Polymorphic Table Functions standard.
>> This
>> > > is a
>> > > > > > > very interesting topic that we should definitely consider in
>> the
>> > > > > future.
>> > > > > > > Maybe this could also help us in defining tables more
>> dynamically
>> > > > > within
>> > > > > > > a query. It could help solving problems as discussed in
>> FLIP-113.
>> > > > > > >
>> > > > > > > Regarding joining:
>> > > > > > >
>> > > > > > > IMO we should aim for "FOR SYSTEM_TIME AS OF x" instead of the
>> > > > current
>> > > > > > > `LATERAL TABLE(rates(x))` syntax. A function that also behaves
>> > > like a
>> > > > > > > table and needs this special `LATERAL` keyword during joining
>> is
>> > > not
>> > > > > > > very intuitive. The PTF could be used once they are fully
>> > supported
>> > > > by
>> > > > > > > Calcite and we have the big picture how to also use them for
>> > other
>> > > > > > > time-based operations (windows?, joins?).
>> > > > > > >
>> > > > > > > Regarding how to represent a temporal table:
>> > > > > > >
>> > > > > > > I think that our current DDL, current LookupTableSource and
>> > > temporal
>> > > > > > > tables can fit nicely together.
>> > > > > > >
>> > > > > > > How about we simply introduce an additional keyword
>> `TEMPORAL` to
>> > > > > > > indicate history tracking semantics? I think this is the
>> > > > > > > minimally invasive solution:
>> > > > > > >
>> > > > > > > CREATE TEMPORAL TABLE rates (
>> > > > > > >    currency CHAR(3) NOT NULL PRIMARY KEY,
>> > > > > > >    rate DOUBLE,
>> > > > > > >    rowtime TIMESTAMP,
>> > > > > > >    WATERMARK FOR rowtime AS rowtime - INTERVAL '5' MINUTE)
>> > > > > > > WITH (...);
>> > > > > > >
>> > > > > > > - The primary key would be defined by the DDL.
>> > > > > > > - The available time attribute would be defined by the DDL.
>> > Either
>> > > as
>> > > > > > > the only time attribute of the table or we introduce a special
>> > > > > > > constraint similar to `PRIMARY KEY`.
>> > > > > > >
>> > > > > > > The down side of this approach would be that an additional
>> > > > > preprocessing
>> > > > > > > step would not be possible anymore because there is no
>> preceding
>> > > > view.
>> > > > > > >
>> > > > > > > The `TEMPORAL` semantic can be stored in the properties of the
>> > > table
>> > > > > > > when writing to a catalog. We do the same for watermarks and
>> > > computed
>> > > > > > > columns.
>> > > > > > >
>> > > > > > > Without a `TEMPORAL` keyword, a `FOR SYSTEM_TIME AS OF x`
>> would
>> > > only
>> > > > > > > work on processing time by a lookup into the external system
>> or
>> > on
>> > > > > > > event-time by using the time semantics that the external
>> system
>> > > > > supports.
>> > > > > > >
>> > > > > > > Regarding retraction table with a primary key and a
>> > time-attribute:
>> > > > > > >
>> > > > > > > These semantics are still unclear to me. Can retractions only
>> > occur
>> > > > > > > within watermarks? Or are they also used for representing late
>> > > > updates?
>> > > > > > >
>> > > > > > > Regards,
>> > > > > > > Timo
>> > > > > > >
>> > > > > > >
>> > > > > > > On 17.04.20 14:34, Fabian Hueske wrote:
>> > > > > > > > Hi all,
>> > > > > > > >
>> > > > > > > > First of all, I apologize for the text wall that's
>> > following...
>> > > > ;-)
>> > > > > > > >
>> > > > > > > > A temporal table join joins an append-only table and a
>> temporal
>> > > > > table.
>> > > > > > > > The question about how to represent a temporal table join
>> boils
>> > > > down
>> > > > > to
>> > > > > > > two
>> > > > > > > > questions:
>> > > > > > > >
>> > > > > > > > 1) How to represent a temporal table
>> > > > > > > > 2) How to specify the join of an append-only table and a
>> > temporal
>> > > > > table
>> > > > > > > >
>> > > > > > > > I'll discuss these points separately.
>> > > > > > > >
>> > > > > > > > # 1 How to represent a temporal table
>> > > > > > > >
>> > > > > > > > A temporal table is a table that can be looked up with a
>> time
>> > > > > parameter
>> > > > > > > and
>> > > > > > > > which returns the rows of the table at that point in time /
>> for
>> > > > that
>> > > > > > > > version.
>> > > > > > > > In order to be able to (conceptually) look up previous
>> > versions,
>> > > a
>> > > > > > > temporal
>> > > > > > > > table must be (conceptually) backed by a history table that
>> > > tracks
>> > > > > all
>> > > > > > > > previous versions (see SqlServer docs [1]).
>> > > > > > > > In the context of our join, we added another restriction
>> namely
>> > > > that
>> > > > > > the
>> > > > > > > > table must have a primary key, i.e., there is only one row
>> for
>> > > each
>> > > > > > > version
>> > > > > > > > for each unique key.
>> > > > > > > >
>> > > > > > > > Hence, the requirements for a temporal table are:
>> > > > > > > > * The temporal table has a primary key / unique attribute
>> > > > > > > > * The temporal table has a time-attribute that defines the
>> > start
>> > > of
>> > > > > the
>> > > > > > > > validity interval of a row (processing time or event time)
>> > > > > > > > * The system knows that the history of the table is tracked
>> and
>> > > can
>> > > > > > infer
>> > > > > > > > how to look up a version.
>> > > > > > > >
>> > > > > > > > There are two possible types of input from which we want to
>> > > create
>> > > > > > > temporal
>> > > > > > > > tables (that I'm aware of):
>> > > > > > > >
>> > > > > > > > * append-only tables, i.e., tables that contain the full
>> change
>> > > > > history
>> > > > > > > > * retraction tables, i.e., tables that are updating and do
>> not
>> > > > > remember
>> > > > > > > the
>> > > > > > > > history.
>> > > > > > > >
>> > > > > > > > There are a few ways to do this:
>> > > > > > > >
>> > > > > > > > ## 1.1 Defining a VIEW on an append-only table with a time
>> > > > attribute.
>> > > > > > > >
>> > > > > > > > The following view definition results in a view that
>> provides
>> > the
>> > > > > > latest
>> > > > > > > > rate for each currency.
>> > > > > > > >
>> > > > > > > > CREATE VIEW rates AS
>> > > > > > > > SELECT
>> > > > > > > >    currency, MAX(rate) as rate, MAX(rowtime) as rowtime
>> > > > > > > > FROM rates_history rh1
>> > > > > > > > WHERE
>> > > > > > > >    rh1.rowtime = (
>> > > > > > > >      SELECT max(rowtime)
>> > > > > > > >      FROM rates_history rh2
>> > > > > > > >      WHERE rh2.currency = rh1.currency)
>> > > > > > > > GROUP BY currency
>> > > > > > > > WITH (
>> > > > > > > >    'historytracking' = 'true',
>> > > > > > > >    'historytracking.starttime' = 'rowtime');
>> > > > > > > >
>> > > > > > > > However, we also need to tell the system to track the
>> history
>> > of
>> > > > all
>> > > > > > > > changes of the view in order to be able to look it up.
>> > > > > > > > That's what the properties in the WITH clause are for
>> (inspired
>> > > by
>> > > > > > > > SqlServer's TEMPORAL TABLE DDL syntax).
>> > > > > > > > Note that this is *not* a syntax proposal but only meant to
>> > show
>> > > > > which
>> > > > > > > > information is needed.
>> > > > > > > > This view allows looking up any version of the "rates" view.
>> > > > > > > >
>> > > > > > > > In addition to designing and implementing the DDL syntax for
>> > > views
>> > > > > that
>> > > > > > > > support temporal lookups, the optimizer would need to
>> > understand
>> > > > the
>> > > > > > > > semantics of the view definition in depth.
>> > > > > > > > Among other things it needs to understand that the MAX()
>> > > > aggregation
>> > > > > on
>> > > > > > > the
>> > > > > > > > time-attribute preserves its watermark alignment.
>> > > > > > > > AFAIK, this is not the case at the moment (the time
>> attribute
>> > > would
>> > > > > be
>> > > > > > > > converted into a regular TIMESTAMP and lose its time
>> attribute
>> > > > > > > properties)
>> > > > > > > >
>> > > > > > > > ## 1.2 A retraction table with a primary key and a
>> > > time-attribute.
>> > > > > > > >
>> > > > > > > > On paper it looks like such a table would automatically
>> qualify
>> > > as
>> > > > a
>> > > > > > > > time-versioned table because it completely fulfills the
>> > > > requirements.
>> > > > > > > > However, I don't think we can use it *as is* as a temporal
>> > table
>> > > if
>> > > > > we
>> > > > > > > want
>> > > > > > > > to have clean semantics.
>> > > > > > > > The problem here is the "lost history" of the retraction
>> table.
>> > > The
>> > > > > > > dynamic
>> > > > > > > > table that is defined on the retraction stream only stores
>> the
>> > > > latest
>> > > > > > > > version (even though it sees all versions).
>> > > > > > > > Conceptually, a temporal table can look up the version of the
>> table
>> > > at
>> > > > > any
>> > > > > > > > point in time because it is backed by a history table.
>> > > > > > > > If this information is not available, we cannot have a
>> > > semantically
>> > > > > > clean
>> > > > > > > > definition of the join IMO.
>> > > > > > > >
>> > > > > > > > Therefore we should define the table in a way that the
>> system
>> > > knows
>> > > > > > that
>> > > > > > > > the history is tracked.
>> > > > > > > > MSSQL uses a syntax similar to this one:
>> > > > > > > >
>> > > > > > > > CREATE TABLE rates (
>> > > > > > > >      currency CHAR(3) NOT NULL PRIMARY KEY,
>> > > > > > > >      rate DOUBLE,
>> > > > > > > >      rowtime TIMESTAMP,
>> > > > > > > >      WATERMARK FOR rowtime AS rowtime - INTERVAL '5' MINUTE)
>> > > > > > > > WITH (
>> > > > > > > >    'historytracking' = 'true',
>> > > > > > > >    'historytracking.starttime' = 'rowtime');
>> > > > > > > >
>> > > > > > > > The 'historytracking' properties would declare that the table
>> > > tracks
>> > > > > its
>> > > > > > > > history and also specify the attribute (rowtime) that is
>> used
>> > for
>> > > > > > > > versioning.
>> > > > > > > >
>> > > > > > > > ## 1.3 Registering a TableFunction that takes an append-only
>> > > table
>> > > > > with
>> > > > > > > > time attribute
>> > > > > > > >
>> > > > > > > > The TableFunction requires a few parameters:
>> > > > > > > > * the source table from which to derive the temporal table
>> > > > > > > > * the key attribute on which the versions of the source
>> table
>> > > > should
>> > > > > be
>> > > > > > > > computed
>> > > > > > > > * the time attribute that defines the versions
>> > > > > > > > * a lookup timestamp for the version that is returned.
>> > > > > > > >
>> > > > > > > > The reasons why we chose the TableFunction approach over the
>> > VIEW
>> > > > > > approach
>> > > > > > > > so far were:
>> > > > > > > > * It is easier for the optimizer to identify a built-in
>> table
>> > > > > function
>> > > > > > > than
>> > > > > > > > to analyze and reason about a generic VIEW.
>> > > > > > > > * We would need to make the optimizer a lot smarter to infer
>> > all
>> > > > the
>> > > > > > > > properties from the generic VIEW definition that we need
>> for a
>> > > > > temporal
>> > > > > > > > table join.
>> > > > > > > > * Passing a parameter to a function is a known thing,
>> passing a
>> > > > > > parameter
>> > > > > > > > to a VIEW not so much.
>> > > > > > > > * Users would need to specify the VIEW exactly correct, such
>> > that
>> > > > it
>> > > > > > can
>> > > > > > > be
>> > > > > > > > used as a temporal table. Look at 1.1 why this is not
>> trivial.
>> > > > > > > >
>> > > > > > > > There are two ways to use a TableFunction:
>> > > > > > > >
>> > > > > > > > ### 1.3.1 Built-in and pre-registered function that is
>> > > > parameterized
>> > > > > in
>> > > > > > > the
>> > > > > > > > SQL query
>> > > > > > > >
>> > > > > > > > Here, we do not need to do anything to register the
>> function.
>> > We
>> > > > > simply
>> > > > > > > use
>> > > > > > > > it in the query (see example in 2.2 below)
>> > > > > > > >
>> > > > > > > > ### 1.3.2 Parameterize function when it is registered in the
>> > > > catalog
>> > > > > > > (with
>> > > > > > > > a provided Java implementation)
>> > > > > > > >
>> > > > > > > > This is the approach, we've used so far. In the Table API,
>> the
>> > > > > function
>> > > > > > > is
>> > > > > > > > first parameterized and created and then registered:
>> > > > > > > > We would need a DDL syntax to parameterize UDFs on
>> > registration.
>> > > > > > > > I don't want to propose a syntax here, but just to get an
>> idea
>> > it
>> > > > > might
>> > > > > > > > look like this:
>> > > > > > > >
>> > > > > > > > CREATE FUNCTION rates AS
>> > > > > > > > 'org.apache.flink.table.udfs.TemporalTableFunction' WITH
>> > > ('table' =
>> > > > > > > > 'rates_history', 'key' = 'cur', 'time' = 'rowtime')
>> > > > > > > >
>> > > > > > > > Right now, the Flink Catalog interface does not have the
>> > > > > functionality
>> > > > > > to
>> > > > > > > > store such parameters and would need some hacks to create
>> > > > > > > > properly parameterized function instances.
>> > > > > > > >
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > # 2 Defining a join of an append-only table and a temporal
>> > table
>> > > > > > > >
>> > > > > > > > The append-only table needs to have a time-attribute
>> > (processing
>> > > > time
>> > > > > > or
>> > > > > > > > event time, but same as the temporal table).
>> > > > > > > > The join then needs to specify two things:
>> > > > > > > > * an equality predicate that includes the primary key of the
>> > > > temporal
>> > > > > > > table
>> > > > > > > > * declare the time attribute of the append-only table as the
>> > time
>> > > > as
>> > > > > of
>> > > > > > > > which to look up the temporal table, i.e, get the version of
>> > the
>> > > > > > temporal
>> > > > > > > > table that is valid for the timestamp of the current row
>> from
>> > the
>> > > > > > > > append-only table
>> > > > > > > >
>> > > > > > > > The tricky part (from a syntax point of view) is to specify
>> the
>> > > > > lookup
>> > > > > > > > time.
>> > > > > > > >
>> > > > > > > > ## 2.1 the temporal table is a regular table or view (see
>> > > > approaches
>> > > > > > 1.1
>> > > > > > > > and 1.2 above)
>> > > > > > > >
>> > > > > > > > In this case we can use the "FOR SYSTEM_TIME AS OF x"
>> clause as
>> > > > > > follows:
>> > > > > > > >
>> > > > > > > > SELECT *
>> > > > > > > > FROM orders o, rates r FOR SYSTEM_TIME AS OF o.ordertime
>> > > > > > > > WHERE o.currency = r.currency
>> > > > > > > >
>> > > > > > > > IMO, this is a great syntax and the one we should strive
>> for.
>> > > > > > > > We would need to bend the rules of the SQL standard which
>> only
>> > > > > allows x
>> > > > > > > in
>> > > > > > > > "FOR SYSTEM_TIME AS OF x" to be a constant and the table on
>> > which
>> > > > it
>> > > > > is
>> > > > > > > > applied usually needs to be a specific type (not sure if
>> views
>> > > are
>> > > > > > > > supported), but I guess this is fine.
>> > > > > > > > NOTE: the "FOR SYSTEM_TIME AS OF x" is already supported for
>> > > > > > LookupTable
>> > > > > > > > Joins if x is a processing time attribute [2].
>> > > > > > > >
>> > > > > > > > ## 2.2 the temporal table is a TableFunction and
>> parameterized
>> > in
>> > > > the
>> > > > > > > query
>> > > > > > > > (see 1.3.1 above)
>> > > > > > > >
>> > > > > > > > SELECT *
>> > > > > > > > FROM orders o,
>> > > > > > > >    TEMPORAL_TABLE(
>> > > > > > > >      table => TABLE(rates_history),
>> > > > > > > >      key => DESCRIPTOR(currency),
>> > > > > > > >      time => DESCRIPTOR(rowtime)) r
>> > > > > > > >    ON o.currency = r.currency
>> > > > > > > >
>> > > > > > > > The function "TEMPORAL_TABLE" is built-in and nothing was
>> > > > registered
>> > > > > in
>> > > > > > > the
>> > > > > > > > catalog (except the rates_history table).
>> > > > > > > > In fact this is valid SQL:2016 syntax and called Polymorphic
>> > > Table
>> > > > > > > > Functions. Have a look here [3].
>> > > > > > > >
>> > > > > > > > ## 2.3 the temporal table is a TableFunction that was
>> > > parameterized
>> > > > > > > during
>> > > > > > > > registration (see 1.3.2 above)
>> > > > > > > >
>> > > > > > > > This is what we have at the moment.
>> > > > > > > >
>> > > > > > > > SELECT *
>> > > > > > > > FROM orders o,
>> > > > > > > >    LATERAL TABLE (rates(o.ordertime)) AS r
>> > > > > > > > WHERE o.currency = r.currency
>> > > > > > > >
>> > > > > > > > The TableFunction "rates" was registered in the catalog and
>> > > > > > parameterized
>> > > > > > > > to the "rates_history" append-only table, the key was set to
>> > > > > > "currency",
>> > > > > > > > and the time attribute was declared.
>> > > > > > > >
>> > > > > > > > # SUMMARY
>> > > > > > > >
>> > > > > > > > IMO we should in the long run aim to define temporal tables
>> > > either
>> > > > as
>> > > > > > > > upsert retraction tables or views on append-only tables and
>> > join
>> > > > > them
>> > > > > > > > using the "FOR SYSTEM_TIME AS OF x" syntax.
>> > > > > > > > I guess it is debatable whether we need to declare to track
>> > > history
>> > > > > for
>> > > > > > > > these tables (which we don't actually do) or if we do it by
>> > > > > convention
>> > > > > > if
>> > > > > > > > the table has a time attribute.
>> > > > > > > > It should be (relatively) easy to get this to work for
>> > retraction
>> > > > > > tables
>> > > > > > > > which will be supported soon.
>> > > > > > > > It will be more work for views because we need to improve
>> the
>> > > time
>> > > > > > > > attribute handling with MAX() aggregations.
>> > > > > > > > The "FOR SYSTEM_TIME AS OF x" is already supported for
>> > > > > > LookupTableSources
>> > > > > > > > and would "only" need to be adapted to work on temporal
>> tables.
>> > > > > > > >
>> > > > > > > > Registering parameterized TableFunctions in the catalog
>> seems
>> > > like
>> > > > > > quite
>> > > > > > > a
>> > > > > > > > bit of work. We need new DDL syntax, extend the catalog and
>> > > > function
>> > > > > > > > instantiation. This won't be easy, IMO.
>> > > > > > > > If we only support them as TEMPORARY FUNCTION which are not
>> > > > > registered
>> > > > > > in
>> > > > > > > > the catalog it will be easier. The question is whether it is
>> > > worth
>> > > > > the
>> > > > > > > > effort if we decide for the other approach.
>> > > > > > > >
>> > > > > > > > Using TableFunctions that are parameterized in the query
>> will
>> > > > require
>> > > > > > to
>> > > > > > > > extend the Calcite parser and framework to support
>> Polymorphic
>> > > > Table
>> > > > > > > > Functions.
>> > > > > > > > However, some work might already have been done there,
>> because
>> > > AFAIK
>> > > > > > > Apache
>> > > > > > > > Beam aims to support this syntax for windowing functions as
>> > > > described
>> > > > > > in
>> > > > > > > > the "One SQL to rule them all" paper [4].
>> > > > > > > > It might be the fastest and fully SQL standard compliant
>> way.
>> > > > > > > >
>> > > > > > > > Cheers,
>> > > > > > > > Fabian
>> > > > > > > >
>> > > > > > > > [1]
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://docs.microsoft.com/en-us/sql/relational-databases/tables/temporal-tables
>> > > > > > > > [2]
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/joins.html#usage-1
>> > > > > > > > [3]
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://standards.iso.org/ittf/PubliclyAvailableStandards/c069776_ISO_IEC_TR_19075-7_2017.zip
>> > > > > > > > [4] https://arxiv.org/abs/1905.12133
>> > > > > > > >
>> > > > > > > > Am Fr., 17. Apr. 2020 um 06:37 Uhr schrieb Jark Wu <
>> > > > imj...@gmail.com
>> > > > > >:
>> > > > > > > >
>> > > > > > > >> Hi Konstantin,
>> > > > > > > >>
>> > > > > > > >> Thanks for bringing this discussion. I think temporal join
>> is
>> > a
>> > > > very
>> > > > > > > >> important feature and should be exposed to pure SQL users.
>> > > > > > > >> And I already received many requirements like this.
>> > > > > > > >> However, my concern is how to properly support this
>> > feature
>> > > > in
>> > > > > > SQL.
>> > > > > > > >> Introducing a DDL syntax for Temporal Table Function is one
>> > way,
>> > > > but
>> > > > > > > maybe
>> > > > > > > >> not the best one.
>> > > > > > > >>
>> > > > > > > >> The most important reason is that what underlies a temporal table
>> > > > > > > >> function is exactly a changelog stream.
>> > > > > > > >> The temporal join is actually temporal joining a fact
>> stream
>> > > with
>> > > > > the
>> > > > > > > >> changelog stream on processing time or event time.
>> > > > > > > >> We will soon support creating a changelog source using DDL once
>> > > > > > > >> FLIP-95 and FLIP-105 are finished.
>> > > > > > > >> At that time, we can have a simple DDL to create a changelog source
>> > > > > > > >> like this:
>> > > > > > > >>
>> > > > > > > >> CREATE TABLE rate_changelog (
>> > > > > > > >>    currency STRING,
>> > > > > > > >>    rate DECIMAL
>> > > > > > > >> ) WITH (
>> > > > > > > >>    'connector' = 'kafka',
>> > > > > > > >>    'topic' = 'rate_binlog',
>> > > > > > > >>    'properties.bootstrap.servers' = 'localhost:9092',
>> > > > > > > >>    'format' = 'debezium-json'
>> > > > > > > >> );
>> > > > > > > >>
>> > > > > > > >> In the meanwhile, we already have a SQL standard temporal
>> join
>> > > > > syntax
>> > > > > > > [1],
>> > > > > > > >> i.e. the "A JOIN B FOR SYSTEM_TIME AS OF ..".
>> > > > > > > >> It is currently used as dimension table lookup join, but
>> the
>> > > > > semantic
>> > > > > > is
>> > > > > > > >> the same to the "temporal table function join"[2].
>> > > > > > > >> I'm in favor of "FOR SYSTEM_TIME AS OF" because it is more natural:
>> > > > > > > >> B is defined as a *table*, not a *table function*,
>> > > > > > > >> and the syntax is included in the SQL standard.
>> > > > > > > >>
>> > > > > > > >> So once we have the ability to define the "rate_changelog" table, we
>> > > > > > > >> can use the following query to temporal join the changelog on
>> > > > > > > >> processing time.
>> > > > > > > >> SELECT *
>> > > > > > > >> FROM orders JOIN rate_changelog FOR SYSTEM_TIME AS OF orders.proctime
>> > > > > > > >> ON orders.currency = rate_changelog.currency;
>> > > > > > > >>
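To make the semantics of the query above concrete, here is a minimal Python sketch (not Flink code) of what a temporal join against a changelog computes. It uses event time rather than processing time so that the result is deterministic. The tuple layouts, the function names build_versions and temporal_join, and the use of None to mark a delete are all assumptions made for illustration.

```python
from bisect import bisect_right
from collections import defaultdict


def build_versions(changelog):
    """Group changelog rows (ts, currency, rate) by currency, ordered by ts.

    A rate of None is this sketch's (hypothetical) encoding of a delete.
    """
    versions = defaultdict(list)
    for ts, currency, rate in sorted(changelog, key=lambda row: row[0]):
        versions[currency].append((ts, rate))
    return versions


def temporal_join(orders, changelog):
    """Join each order (ts, currency, amount) with the rate valid at its ts."""
    versions = build_versions(changelog)
    joined = []
    for order_ts, currency, amount in orders:
        history = versions.get(currency, [])
        timestamps = [ts for ts, _ in history]
        # Index of the latest version whose timestamp is <= order_ts.
        idx = bisect_right(timestamps, order_ts) - 1
        if idx < 0:
            continue  # no version existed yet: an inner join emits nothing
        rate = history[idx][1]
        if rate is None:
            continue  # the row was deleted as of the order time
        joined.append((order_ts, currency, amount, rate))
    return joined


if __name__ == "__main__":
    rates = [(1, "EUR", 114), (5, "EUR", 116), (3, "USD", 100), (7, "USD", None)]
    orders = [(2, "EUR", 10), (6, "EUR", 20), (8, "USD", 7)]
    for row in temporal_join(orders, rates):
        print(row)
```

Each order only sees the version of the rate that was valid at its own timestamp, which is the behaviour "FOR SYSTEM_TIME AS OF" asks for; later updates to the changelog do not change rows that were already joined.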
>> > > > > > > >> In a nutshell, once FLIP-95 and FLIP-105 are ready, we can easily
>> > > > > > > >> support "temporal join on changelogs" without introducing new syntax.
>> > > > > > > >> IMO, introducing a DDL syntax for Temporal Table Function does not
>> > > > > > > >> look like an easy way and may involve repetitive work.
>> > > > > > > >>
>> > > > > > > >> Best,
>> > > > > > > >> Jark
>> > > > > > > >>
>> > > > > > > >> [1]:
>> > > > > > > >> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
>> > > > > > > >> [2]:
>> > > > > > > >> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table-function
>> > > > > > > >>
>> > > > > > > >> On Thu, 16 Apr 2020 at 23:04, Benchao Li <libenc...@gmail.com> wrote:
>> > > > > > > >>
>> > > > > > > >>> Hi Konstantin,
>> > > > > > > >>>
>> > > > > > > >>> Thanks for bringing up this discussion. +1 for the idea.
>> > > > > > > >>> We have met this in our company too, and I recently planned to
>> > > > > > > >>> support it in our internal branch.
>> > > > > > > >>>
>> > > > > > > >>> Regarding your questions:
>> > > > > > > >>> 1) I think it is more a table/view than a function, just like a
>> > > > > > > >>> Temporal Table (which is also known as a dimension table). Maybe we
>> > > > > > > >>> need a DDL like CREATE VIEW plus some additional settings.
>> > > > > > > >>> 2) If we design the DDL for it like a view, then maybe temporary is
>> > > > > > > >>> good enough.
>> > > > > > > >>>
>> > > > > > > >>> On Thu, 16 Apr 2020 at 20:16, Konstantin Knauf <kna...@apache.org> wrote:
>> > > > > > > >>>
>> > > > > > > >>>> Hi everyone,
>> > > > > > > >>>>
>> > > > > > > >>>> it would be very useful if temporal tables could be created via DDL.
>> > > > > > > >>>> Currently, users either need to do this in the Table API or in the
>> > > > > > > >>>> environment file of the Flink CLI, both of which require the user to
>> > > > > > > >>>> switch the context of the SQL CLI/Editor. I recently created a ticket
>> > > > > > > >>>> for this request [1].
>> > > > > > > >>>>
>> > > > > > > >>>> I see two main questions:
>> > > > > > > >>>>
>> > > > > > > >>>> 1) What would be the DDL syntax? A Temporal Table is on the one hand
>> > > > > > > >>>> a view and on the other a function, depending on how you look at it.
>> > > > > > > >>>>
>> > > > > > > >>>> 2) Would this temporal table view/function be stored in the catalog
>> > > > > > > >>>> or only be temporary?
>> > > > > > > >>>>
>> > > > > > > >>>> I personally do not have much experience in this area of Flink, so I
>> > > > > > > >>>> am looking forward to hearing your thoughts on this.
>> > > > > > > >>>>
>> > > > > > > >>>> Best,
>> > > > > > > >>>>
>> > > > > > > >>>> Konstantin
>> > > > > > > >>>>
>> > > > > > > >>>> [1] https://issues.apache.org/jira/browse/FLINK-16824
>> > > > > > > >>>>
>> > > > > > > >>>> --
>> > > > > > > >>>>
>> > > > > > > >>>> Konstantin Knauf
>> > > > > > > >>>>
>> > > > > > > >>>
>> > > > > > > >>>
>> > > > > > > >>> --
>> > > > > > > >>>
>> > > > > > > >>> Benchao Li
>> > > > > > > >>> School of Electronics Engineering and Computer Science, Peking University
>> > > > > > > >>> Tel:+86-15650713730
>> > > > > > > >>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>> > > > > > > >>>
>>
>>
>> --
>>
>> Konstantin Knauf
>>
>> https://twitter.com/snntrable
>>
>> https://github.com/knaufk
>>
>
