I might have missed something, but why do we need a new "TEMPORAL TABLE" syntax?
According to Fabian's first mail:

> Hence, the requirements for a temporal table are:
> * The temporal table has a primary key / unique attribute
> * The temporal table has a time-attribute that defines the start of the validity interval of a row (processing time or event time)
> * The system knows that the history of the table is tracked and can infer how to look up a version.

I think a primary key plus a proper event-time attribute is already sufficient. A join query like "Fact JOIN Dim FOR SYSTEM_TIME AS OF Fact.some_event_time ON Fact.id = Dim.id" would mean: for every record in Fact, use Fact.some_event_time as the version of Dim (that is, consider only the records of Dim whose event time is less than or equal to Fact.some_event_time, and keep only one record, the latest, for each primary key).

The temporal behavior is actually triggered by the join syntax "FOR SYSTEM_TIME AS OF Fact.some_event_time", not by the DDL.

Best,
Kurt

On Fri, May 8, 2020 at 10:51 AM Jark Wu <imj...@gmail.com> wrote:

> Hi,
>
> I agree with what Fabian said above.
> Besides, IMO, (3) has lower priority and will involve many more things.
> It makes sense to me to do it in two phases.
>
> Regarding (3), the key point in converting an append-only table into a changelog table is that the framework must know the operation type, so we introduced a special CREATE VIEW syntax for it in the documentation [1]. Here is an example:
>
> -- my_binlog table is registered as an append-only table
> CREATE TABLE my_binlog (
>   before ROW<...>,
>   after ROW<...>,
>   op STRING,
>   op_ms TIMESTAMP(3)
> ) WITH (
>   'connector.type' = 'kafka',
>   ...
> );
>
> -- interpret my_binlog as a changelog using the op column and the id key
> CREATE VIEW my_table AS
>   SELECT
>     after.*
>   FROM my_binlog
>   CHANGELOG OPERATION BY op
>   UPDATE KEY BY (id);
>
> -- my_table will materialize the insert/delete/update changes
> -- if we have 4 records in dbz:
> -- a create for 1004
> -- an update for 1004
> -- a create for 1005
> -- a delete for 1004
>
> SELECT COUNT(*) FROM my_table;
> +----------+
> | COUNT(*) |
> +----------+
> |        1 |
> +----------+
>
> Best,
> Jark
>
> [1]: https://docs.google.com/document/d/1onyIUUdWAHfr_Yd5nZOE7SOExBc6TiW5C4LiL5FrjtQ/edit#heading=h.sz656g8mb2wb
>
> On Fri, 8 May 2020 at 00:24, Fabian Hueske <fhue...@gmail.com> wrote:
>
> > Thanks for the summary Konstantin.
> > I think you got all points right.
> >
> > IMO, the way forward would be to work on a FLIP to define
> > * the concept of temporal tables,
> > * how to feed them from retraction tables
> > * how to feed them from append-only tables
> > * their specification with CREATE TEMPORAL TABLE,
> > * how to use temporal tables in temporal table joins
> > * how (if at all) to use temporal tables in other types of queries
> >
> > We would keep the LATERAL TABLE syntax because it is used for regular table-valued functions.
> > However, we would probably remove the TemporalTableFunction (which is a built-in table-valued function) after we have deprecated it for a while.
> >
> > Cheers, Fabian
> >
> > On Thu, May 7, 2020 at 18:03, Konstantin Knauf <kna...@apache.org> wrote:
> >
> >> Hi everyone,
> >>
> >> Thanks everyone for joining the discussion on this. Please let me summarize what I have understood so far.
> >>
> >> 1) For joining an append-only table and a temporal table, the "FOR SYSTEM_TIME AS OF <time-attribute>" syntax seems to be preferred (Fabian, Timo, Seth).
> >>
> >> 2) To define a temporal table based on a changelog stream from an external system CREATE TEMPORAL TABLE (as suggested by Timo/Fabian) could be used.
> >> 3) In order to also support temporal tables derived from an append-only stream, we either need to support TEMPORAL VIEW (as mentioned by Fabian) or need to have a way to convert an append-only table into a changelog table (briefly discussed in [1]). It is not completely clear to me how a temporal table based on an append-only table would be with the syntax proposed in [1] and 2). @Jark Wu <imj...@gmail.com> could you elaborate a bit on that?
> >>
> >> How do we move forward with this?
> >>
> >> * It seems that a two-phased approach (1 + 2 now, 3 later) makes sense. What do you think?
> >> * If we proceed like this, what would this mean for the current syntax of LATERAL TABLE? Would we keep it? Would we eventually deprecate and drop it? Since only after 3) we would be on par with the current temporal table function join, I assume, we could only drop it thereafter.
> >>
> >> Thanks, Konstantin
> >>
> >> [1] https://docs.google.com/document/d/1onyIUUdWAHfr_Yd5nZOE7SOExBc6TiW5C4LiL5FrjtQ/edit#heading=h.kduaw9moein6
> >>
> >> On Sat, Apr 18, 2020 at 3:07 PM Jark Wu <imj...@gmail.com> wrote:
> >>
> >> > Hi Fabian,
> >> >
> >> > Just to clarify a little bit, we decided to move the "converting append-only table into changelog table" into future work.
> >> > So FLIP-105 only introduced some CDC formats (debezium) and new TableSource interfaces proposed in FLIP-95.
> >> > I should have started a new FLIP for the new CDC formats and kept FLIP-105 as it is to avoid the confusion, sorry about that.
> >> >
> >> > Best,
> >> > Jark
> >> >
> >> > On Sat, 18 Apr 2020 at 00:35, Fabian Hueske <fhue...@gmail.com> wrote:
> >> >
> >> > > Thanks Jark!
> >> > >
> >> > > I certainly need to read up on FLIP-105 (and I'll try to adjust my terminology to changelog table from now on ;-) )
> >> > > If FLIP-105 addresses the issue of converting an append-only table into a changelog table that upserts on primary key (basically what the VIEW definition in my first email did), TEMPORAL VIEWs become much less important.
> >> > > In that case, we would be well served with TEMPORAL TABLE, and TEMPORAL VIEW would be a nice-to-have feature for some later time.
> >> > >
> >> > > Cheers, Fabian
> >> > >
> >> > > On Fri, Apr 17, 2020 at 18:13, Jark Wu <imj...@gmail.com> wrote:
> >> > >
> >> > > > Hi Fabian,
> >> > > >
> >> > > > I think converting an append-only table into a temporal table consists of two things:
> >> > > > (1) converting the append-only table into a changelog table (or retraction table, as you said)
> >> > > > (2) defining the converted changelog table (which may be a view now) as temporal (or history tracked).
> >> > > >
> >> > > > The first thing is also mentioned and discussed in the FLIP-105 design draft [1], which proposed a syntax to convert the append-only table into a changelog table.
> >> > > >
> >> > > > I think TEMPORAL TABLE is quite straightforward and simple, and can satisfy most existing changelog data with popular CDC formats. TEMPORAL VIEW is flexible but will involve more SQL code. I think we can support them both.
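To make step (1) a bit more concrete, here is a minimal Python sketch of what "interpreting an append-only table as a changelog that upserts on the primary key" could mean. It replays the four-record example from earlier in the thread (create 1004, update 1004, create 1005, delete 1004). The function name `materialize`, the operation names, and the row payloads are assumptions made for illustration; this is not Flink code or any proposed API.

```python
def materialize(binlog_rows):
    """Apply append-only binlog rows (op, key, value) as upserts/deletes."""
    table = {}
    for op, key, value in binlog_rows:
        if op in ("create", "update"):
            table[key] = value       # upsert on the primary key
        elif op == "delete":
            table.pop(key, None)     # remove the row if it exists
        else:
            raise ValueError(f"unknown operation: {op!r}")
    return table


# Hypothetical payloads; only the keys and operations come from the thread.
binlog = [
    ("create", 1004, {"name": "a"}),
    ("update", 1004, {"name": "b"}),
    ("create", 1005, {"name": "c"}),
    ("delete", 1004, None),
]
my_table = materialize(binlog)
print(len(my_table))  # 1, matching COUNT(*) in the example
```

Note that this only yields the current state of the table; step (2), tracking the history, needs the older versions as well.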
> >> > > >
> >> > > > Best,
> >> > > > Jark
> >> > > >
> >> > > > [1]: https://docs.google.com/document/d/1onyIUUdWAHfr_Yd5nZOE7SOExBc6TiW5C4LiL5FrjtQ/edit#heading=h.sz656g8mb2wb
> >> > > >
> >> > > > On Fri, 17 Apr 2020 at 23:52, Fabian Hueske <fhue...@gmail.com> wrote:
> >> > > >
> >> > > > > Hi,
> >> > > > >
> >> > > > > I agree with most of what Timo said.
> >> > > > >
> >> > > > > The TEMPORAL keyword (which unfortunately might be easily confused with TEMPORARY...) looks very intuitive and I think using the only time attribute for versioning would be a good choice.
> >> > > > >
> >> > > > > However, TEMPORAL TABLE on retraction tables does not solve the full problem.
> >> > > > > I believe there will also be cases where we need to derive a temporal table from an append-only table (what TemporalTableFunctions do right now).
> >> > > > > I think the best choice for this would be TEMPORAL VIEW but, as I explained, it might take longer until this can be supported.
> >> > > > > TEMPORAL VIEW would also address the problem of preprocessing.
> >> > > > >
> >> > > > > > Regarding retraction table with a primary key and a time-attribute:
> >> > > > > > These semantics are still unclear to me. Can retractions only occur within watermarks? Or are they also used for representing late updates?
> >> > > > >
> >> > > > > Time attributes and retraction streams are a challenging topic that I haven't completely understood yet.
> >> > > > > So far we have always treated time attributes as part of the data.
> >> > > > > In combination with retractions, it seems that they become metadata that specifies when a change was done.
> >> > > > > I think this is different from treating time attributes as regular data.
> >> > > > >
> >> > > > > Cheers, Fabian
> >> > > > >
> >> > > > > On Fri, Apr 17, 2020 at 17:23, Seth Wiesman <sjwies...@gmail.com> wrote:
> >> > > > >
> >> > > > > > I really like the TEMPORAL keyword, I find it very intuitive.
> >> > > > > >
> >> > > > > > > The down side of this approach would be that an additional preprocessing step would not be possible anymore because there is no preceding view.
> >> > > > > >
> >> > > > > > Yes and no. My understanding is we are not talking about making any changes to how temporal tables are defined in the Table API. Since you cannot currently define temporal table functions in pure SQL applications, but only pre-register them in YAML, you can't do any pre-processing as it stands today. Preprocessing may be a generally useful feature, I'm not sure, but this syntax does not lose us anything in pure SQL applications.
> >> > > > > >
> >> > > > > > > These semantics are still unclear to me. Can retractions only occur within watermarks? Or are they also used for representing late updates?
> >> > > > > >
> >> > > > > > I do not know the SQL standard well enough to give a principled response to this question. However, in my observation of production workloads, users of temporal table functions are doing so to denormalize star schemas before performing further transformations and aggregations and expect the output to be an append stream.
> >> > > > > > With the ongoing work to better support changelogs, the need for users to understand the differences between append and upsert in their query may be diminishing, but everyone else on this thread can better speak to that.
> >> > > > > >
> >> > > > > > Seth
> >> > > > > >
> >> > > > > > On Fri, Apr 17, 2020 at 10:03 AM Timo Walther <twal...@apache.org> wrote:
> >> > > > > >
> >> > > > > > > Hi Fabian,
> >> > > > > > >
> >> > > > > > > thank you very much for this great summary!
> >> > > > > > >
> >> > > > > > > I wasn't aware of the Polymorphic Table Functions standard. This is a very interesting topic that we should definitely consider in the future. Maybe this could also help us in defining tables more dynamically within a query. It could help solve problems as discussed in FLIP-113.
> >> > > > > > >
> >> > > > > > > Regarding joining:
> >> > > > > > >
> >> > > > > > > IMO we should aim for "FOR SYSTEM_TIME AS OF x" instead of the current `LATERAL TABLE(rates(x))` syntax. A function that also behaves like a table and needs this special `LATERAL` keyword during joining is not very intuitive. PTFs could be used once they are fully supported by Calcite and we have the big picture of how to also use them for other time-based operations (windows?, joins?).
> >> > > > > > >
> >> > > > > > > Regarding how to represent a temporal table:
> >> > > > > > >
> >> > > > > > > I think that our current DDL, current LookupTableSource and temporal tables can fit nicely together.
> >> > > > > > >
> >> > > > > > > How about we simply introduce an additional keyword `TEMPORAL` to indicate history tracking semantics? I think this is the minimal invasive solution:
> >> > > > > > >
> >> > > > > > > CREATE TEMPORAL TABLE rates (
> >> > > > > > >   currency CHAR(3) NOT NULL PRIMARY KEY,
> >> > > > > > >   rate DOUBLE,
> >> > > > > > >   rowtime TIMESTAMP,
> >> > > > > > >   WATERMARK FOR rowtime AS rowtime - INTERVAL '5' MINUTE)
> >> > > > > > > WITH (...);
> >> > > > > > >
> >> > > > > > > - The primary key would be defined by the DDL.
> >> > > > > > > - The available time attribute would be defined by the DDL. Either as the only time attribute of the table or we introduce a special constraint similar to `PRIMARY KEY`.
> >> > > > > > >
> >> > > > > > > The down side of this approach would be that an additional preprocessing step would not be possible anymore because there is no preceding view.
> >> > > > > > >
> >> > > > > > > The `TEMPORAL` semantic can be stored in the properties of the table when writing to a catalog. We do the same for watermarks and computed columns.
> >> > > > > > >
> >> > > > > > > Without a `TEMPORAL` keyword, a `FOR SYSTEM_TIME AS OF x` would only work on processing time by a lookup into the external system or on event-time by using the time semantics that the external system supports.
> >> > > > > > >
> >> > > > > > > Regarding retraction table with a primary key and a time-attribute:
> >> > > > > > >
> >> > > > > > > These semantics are still unclear to me. Can retractions only occur within watermarks? Or are they also used for representing late updates?
> >> > > > > > >
> >> > > > > > > Regards,
> >> > > > > > > Timo
> >> > > > > > >
> >> > > > > > > On 17.04.20 14:34, Fabian Hueske wrote:
> >> > > > > > > > Hi all,
> >> > > > > > > >
> >> > > > > > > > First of all, I apologize for the text wall that's following... ;-)
> >> > > > > > > >
> >> > > > > > > > A temporal table join joins an append-only table and a temporal table.
> >> > > > > > > > The question about how to represent a temporal table join boils down to two questions:
> >> > > > > > > >
> >> > > > > > > > 1) How to represent a temporal table
> >> > > > > > > > 2) How to specify the join of an append-only table and a temporal table
> >> > > > > > > >
> >> > > > > > > > I'll discuss these points separately.
> >> > > > > > > >
> >> > > > > > > > # 1 How to represent a temporal table
> >> > > > > > > >
> >> > > > > > > > A temporal table is a table that can be looked up with a time parameter and which returns the rows of the table at that point in time / for that version.
> >> > > > > > > > In order to be able to (conceptually) look up previous versions, a temporal table must be (conceptually) backed by a history table that tracks all previous versions (see SqlServer docs [1]).
> >> > > > > > > > In the context of our join, we added another restriction, namely that the table must have a primary key, i.e., there is only one row for each version for each unique key.
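As an illustration of the lookup semantics described above, here is a minimal Python sketch of a history-backed table with a primary key that can be queried "as of" a point in time. The class `TemporalTable`, its method names, and the integer timestamps are hypothetical and chosen only for this example; it models the concept, not how Flink would implement it.

```python
from bisect import bisect_right
from collections import defaultdict


class TemporalTable:
    """Toy history table: for each key, versions sorted by start time."""

    def __init__(self):
        self._start_times = defaultdict(list)  # key -> sorted start times
        self._rows = defaultdict(list)         # key -> rows, same order

    def upsert(self, key, start_time, row):
        # Insert the version at its sorted position so lookups can bisect.
        times = self._start_times[key]
        pos = bisect_right(times, start_time)
        times.insert(pos, start_time)
        self._rows[key].insert(pos, row)

    def as_of(self, key, t):
        # The valid version is the last one whose start time is <= t.
        times = self._start_times.get(key, [])
        pos = bisect_right(times, t)
        return self._rows[key][pos - 1] if pos > 0 else None


rates = TemporalTable()
rates.upsert("EUR", 10, {"rate": 1.10})
rates.upsert("EUR", 20, {"rate": 1.15})
rates.upsert("USD", 5, {"rate": 1.00})

print(rates.as_of("EUR", 15))  # version that started at time 10
print(rates.as_of("EUR", 3))   # None: no version existed yet
```

The primary key is what makes the answer unique: for a given key and timestamp there is at most one valid row.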
> >> > > > > > > >
> >> > > > > > > > Hence, the requirements for a temporal table are:
> >> > > > > > > > * The temporal table has a primary key / unique attribute
> >> > > > > > > > * The temporal table has a time-attribute that defines the start of the validity interval of a row (processing time or event time)
> >> > > > > > > > * The system knows that the history of the table is tracked and can infer how to look up a version.
> >> > > > > > > >
> >> > > > > > > > There are two possible types of input from which we want to create temporal tables (that I'm aware of):
> >> > > > > > > >
> >> > > > > > > > * append-only tables, i.e., tables that contain the full change history
> >> > > > > > > > * retraction tables, i.e., tables that are updating and do not remember the history.
> >> > > > > > > >
> >> > > > > > > > There are a few ways to do this:
> >> > > > > > > >
> >> > > > > > > > ## 1.1 Defining a VIEW on an append-only table with a time attribute.
> >> > > > > > > >
> >> > > > > > > > The following view definition results in a view that provides the latest rate for each currency.
> >> > > > > > > >
> >> > > > > > > > CREATE VIEW rates AS
> >> > > > > > > > SELECT
> >> > > > > > > >   currency, MAX(rate) as rate, MAX(rowtime) as rowtime
> >> > > > > > > > FROM rates_history rh1
> >> > > > > > > > WHERE
> >> > > > > > > >   rh1.rowtime = (
> >> > > > > > > >     SELECT max(rowtime)
> >> > > > > > > >     FROM rates_history rh2
> >> > > > > > > >     WHERE rh2.currency = rh1.currency)
> >> > > > > > > > GROUP BY currency
> >> > > > > > > > WITH (
> >> > > > > > > >   'historytracking' = 'true',
> >> > > > > > > >   'historytracking.starttime' = 'rowtime');
> >> > > > > > > >
> >> > > > > > > > However, we also need to tell the system to track the history of all changes of the view in order to be able to look it up.
> >> > > > > > > > That's what the properties in the WITH clause are for (inspired by SqlServer's TEMPORAL TABLE DDL syntax).
> >> > > > > > > > Note that this is *not* a syntax proposal but only meant to show which information is needed.
> >> > > > > > > > This view allows looking up any version of the "rates" view.
> >> > > > > > > >
> >> > > > > > > > In addition to designing and implementing the DDL syntax for views that support temporal lookups, the optimizer would need to understand the semantics of the view definition in depth.
> >> > > > > > > > Among other things it needs to understand that the MAX() aggregation on the time-attribute preserves its watermark alignment.
> >> > > > > > > > AFAIK, this is not the case at the moment (the time attribute would be converted into a regular TIMESTAMP and lose its time attribute properties)
> >> > > > > > > >
> >> > > > > > > > ## 1.2 A retraction table with a primary key and a time-attribute.
> >> > > > > > > >
> >> > > > > > > > On paper it looks like such a table would automatically qualify as a time-versioned table because it completely fulfills the requirements.
> >> > > > > > > > However, I don't think we can use it *as is* as a temporal table if we want to have clean semantics.
> >> > > > > > > > The problem here is the "lost history" of the retraction table. The dynamic table that is defined on the retraction stream only stores the latest version (even though it sees all versions).
> >> > > > > > > > Conceptually, a temporal table can look up the version of the table at any point in time because it is backed by a history table.
> >> > > > > > > > If this information is not available, we cannot have a semantically clean definition of the join IMO.
> >> > > > > > > >
> >> > > > > > > > Therefore we should define the table in a way that the system knows that the history is tracked.
> >> > > > > > > > MSSQL uses a syntax similar to this one
> >> > > > > > > >
> >> > > > > > > > CREATE TABLE rates (
> >> > > > > > > >   currency CHAR(3) NOT NULL PRIMARY KEY,
> >> > > > > > > >   rate DOUBLE,
> >> > > > > > > >   rowtime TIMESTAMP,
> >> > > > > > > >   WATERMARK FOR rowtime AS rowtime - INTERVAL '5' MINUTE)
> >> > > > > > > > WITH (
> >> > > > > > > >   'historytracking' = 'true',
> >> > > > > > > >   'historytracking.starttime' = 'rowtime');
> >> > > > > > > >
> >> > > > > > > > The 'historytracking' properties would declare that the table tracks its history and also specify the attribute (rowtime) that is used for versioning.
> >> > > > > > > >
> >> > > > > > > > ## 1.3 Registering a TableFunction that takes an append-only table with time attribute
> >> > > > > > > >
> >> > > > > > > > The TableFunction requires a few parameters:
> >> > > > > > > > * the source table from which to derive the temporal table
> >> > > > > > > > * the key attribute on which the versions of the source table should be computed
> >> > > > > > > > * the time attribute that defines the versions
> >> > > > > > > > * a lookup timestamp for the version that is returned.
> >> > > > > > > >
> >> > > > > > > > The reasons why we chose the TableFunction approach over the VIEW approach so far were:
> >> > > > > > > > * It is easier for the optimizer to identify a built-in table function than to analyze and reason about a generic VIEW.
> >> > > > > > > > * We would need to make the optimizer a lot smarter to infer all the properties from the generic VIEW definition that we need for a temporal table join.
> >> > > > > > > > * Passing a parameter to a function is a known thing, passing a parameter to a VIEW not so much.
> >> > > > > > > > * Users would need to specify the VIEW exactly right so that it can be used as a temporal table. Look at 1.1 to see why this is not trivial.
> >> > > > > > > >
> >> > > > > > > > There are two ways to use a TableFunction:
> >> > > > > > > >
> >> > > > > > > > ### 1.3.1 Built-in and pre-registered function that is parameterized in the SQL query
> >> > > > > > > >
> >> > > > > > > > Here, we do not need to do anything to register the function. We simply use it in the query (see example in 2.2 below)
> >> > > > > > > >
> >> > > > > > > > ### 1.3.2 Parameterize function when it is registered in the catalog (with a provided Java implementation)
> >> > > > > > > >
> >> > > > > > > > This is the approach we've used so far. In the Table API, the function is first parameterized and created and then registered.
> >> > > > > > > > We would need a DDL syntax to parameterize UDFs on registration.
> >> > > > > > > > I don't want to propose a syntax here, but just to get an idea it might look like this:
> >> > > > > > > >
> >> > > > > > > > CREATE FUNCTION rates AS
> >> > > > > > > > 'org.apache.flink.table.udfs.TemporalTableFunction' WITH
> >> > > > > > > > ('table' = 'rates_history', 'key' = 'currency', 'time' = 'rowtime')
> >> > > > > > > >
> >> > > > > > > > Right now, the Flink Catalog interface does not have the functionality to store such parameters and would need some hacks to properly create parameterized function instances.
> >> > > > > > > >
> >> > > > > > > > # 2 Defining a join of an append-only table and a temporal table
> >> > > > > > > >
> >> > > > > > > > The append-only table needs to have a time-attribute (processing time or event time, but the same as the temporal table).
> >> > > > > > > > The join then needs to specify two things:
> >> > > > > > > > * an equality predicate that includes the primary key of the temporal table
> >> > > > > > > > * the time attribute of the append-only table as the time as of which to look up the temporal table, i.e., get the version of the temporal table that is valid for the timestamp of the current row from the append-only table
> >> > > > > > > >
> >> > > > > > > > The tricky part (from a syntax point of view) is to specify the lookup time.
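Independent of the syntax question, the two ingredients of the join listed above (an equality predicate on the key plus a per-row lookup time) can be sketched in a few lines of Python. This is only a model of the semantics under simplifying assumptions: `temporal_join`, the dictionary layout of `rate_versions`, and the integer timestamps are hypothetical, and inner-join behavior is assumed for orders that have no valid version.

```python
from bisect import bisect_right


def temporal_join(orders, rate_versions):
    """Join each order with the rate version valid at the order's time.

    orders: iterable of dicts with "currency" and "ordertime".
    rate_versions: dict currency -> list of (start_time, rate), sorted by time.
    """
    joined = []
    for order in orders:
        versions = rate_versions.get(order["currency"], [])  # equality on key
        starts = [start for start, _ in versions]
        pos = bisect_right(starts, order["ordertime"])       # as-of lookup
        if pos > 0:  # inner join: skip orders with no valid version
            joined.append({**order, "rate": versions[pos - 1][1]})
    return joined


rate_versions = {"EUR": [(10, 1.10), (20, 1.15)], "USD": [(5, 1.00)]}
orders = [
    {"id": 1, "currency": "EUR", "ordertime": 12},
    {"id": 2, "currency": "EUR", "ordertime": 20},
    {"id": 3, "currency": "USD", "ordertime": 2},
]
result = temporal_join(orders, rate_versions)
for row in result:
    print(row)
```

Each order carries its own lookup time, which is why the time argument cannot be a constant as in a plain "AS OF" query.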
> >> > > > > > > >
> >> > > > > > > > ## 2.1 the temporal table is a regular table or view (see approaches 1.1 and 1.2 above)
> >> > > > > > > >
> >> > > > > > > > In this case we can use the "FOR SYSTEM_TIME AS OF x" clause as follows:
> >> > > > > > > >
> >> > > > > > > > SELECT *
> >> > > > > > > > FROM orders o, rates r FOR SYSTEM_TIME AS OF o.ordertime
> >> > > > > > > > WHERE o.currency = r.currency
> >> > > > > > > >
> >> > > > > > > > IMO, this is a great syntax and the one we should strive for.
> >> > > > > > > > We would need to bend the rules of the SQL standard, which only allows x in "FOR SYSTEM_TIME AS OF x" to be a constant, and the table on which it is applied usually needs to be of a specific type (not sure if views are supported), but I guess this is fine.
> >> > > > > > > > NOTE: the "FOR SYSTEM_TIME AS OF x" is already supported for LookupTable Joins if x is a processing time attribute [2].
> >> > > > > > > >
> >> > > > > > > > ## 2.2 the temporal table is a TableFunction and parameterized in the query (see 1.3.1 above)
> >> > > > > > > >
> >> > > > > > > > SELECT *
> >> > > > > > > > FROM orders o,
> >> > > > > > > >   TEMPORAL_TABLE(
> >> > > > > > > >     table => TABLE(rates_history),
> >> > > > > > > >     key => DESCRIPTOR(currency),
> >> > > > > > > >     time => DESCRIPTOR(rowtime)) r
> >> > > > > > > > WHERE o.currency = r.currency
> >> > > > > > > >
> >> > > > > > > > The function "TEMPORAL_TABLE" is built-in and nothing was registered in the catalog (except the rates_history table).
> >> > > > > > > > In fact this is valid SQL:2016 syntax, called Polymorphic Table Functions. Have a look here [3].
> >> > > > > > > >
> >> > > > > > > > ## 2.3 the temporal table is a TableFunction that was parameterized during registration (see 1.3.2 above)
> >> > > > > > > >
> >> > > > > > > > This is what we have at the moment.
> >> > > > > > > >
> >> > > > > > > > SELECT *
> >> > > > > > > > FROM orders o,
> >> > > > > > > >   LATERAL TABLE (rates(o.ordertime)) AS r
> >> > > > > > > > WHERE o.currency = r.currency
> >> > > > > > > >
> >> > > > > > > > The TableFunction "rates" was registered in the catalog and parameterized to the "rates_history" append-only table, the key was set to "currency", and the time attribute was declared.
> >> > > > > > > >
> >> > > > > > > > # SUMMARY
> >> > > > > > > >
> >> > > > > > > > IMO we should in the long run aim to define temporal tables either as upsert retraction tables or as views on append-only tables and join them using the "FOR SYSTEM_TIME AS OF x" syntax.
> >> > > > > > > > I guess it is debatable whether we need to declare that history is tracked for these tables (which we don't actually do) or if we do it by convention if the table has a time attribute.
> >> > > > > > > > It should be (relatively) easy to get this to work for retraction tables, which will be supported soon.
> >> > > > > > > > It will be more work for views because we need to improve the time attribute handling with MAX() aggregations.
> >> > > > > > > > The "FOR SYSTEM_TIME AS OF x" is already supported for LookupTableSources and would "only" need to be adapted to work on temporal tables.
> >> > > > > > > >
> >> > > > > > > > Registering parameterized TableFunctions in the catalog seems like quite a bit of work. We would need new DDL syntax and would have to extend the catalog and function instantiation. This won't be easy, IMO.
> >> > > > > > > > If we only support them as TEMPORARY FUNCTIONs, which are not registered in the catalog, it will be easier. The question is whether it is worth the effort if we decide for the other approach.
> >> > > > > > > >
> >> > > > > > > > Using TableFunctions that are parameterized in the query will require extending the Calcite parser and framework to support Polymorphic Table Functions.
> >> > > > > > > > However, some work might already have been done there, because AFAIK Apache Beam aims to support this syntax for windowing functions as described in the "One SQL to rule them all" paper [4].
> >> > > > > > > > It might be the fastest and fully SQL standard compliant way.
> >> > > > > > > >
> >> > > > > > > > Cheers,
> >> > > > > > > > Fabian
> >> > > > > > > >
> >> > > > > > > > [1] https://docs.microsoft.com/en-us/sql/relational-databases/tables/temporal-tables
> >> > > > > > > > [2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/joins.html#usage-1
> >> > > > > > > > [3] https://standards.iso.org/ittf/PubliclyAvailableStandards/c069776_ISO_IEC_TR_19075-7_2017.zip
> >> > > > > > > > [4] https://arxiv.org/abs/1905.12133
> >> > > > > > > >
> >> > > > > > > > On Fri, Apr 17, 2020 at 06:37, Jark Wu <imj...@gmail.com> wrote:
> >> > > > > > > >
> >> > > > > > > >> Hi Konstantin,
> >> > > > > > > >>
> >> > > > > > > >> Thanks for bringing up this discussion. I think temporal join is a very important feature and should be exposed to pure SQL users.
> >> > > > > > > >> And I have already received many requests like this.
> >> > > > > > > >> However, my concern is how to properly support this feature in SQL.
> >> > > > > > > >> Introducing a DDL syntax for Temporal Table Function is one way, but maybe not the best one.
> >> > > > > > > >>
> >> > > > > > > >> The most important reason is that what underlies a temporal table function is exactly a changelog stream.
> >> > > > > > > >> The temporal join is actually a temporal join of a fact stream with the changelog stream on processing time or event time.
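One way to read "a temporal table function is backed by a changelog stream" is that every change event opens a new version of a key and closes the previous one. The Python sketch below folds a changelog into per-key validity intervals under that reading. The function name `versions_from_changelog`, the event layout `(op, key, ts, value)`, and the operation names are assumptions for illustration; real CDC formats such as Debezium encode changes differently.

```python
def versions_from_changelog(changelog):
    """Fold (op, key, ts, value) events into per-key validity intervals.

    Returns dict key -> list of (valid_from, valid_to, value); valid_to is
    None while the version is still current.
    """
    history = {}
    for op, key, ts, value in sorted(changelog, key=lambda e: e[2]):
        versions = history.setdefault(key, [])
        if versions and versions[-1][1] is None:
            # Close the currently valid version at the time of this change.
            valid_from, _, old_value = versions[-1]
            versions[-1] = (valid_from, ts, old_value)
        if op in ("create", "update"):
            versions.append((ts, None, value))
        # A "delete" only closes the open version; nothing new is added.
    return history


changelog = [
    ("create", "EUR", 10, 1.10),
    ("update", "EUR", 20, 1.15),
    ("create", "USD", 12, 1.00),
    ("delete", "USD", 30, None),
]
history = versions_from_changelog(changelog)
for key, versions in history.items():
    print(key, versions)
```

With the intervals materialized like this, a join "as of" a timestamp reduces to finding the interval that contains it.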
> >> > > > > > > >> We will soon support creating a changelog source using DDL once FLIP-95
> >> > > > > > > >> and FLIP-105 are finished.
> >> > > > > > > >> At that time, we can have a simple DDL to create a changelog source like
> >> > > > > > > >> this:
> >> > > > > > > >>
> >> > > > > > > >> CREATE TABLE rate_changelog (
> >> > > > > > > >>   currency STRING,
> >> > > > > > > >>   rate DECIMAL
> >> > > > > > > >> ) WITH (
> >> > > > > > > >>   'connector' = 'kafka',
> >> > > > > > > >>   'topic' = 'rate_binlog',
> >> > > > > > > >>   'properties.bootstrap.servers' = 'localhost:9092',
> >> > > > > > > >>   'format' = 'debezium-json'
> >> > > > > > > >> );
> >> > > > > > > >>
> >> > > > > > > >> Meanwhile, we already have a SQL standard temporal join syntax [1],
> >> > > > > > > >> i.e. "A JOIN B FOR SYSTEM_TIME AS OF ..".
> >> > > > > > > >> It is currently used for dimension table lookup joins, but the semantics
> >> > > > > > > >> are the same as those of the "temporal table function join" [2].
> >> > > > > > > >> I'm in favor of "FOR SYSTEM_TIME AS OF" because it is more natural:
> >> > > > > > > >> the definition of B is a *table*, not a *table function*,
> >> > > > > > > >> and the syntax is included in the SQL standard.
> >> > > > > > > >>
> >> > > > > > > >> So once we have the ability to define the "rate_changelog" table, we can
> >> > > > > > > >> use the following query to temporal join the changelog on processing time.
> >> > > > > > > >>
> >> > > > > > > >> SELECT *
> >> > > > > > > >> FROM orders JOIN rate_changelog FOR SYSTEM_TIME AS OF orders.proctime
> >> > > > > > > >> ON orders.currency = rate_changelog.currency;
> >> > > > > > > >>
> >> > > > > > > >> In a nutshell, once FLIP-95 and FLIP-105 are ready, we can easily support
> >> > > > > > > >> "temporal join on changelogs" without introducing new syntax.
> >> > > > > > > >> IMO, introducing a DDL syntax for Temporal Table Function does not look
> >> > > > > > > >> like an easy path and may involve duplicated work.
> >> > > > > > > >>
> >> > > > > > > >> Best,
> >> > > > > > > >> Jark
> >> > > > > > > >>
> >> > > > > > > >> [1]: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
> >> > > > > > > >> [2]: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table-function
> >> > > > > > > >>
> >> > > > > > > >> On Thu, 16 Apr 2020 at 23:04, Benchao Li <libenc...@gmail.com> wrote:
> >> > > > > > > >>
> >> > > > > > > >>> Hi Konstantin,
> >> > > > > > > >>>
> >> > > > > > > >>> Thanks for bringing up this discussion. +1 for the idea.
> >> > > > > > > >>> We have run into this in our company too, and I recently planned to
> >> > > > > > > >>> support it in our internal branch.
> >> > > > > > > >>>
> >> > > > > > > >>> Regarding your questions:
> >> > > > > > > >>> 1) I think it might be more of a table/view than a function, just like a
> >> > > > > > > >>> Temporal Table (which is also known as a dimension table). Maybe we need
> >> > > > > > > >>> a DDL like CREATE VIEW plus some additional settings.
> >> > > > > > > >>> 2) If we design the DDL for it like a view, then maybe temporary is good
> >> > > > > > > >>> enough.
> >> > > > > > > >>>
> >> > > > > > > >>> On Thu, 16 Apr 2020 at 8:16 PM, Konstantin Knauf <kna...@apache.org> wrote:
> >> > > > > > > >>>
> >> > > > > > > >>>> Hi everyone,
> >> > > > > > > >>>>
> >> > > > > > > >>>> it would be very useful if temporal tables could be created via DDL.
> >> > > > > > > >>>> Currently, users need to do this either in the Table API or in the
> >> > > > > > > >>>> environment file of the Flink CLI, both of which require the user to
> >> > > > > > > >>>> switch out of the context of the SQL CLI/Editor. I recently created a
> >> > > > > > > >>>> ticket for this request [1].
> >> > > > > > > >>>>
> >> > > > > > > >>>> I see two main questions:
> >> > > > > > > >>>>
> >> > > > > > > >>>> 1) What would be the DDL syntax? A Temporal Table is on the one hand a
> >> > > > > > > >>>> view and on the other a function, depending on how you look at it.
> >> > > > > > > >>>>
> >> > > > > > > >>>> 2) Would this temporal table view/function be stored in the catalog or
> >> > > > > > > >>>> only be temporary?
> >> > > > > > > >>>>
> >> > > > > > > >>>> I personally do not have much experience in this area of Flink, so I am
> >> > > > > > > >>>> looking forward to hearing your thoughts on this.
> >> > > > > > > >>>>
> >> > > > > > > >>>> Best,
> >> > > > > > > >>>>
> >> > > > > > > >>>> Konstantin
> >> > > > > > > >>>>
> >> > > > > > > >>>> [1] https://issues.apache.org/jira/browse/FLINK-16824
> >> > > > > > > >>>>
> >> > > > > > > >>>> --
> >> > > > > > > >>>>
> >> > > > > > > >>>> Konstantin Knauf
> >> > > > > > > >>>>
> >> > > > > > > >>>
> >> > > > > > > >>> --
> >> > > > > > > >>>
> >> > > > > > > >>> Benchao Li
> >> > > > > > > >>> School of Electronics Engineering and Computer Science, Peking University
> >> > > > > > > >>> Tel:+86-15650713730
> >> > > > > > > >>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
> >> > > > > > > >>>
> >>
> >> --
> >>
> >> Konstantin Knauf
> >>
> >> https://twitter.com/snntrable
> >>
> >> https://github.com/knaufk