Hi, I agree what Fabian said above. Besides, IMO, (3) is in a lower priority and will involve much more things. It makes sense to me to do it in two-phase.
Regarding to (3), the key point to convert an append-only table into changelog table is that the framework should know the operation type, so we introduced a special CREATE VIEW syntax to do it in the documentation [1]. Here is an example: -- my_binlog table is registered as an append-only table CREATE TABLE my_binlog ( before ROW<...>, after ROW<...>, op STRING, op_ms TIMESTAMP(3) ) WITH ( 'connector.type' = 'kafka', ... ); -- interpret my_binlog as a changelog on the op_type and id key CREATE VIEW my_table AS SELECT after.* FROM my_binlog CHANGELOG OPERATION BY op UPDATE KEY BY (id); -- my_table will materialize the insert/delete/update changes -- if we have 4 records in dbz that -- a create for 1004 -- an update for 1004 -- a create for 1005 -- a delete for 1004 > SELECT COUNT(*) FROM my_table; +-----------+ | COUNT(*) | +-----------+ | 1 | +-----------+ Best, Jark [1]: https://docs.google.com/document/d/1onyIUUdWAHfr_Yd5nZOE7SOExBc6TiW5C4LiL5FrjtQ/edit#heading=h.sz656g8mb2wb On Fri, 8 May 2020 at 00:24, Fabian Hueske <fhue...@gmail.com> wrote: > Thanks for the summary Konstantin. > I think you got all points right. > > IMO, the way forward would be to work on a FLIP to define > * the concept of temporal tables, > * how to feed them from retraction tables > * how to feed them from append-only tables > * their specification with CREATE TEMPORAL TABLE, > * how to use temporal tables in temporal table joins > * how (if at all) to use temporal tables in other types of queries > > We would keep the LATERAL TABLE syntax because it used for regular > table-valued functions. > However, we would probably remove the TemporalTableFunction (which is a > built-in table-valued function) after we deprecated it for a while. > > Cheers, Fabian > > Am Do., 7. Mai 2020 um 18:03 Uhr schrieb Konstantin Knauf < > kna...@apache.org>: > >> Hi everyone, >> >> Thanks everyone for joining the discussion on this. Please let me >> summarize >> what I have understood so far. >> >> 1) For joining an append-only table and a temporal table the syntax the >> "FOR >> SYSTEM_TIME AS OF <time-attribute>" seems to be preferred (Fabian, Timo, >> Seth). >> >> 2) To define a temporal table based on a changelog stream from an external >> system CREATE TEMPORAL TABLE (as suggested by Timo/Fabian) could be used. >> 3) In order to also support temporal tables derived from an append-only >> stream, we either need to support TEMPORAL VIEW (as mentioned by Fabian) >> or >> need to have a way to convert an append-only table into a changelog table >> (briefly discussed in [1]). It is not completely clear to me how a >> temporal >> table based on an append-only table would be with the syntax proposed in >> [1] and 2). @Jark Wu <imj...@gmail.com> could you elaborate a bit on >> that? >> >> How do we move forward with this? >> >> * It seems that a two-phased approach (1 + 2 now, 3 later) makes sense. >> What do you think? * If we proceed like this, what would this mean for the >> current syntax of LATERAL TABLE? Would we keep it? Would we eventually >> deprecate and drop it? Since only after 3) we would be on par with the >> current temporal table function join, I assume, we could only drop it >> thereafter. >> >> Thanks, Konstantin >> >> [1] >> >> https://docs.google.com/document/d/1onyIUUdWAHfr_Yd5nZOE7SOExBc6TiW5C4LiL5FrjtQ/edit#heading=h.kduaw9moein6 >> >> >> On Sat, Apr 18, 2020 at 3:07 PM Jark Wu <imj...@gmail.com> wrote: >> >> > Hi Fabian, >> > >> > Just to clarify a little bit, we decided to move the "converting >> > append-only table into changelog table" into future work. >> > So FLIP-105 only introduced some CDC formats (debezium) and new >> TableSource >> > interfaces proposed in FLIP-95. >> > I should have started a new FLIP for the new CDC formats and keep >> FLIP-105 >> > as it is to avoid the confusion, sorry about that. >> > >> > Best, >> > Jark >> > >> > >> > On Sat, 18 Apr 2020 at 00:35, Fabian Hueske <fhue...@gmail.com> wrote: >> > >> > > Thanks Jark! >> > > >> > > I certainly need to read up on FLIP-105 (and I'll try to adjust my >> > > terminology to changelog table from now on ;-) ) >> > > If FLIP-105 addresses the issue of converting an append-only table >> into a >> > > changelog table that upserts on primary key (basically what the VIEW >> > > definition in my first email did), >> > > TEMPORAL VIEWs become much less important. >> > > In that case, we would be well served with TEMPORAL TABLE and TEMPORAL >> > VIEW >> > > would be a nice-to-have feature for some later time. >> > > >> > > Cheers, Fabian >> > > >> > > >> > > >> > > >> > > >> > > >> > > Am Fr., 17. Apr. 2020 um 18:13 Uhr schrieb Jark Wu <imj...@gmail.com >> >: >> > > >> > > > Hi Fabian, >> > > > >> > > > I think converting an append-only table into temporal table contains >> > two >> > > > things: >> > > > (1) converting append-only table into changelog table (or retraction >> > > table >> > > > as you said) >> > > > (2) define the converted changelog table (maybe is a view now) as >> > > temporal >> > > > (or history tracked). >> > > > >> > > > The first thing is also mentioned and discussed in FLIP-105 design >> > draft >> > > > [1] which proposed a syntax >> > > > to convert the append-only table into a changelog table. >> > > > >> > > > I think TEMPORAL TABLE is quite straightforward and simple, and can >> > > satisfy >> > > > most existing changelog >> > > > data with popular CDC formats. TEMPORAL VIEW is flexible but will >> > involve >> > > > more SQL codes. I think >> > > > we can support them both. >> > > > >> > > > Best, >> > > > Jark >> > > > >> > > > [1]: >> > > > >> > > > >> > > >> > >> https://docs.google.com/document/d/1onyIUUdWAHfr_Yd5nZOE7SOExBc6TiW5C4LiL5FrjtQ/edit#heading=h.sz656g8mb2wb >> > > > >> > > > On Fri, 17 Apr 2020 at 23:52, Fabian Hueske <fhue...@gmail.com> >> wrote: >> > > > >> > > > > Hi, >> > > > > >> > > > > I agree with most of what Timo said. >> > > > > >> > > > > The TEMPORAL keyword (which unfortunately might be easily confused >> > with >> > > > > TEMPORARY...) looks very intuitive and I think using the only time >> > > > > attribute for versioning would be a good choice. >> > > > > >> > > > > However, TEMPORAL TABLE on retraction tables do not solve the full >> > > > problem. >> > > > > I believe there will be also cases where we need to derive a >> temporal >> > > > table >> > > > > from an append only table (what TemporalTableFunctions do right >> now). >> > > > > I think the best choice for this would be TEMPORAL VIEW but as I >> > > > explained, >> > > > > it might be a longer way until this can be supported. >> > > > > TEMPORAL VIEW would also address the problem of preprocessing. >> > > > > >> > > > > > Regarding retraction table with a primary key and a >> time-attribute: >> > > > > > These semantics are still unclear to me. Can retractions only >> occur >> > > > > > within watermarks? Or are they also used for representing late >> > > updates? >> > > > > >> > > > > Time attributes and retraction streams are a challenging topic >> that I >> > > > > haven't completely understood yet. >> > > > > So far we treated time attributes always as part of the data. >> > > > > In combination with retractions, it seems that they become >> metadata >> > > that >> > > > > specifies when a change was done. >> > > > > I think this is different from treating time attributes as regular >> > > data. >> > > > > >> > > > > Cheers, Fabian >> > > > > >> > > > > >> > > > > Am Fr., 17. Apr. 2020 um 17:23 Uhr schrieb Seth Wiesman < >> > > > > sjwies...@gmail.com >> > > > > >: >> > > > > >> > > > > > I really like the TEMPORAL keyword, I find it very intuitive. >> > > > > > >> > > > > > The down side of this approach would be that an additional >> > > > preprocessing >> > > > > > > step would not be possible anymore because there is no >> preceding >> > > > view. >> > > > > > > >> > > > > > >> > > > > > Yes and no. My understanding is we are not talking about making >> > any >> > > > > > changes to how temporal tables are defined in the table api. >> Since >> > > you >> > > > > > cannot currently define temporal table functions in pure SQL >> > > > > applications, >> > > > > > but only pre-register them in YAML, you can't do any >> pre-processing >> > > as >> > > > it >> > > > > > stands today. Preprocessing may be a generally useful feature, >> I'm >> > > not >> > > > > > sure, but this syntax does not lose us anything in pure SQL >> > > > applications. >> > > > > > >> > > > > > These semantics are still unclear to me. Can retractions only >> occur >> > > > > > > within watermarks? Or are they also used for representing late >> > > > updates? >> > > > > > > >> > > > > > >> > > > > > I do not know the SQL standard well enough to give a principled >> > > > response >> > > > > to >> > > > > > this question. However, in my observation of production >> workloads, >> > > > users >> > > > > of >> > > > > > temporal table functions are doing so to denormalize star >> schemas >> > > > before >> > > > > > performing further transformations and aggregations and expect >> the >> > > > output >> > > > > > to be an append stream. With the ongoing work to better support >> > > > > changelogs, >> > > > > > the need for users to understand the differences in append vs >> > upsert >> > > in >> > > > > > their query may be diminishing but everyone else on this thread >> can >> > > > > better >> > > > > > speak to that. >> > > > > > >> > > > > > Seth >> > > > > > >> > > > > > On Fri, Apr 17, 2020 at 10:03 AM Timo Walther < >> twal...@apache.org> >> > > > > wrote: >> > > > > > >> > > > > > > Hi Fabian, >> > > > > > > >> > > > > > > thank you very much for this great summary! >> > > > > > > >> > > > > > > I wasn't aware of the Polymorphic Table Functions standard. >> This >> > > is a >> > > > > > > very interesting topic that we should definitely consider in >> the >> > > > > future. >> > > > > > > Maybe this could also help us in defining tables more >> dynamically >> > > > > within >> > > > > > > a query. It could help solving problems as discussed in >> FLIP-113. >> > > > > > > >> > > > > > > Regarding joining: >> > > > > > > >> > > > > > > IMO we should aim for "FOR SYSTEM_TIME AS OF x" instead of the >> > > > current >> > > > > > > `LATERAL TABLE(rates(x))` syntax. A function that also behaves >> > > like a >> > > > > > > table and needs this special `LATERAL` keyword during joining >> is >> > > not >> > > > > > > very intuitive. The PTF could be used once they are fully >> > supported >> > > > by >> > > > > > > Calcite and we have the big picture how to also use them for >> > other >> > > > > > > time-based operations (windows?, joins?). >> > > > > > > >> > > > > > > Regarding how represent a temporal table: >> > > > > > > >> > > > > > > I think that our current DDL, current LookupTableSource and >> > > temporal >> > > > > > > tables can fit nicely together. >> > > > > > > >> > > > > > > How about we simply introduce an additional keyword >> `TEMPORAL` to >> > > > > > > indicate history tracking semantics? I think this is the >> minimal >> > > > > > > invasive solution: >> > > > > > > >> > > > > > > CREATE TEMPORAL TABLE rates ( >> > > > > > > currency CHAR(3) NOT NULL PRIMARY KEY, >> > > > > > > rate DOUBLE, >> > > > > > > rowtime TIMESTAMP, >> > > > > > > WATERMARK FOR rowtime AS rowtime - INTERVAL '5' MINUTE) >> > > > > > > WITH (...); >> > > > > > > >> > > > > > > - The primary key would be defined by the DDL. >> > > > > > > - The available time attribute would be defined by the DDL. >> > Either >> > > as >> > > > > > > the only time attribute of the table or we introduce a special >> > > > > > > constraint similar to `PRIMARY KEY`. >> > > > > > > >> > > > > > > The down side of this approach would be that an additional >> > > > > preprocessing >> > > > > > > step would not be possible anymore because there is no >> preceding >> > > > view. >> > > > > > > >> > > > > > > The `TEMPORAL` semantic can be stored in the properties of the >> > > table >> > > > > > > when writing to a catalog. We do the same for watermarks and >> > > computed >> > > > > > > columns. >> > > > > > > >> > > > > > > Without a `TEMPORAL` keyword, a `FOR SYSTEM_TIME AS OF x` >> would >> > > only >> > > > > > > work on processing time by a lookup into the external system >> or >> > on >> > > > > > > event-time by using the time semantics that the external >> system >> > > > > supports. >> > > > > > > >> > > > > > > Regarding retraction table with a primary key and a >> > time-attribute: >> > > > > > > >> > > > > > > These semantics are still unclear to me. Can retractions only >> > occur >> > > > > > > within watermarks? Or are they also used for representing late >> > > > updates? >> > > > > > > >> > > > > > > Regards, >> > > > > > > Timo >> > > > > > > >> > > > > > > >> > > > > > > On 17.04.20 14:34, Fabian Hueske wrote: >> > > > > > > > Hi all, >> > > > > > > > >> > > > > > > > First of all, I appologize for the text wall that's >> > following... >> > > > ;-) >> > > > > > > > >> > > > > > > > A temporal table join joins an append-only table and a >> temporal >> > > > > table. >> > > > > > > > The question about how to represent a temporal table join >> boils >> > > > down >> > > > > to >> > > > > > > two >> > > > > > > > questions: >> > > > > > > > >> > > > > > > > 1) How to represent a temporal table >> > > > > > > > 2) How to specify the join of an append-only table and a >> > temporal >> > > > > table >> > > > > > > > >> > > > > > > > I'll discuss these points separately. >> > > > > > > > >> > > > > > > > # 1 How to represent a temporal table >> > > > > > > > >> > > > > > > > A temporal table is a table that can be looked up with a >> time >> > > > > parameter >> > > > > > > and >> > > > > > > > which returns the rows of the table at that point in time / >> for >> > > > that >> > > > > > > > version. >> > > > > > > > In order to be able to (conceptually) look up previous >> > versions, >> > > a >> > > > > > > temporal >> > > > > > > > table must be (conceptually) backed by a history table that >> > > tracks >> > > > > all >> > > > > > > > previous versions (see SqlServer docs [1]). >> > > > > > > > In the context of our join, we added another restriction >> namely >> > > > that >> > > > > > the >> > > > > > > > table must have a primary key, i.e., there is only one row >> for >> > > each >> > > > > > > version >> > > > > > > > for each unique key. >> > > > > > > > >> > > > > > > > Hence, the requirements for a temporal table are: >> > > > > > > > * The temporal table has a primary key / unique attribute >> > > > > > > > * The temporal table has a time-attribute that defines the >> > start >> > > of >> > > > > the >> > > > > > > > validity interval of a row (processing time or event time) >> > > > > > > > * The system knows that the history of the table is tracked >> and >> > > can >> > > > > > infer >> > > > > > > > how to look up a version. >> > > > > > > > >> > > > > > > > There are two possible types of input from which we want to >> > > create >> > > > > > > temporal >> > > > > > > > tables (that I'm aware of): >> > > > > > > > >> > > > > > > > * append-only tables, i.e., tables that contain the full >> change >> > > > > history >> > > > > > > > * retraction tables, i.e., tables that are updating and do >> not >> > > > > remember >> > > > > > > the >> > > > > > > > history. >> > > > > > > > >> > > > > > > > There are a few ways to do this: >> > > > > > > > >> > > > > > > > ## 1.1 Defining a VIEW on an append-only table with a time >> > > > attribute. >> > > > > > > > >> > > > > > > > The following view definition results in a view that >> provides >> > the >> > > > > > latest >> > > > > > > > rate for each currency. >> > > > > > > > >> > > > > > > > CREATE VIEW rates AS >> > > > > > > > SELECT >> > > > > > > > currency, MAX(rate) as rate, MAX(rowtime) as rowtime >> > > > > > > > FROM rates_history rh1 >> > > > > > > > WHERE >> > > > > > > > rh1.rowtime = ( >> > > > > > > > SELECT max(rowtime) >> > > > > > > > FROM rates_history rh2 >> > > > > > > > WHERE rh2.curreny = rh1.currency) >> > > > > > > > GROUP BY currency >> > > > > > > > WITH ( >> > > > > > > > 'historytracking' = 'true', >> > > > > > > > 'historytracking.starttime' = 'rowtime'); >> > > > > > > > >> > > > > > > > However, we also need to tell the system to track the >> history >> > of >> > > > all >> > > > > > > > changes of the view in order to be able to look it up. >> > > > > > > > That's what the properties in the WITH clause are for >> (inspired >> > > by >> > > > > > > > SqlServer's TEMPORAL TABLE DDL syntax). >> > > > > > > > Note that this is *not* a syntax proposal but only meant to >> > show >> > > > > which >> > > > > > > > information is needed. >> > > > > > > > This view allows to look up any version of the "rates" view. >> > > > > > > > >> > > > > > > > In addition to designing and implementing the DDL syntax for >> > > views >> > > > > that >> > > > > > > > support temporal lookups, the optimizer would need to >> > understand >> > > > the >> > > > > > > > semantics of the view definition in depth. >> > > > > > > > Among other things it needs to understand that the MAX() >> > > > aggregation >> > > > > on >> > > > > > > the >> > > > > > > > time-attribute preserves its watermark alignment. >> > > > > > > > AFAIK, this is not the case at the moment (the time >> attribute >> > > would >> > > > > be >> > > > > > > > converted into a regular TIMESTAMP and lose it's time >> attribute >> > > > > > > properties) >> > > > > > > > >> > > > > > > > ## 1.2 A retraction table with a primary key and a >> > > time-attribute. >> > > > > > > > >> > > > > > > > On paper it looks like such a table would automatically >> qualify >> > > as >> > > > a >> > > > > > > > time-versioned table because it completely fulfills the >> > > > requirements. >> > > > > > > > However, I don't think we can use it *as is* as a temporal >> > table >> > > if >> > > > > we >> > > > > > > want >> > > > > > > > to have clean semantics. >> > > > > > > > The problem here is the "lost history" of the retraction >> table. >> > > The >> > > > > > > dynamic >> > > > > > > > table that is defined on the retraction stream only stores >> the >> > > > latest >> > > > > > > > version (even though it sees all versions). >> > > > > > > > Conceptually, a temporal table look up the version of the >> table >> > > at >> > > > > any >> > > > > > > > point in time because it is backed by a history table. >> > > > > > > > If this information is not available, we cannot have a >> > > semantically >> > > > > > clean >> > > > > > > > definition of the join IMO. >> > > > > > > > >> > > > > > > > Therefore we should define the table in a way that the >> system >> > > knows >> > > > > > that >> > > > > > > > the history is tracked. >> > > > > > > > In MSSQL uses a syntax similar to this one >> > > > > > > > >> > > > > > > > CREATE TABLE rates ( >> > > > > > > > currency CHAR(3) NOT NULL PRIMARY KEY, >> > > > > > > > rate DOUBLE, >> > > > > > > > rowtime TIMESTAMP, >> > > > > > > > WATERMARK FOR rowtime AS rowtime - INTERVAL '5' MINUTE) >> > > > > > > > WITH ( >> > > > > > > > 'historytracking' = 'true', >> > > > > > > > 'historytracking.starttime' = 'rowtime'); >> > > > > > > > >> > > > > > > > The 'historytracking' properties would decare that the table >> > > tracks >> > > > > its >> > > > > > > > history and also specify the attribute (rowtime) that is >> used >> > for >> > > > > > > > versioning. >> > > > > > > > >> > > > > > > > ## 1.3 Registering a TableFunction that takes an append-only >> > > table >> > > > > with >> > > > > > > > time attribute >> > > > > > > > >> > > > > > > > The TableFunction requires a few parameters: >> > > > > > > > * the source table from which to derive the temporal table >> > > > > > > > * the key attribute on which the versions of the source >> table >> > > > should >> > > > > be >> > > > > > > > computed >> > > > > > > > * the time attribute that defines the versions >> > > > > > > > * a lookup timestamp for the version of that is returned. >> > > > > > > > >> > > > > > > > The reason why we chose the TableFunction approach over the >> > VIEW >> > > > > > approach >> > > > > > > > so far were: >> > > > > > > > * It is easier for the optimizer to identify a build-in >> table >> > > > > function >> > > > > > > than >> > > > > > > > to analyze and reason about a generic VIEW. >> > > > > > > > * We would need to make the optimizer a lot smarter to infer >> > all >> > > > the >> > > > > > > > properties from the generic VIEW definition that we need >> for a >> > > > > temporal >> > > > > > > > table join. >> > > > > > > > * Passing a parameter to a function is a known thing, >> passing a >> > > > > > parameter >> > > > > > > > to a VIEW not so much. >> > > > > > > > * Users would need to specify the VIEW exactly correct, such >> > that >> > > > it >> > > > > > can >> > > > > > > be >> > > > > > > > used as a temporal table. Look at 1.1 why this is not >> trivial. >> > > > > > > > >> > > > > > > > There is two ways to use a TableFunction: >> > > > > > > > >> > > > > > > > ### 1.3.1 Built-in and pre-registered function that is >> > > > parameterized >> > > > > in >> > > > > > > the >> > > > > > > > SQL query >> > > > > > > > >> > > > > > > > Here, we do not need to do anything to register the >> function. >> > We >> > > > > simply >> > > > > > > use >> > > > > > > > it in the query (see example in 2.2 below) >> > > > > > > > >> > > > > > > > ### 1.3.2 Parameterize function when it is registered in the >> > > > catalog >> > > > > > > (with >> > > > > > > > a provided Java implementation) >> > > > > > > > >> > > > > > > > This is the approach, we've used so far. In the Table API, >> the >> > > > > function >> > > > > > > is >> > > > > > > > first parameterized and created and then registered: >> > > > > > > > We would need a DDL syntax to parameterize UDFs on >> > registration. >> > > > > > > > I don't want to propose a syntax here, but just to get an >> idea >> > it >> > > > > might >> > > > > > > > look like this: >> > > > > > > > >> > > > > > > > CREATE FUNCTION rates AS >> > > > > > > > 'org.apache.flink.table.udfs.TemporalTableFunction' WITH >> > > ('table' = >> > > > > > > > 'rates_history', 'key' = 'cur', 'time' = 'rowtime') >> > > > > > > > >> > > > > > > > Right now, the Flink Catalog interface does not have the >> > > > > functionality >> > > > > > to >> > > > > > > > store such parameters and would need some hacks to properly >> > > create >> > > > > > > properly >> > > > > > > > parameterize function instances. >> > > > > > > > >> > > > > > > > >> > > > > > > > >> > > > > > > > # 2 Defining a join of an append-only table and a temporal >> > table >> > > > > > > > >> > > > > > > > The append-only table needs to have a time-attribute >> > (processing >> > > > time >> > > > > > or >> > > > > > > > event time, but same as the temporal table). >> > > > > > > > The join then needs to specify two things: >> > > > > > > > * an equality predicate that includes the primary key of the >> > > > temporal >> > > > > > > table >> > > > > > > > * declare the time attribute of the append-only table as the >> > time >> > > > as >> > > > > of >> > > > > > > > which to look up the temporal table, i.e, get the version of >> > the >> > > > > > temporal >> > > > > > > > table that is valid for the timestamp of the current row >> from >> > the >> > > > > > > > append-only table >> > > > > > > > >> > > > > > > > The tricky part (from a syntax point of view) is to specify >> the >> > > > > lookup >> > > > > > > > time. >> > > > > > > > >> > > > > > > > ## 2.1 the temporal table is a regular table or view (see >> > > > approaches >> > > > > > 1.1 >> > > > > > > > and 1.2 above) >> > > > > > > > >> > > > > > > > In this case we can use the "FOR SYSTEM_TIME AS OF x" >> clause as >> > > > > > follows: >> > > > > > > > >> > > > > > > > SELECT * >> > > > > > > > FROM orders o, rates r FOR SYSTEM_TIME AS OF o.ordertime >> > > > > > > > WHERE o.currency = r.currency >> > > > > > > > >> > > > > > > > IMO, this is a great syntax and the one we should strive >> for. >> > > > > > > > We would need to bend the rules of the SQL standard which >> only >> > > > > allows x >> > > > > > > in >> > > > > > > > "FOR SYSTEM_TIME AS OF x" to be a constant and the table on >> > which >> > > > it >> > > > > is >> > > > > > > > applied usually needs to be a specific type (not sure if >> views >> > > are >> > > > > > > > supported), but I guess this is fine. >> > > > > > > > NOTE: the "FOR SYSTEM_TIME AS OF x" is already supported for >> > > > > > LookupTable >> > > > > > > > Joins if x is a processing time attribute [2]. >> > > > > > > > >> > > > > > > > ## 2.2 the temporal table is a TableFunction and >> parameterized >> > in >> > > > the >> > > > > > > query >> > > > > > > > (see 1.3.1 above) >> > > > > > > > >> > > > > > > > SELECT * >> > > > > > > > FROM orders o, >> > > > > > > > TEMPORAL_TABLE( >> > > > > > > > table => TABLE(rates_history), >> > > > > > > > key => DESCRIPTOR(currency), >> > > > > > > > time => DESCRIPTOR(rowtime)) r >> > > > > > > > ON o.currency = r.currency >> > > > > > > > >> > > > > > > > The function "TEMPORAL_TABLE" is built-in and nothing was >> > > > registered >> > > > > in >> > > > > > > the >> > > > > > > > catalog (except the rates_history table). >> > > > > > > > In fact this is valid SQL:2016 syntax and called Polymorphic >> > > Table >> > > > > > > > Functions. Have a look here [3]. >> > > > > > > > >> > > > > > > > ## 2.3 the temporal table is a TableFunction that was >> > > parameterized >> > > > > > > during >> > > > > > > > registration (see 1.3.2 above) >> > > > > > > > >> > > > > > > > This is what we have at the momement. >> > > > > > > > >> > > > > > > > SELECT * >> > > > > > > > FROM orders o, >> > > > > > > > LATERAL TABLE (rates(o.ordertime)) >> > > > > > > > ON o.currency = r.currency >> > > > > > > > >> > > > > > > > The TableFunction "rates" was registered in the catalog and >> > > > > > parameterized >> > > > > > > > to the "rates_history" append-only table, the key was set to >> > > > > > "currency", >> > > > > > > > and the time attribute was declared. >> > > > > > > > >> > > > > > > > # SUMMARY >> > > > > > > > >> > > > > > > > IMO we should in the long run aim to define temporal tables >> > > either >> > > > as >> > > > > > > > upsert retraction tables and views on append-only tables and >> > join >> > > > > them >> > > > > > > > using the "FOR SYSTEM_TIME AS OF x" syntax. >> > > > > > > > I guess it is debatable whether we need to decare to track >> > > history >> > > > > for >> > > > > > > > these tables (which we don't actually do) or if we do it by >> > > > > convention >> > > > > > if >> > > > > > > > the table has a time attribute. >> > > > > > > > It should be (relatively) easy to get this to work for >> > retraction >> > > > > > tables >> > > > > > > > which will be supported soon. >> > > > > > > > It will be more work for views because we need to improve >> the >> > > time >> > > > > > > > attribute handling with MAX() aggregations. >> > > > > > > > The "FOR SYSTEM_TIME AS OF x" is already supported for >> > > > > > LookupTableSources >> > > > > > > > and would "only" need to be adapted to work on temporal >> tables. >> > > > > > > > >> > > > > > > > Registering parameterized TableFunctions in the catalog >> seems >> > > like >> > > > > > quite >> > > > > > > a >> > > > > > > > bit of work. We need new DDL syntax, extend the catalog and >> > > > function >> > > > > > > > instantiation. This won't be easy, IMO. >> > > > > > > > If we only support them as TEMPORARY FUNCTION which are not >> > > > > registered >> > > > > > in >> > > > > > > > the catalog it will be easier. The question is whether it is >> > > worth >> > > > > the >> > > > > > > > effort if we decide for the other approach. >> > > > > > > > >> > > > > > > > Using TableFunctions that are parameterized in the query >> will >> > > > require >> > > > > > to >> > > > > > > > extend the Calcite parser and framework to support >> Polymorphic >> > > > Table >> > > > > > > > Functions. >> > > > > > > > However, there might already some work be done there, >> because >> > > AFAIK >> > > > > > > Apache >> > > > > > > > Beam aims to support this syntax for windowing functions as >> > > > described >> > > > > > in >> > > > > > > > the "One SQL to rule them all" paper [4]. >> > > > > > > > It might be the fastest and fully SQL standard compliant >> way. >> > > > > > > > >> > > > > > > > Cheers, >> > > > > > > > Fabian >> > > > > > > > >> > > > > > > > [1] >> > > > > > > > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > >> > >> https://docs.microsoft.com/en-us/sql/relational-databases/tables/temporal-tables >> > > > > > > > [2] >> > > > > > > > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > >> > >> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/joins.html#usage-1 >> > > > > > > > [3] >> > > > > > > > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > >> > >> https://standards.iso.org/ittf/PubliclyAvailableStandards/c069776_ISO_IEC_TR_19075-7_2017.zip >> > > > > > > > [4] https://arxiv.org/abs/1905.12133 >> > > > > > > > >> > > > > > > > Am Fr., 17. Apr. 2020 um 06:37 Uhr schrieb Jark Wu < >> > > > imj...@gmail.com >> > > > > >: >> > > > > > > > >> > > > > > > >> Hi Konstantin, >> > > > > > > >> >> > > > > > > >> Thanks for bringing this discussion. I think temporal join >> is >> > a >> > > > very >> > > > > > > >> important feature and should be exposed to pure SQL users. >> > > > > > > >> And I already received many requirements like this. >> > > > > > > >> However, my concern is that how to properly support this >> > feature >> > > > in >> > > > > > SQL. >> > > > > > > >> Introducing a DDL syntax for Temporal Table Function is one >> > way, >> > > > but >> > > > > > > maybe >> > > > > > > >> not the best one. >> > > > > > > >> >> > > > > > > >> The most important reason is that the underlying of >> temporal >> > > table >> > > > > > > function >> > > > > > > >> is exactly a changelog stream. >> > > > > > > >> The temporal join is actually temporal joining a fact >> stream >> > > with >> > > > > the >> > > > > > > >> changelog stream on processing time or event time. >> > > > > > > >> We will soon support to create a changelog source using DDL >> > once >> > > > > > FLIP-95 >> > > > > > > >> and FLIP-105 is finished. >> > > > > > > >> At that time, we can have a simple DDL to create changelog >> > > source >> > > > > like >> > > > > > > >> this; >> > > > > > > >> >> > > > > > > >> CREATE TABLE rate_changelog ( >> > > > > > > >> currency STRING, >> > > > > > > >> rate DECIMAL >> > > > > > > >> ) WITH ( >> > > > > > > >> 'connector' = 'kafka', >> > > > > > > >> 'topic' = 'rate_binlog', >> > > > > > > >> 'properties.bootstrap.servers' = 'localhost:9092', >> > > > > > > >> 'format' = 'debezium-json' >> > > > > > > >> ); >> > > > > > > >> >> > > > > > > >> In the meanwhile, we already have a SQL standard temporal >> join >> > > > > syntax >> > > > > > > [1], >> > > > > > > >> i.e. the "A JOIN B FOR SYSTEM_TIME AS OF ..". >> > > > > > > >> It is currently used as dimension table lookup join, but >> the >> > > > > semantic >> > > > > > is >> > > > > > > >> the same to the "temporal table function join"[2]. >> > > > > > > >> I'm in favor of "FOR SYSTEM_TIME AS OF" because it is more >> > > nature >> > > > > > > >> becuase the definition of B is a *table* not a *table >> > function*, >> > > > > > > >> and the syntax is included in SQL standard. >> > > > > > > >> >> > > > > > > >> So once we have the ability to define "rate_changelog" >> table, >> > > then >> > > > > we >> > > > > > > can >> > > > > > > >> use the following query to temporal join the changelog on >> > > > processing >> > > > > > > time. >> > > > > > > >> >> > > > > > > >> SELECT * >> > > > > > > >> FROM orders JOIN rate_changelog FOR SYSTEM_TIME AS OF >> > > > > orders.proctime >> > > > > > > >> ON orders.currency = rate_changelog.currency; >> > > > > > > >> >> > > > > > > >> In a nutshell, once FLIP-95 and FLIP-105 is ready, we can >> > easily >> > > > to >> > > > > > > support >> > > > > > > >> "temporal join on changelogs" without introducing new >> syntax. >> > > > > > > >> IMO, introducing a DDL syntax for Temporal Table Function >> > looks >> > > > like >> > > > > > > not an >> > > > > > > >> easy way and may have repetitive work. >> > > > > > > >> >> > > > > > > >> Best, >> > > > > > > >> Jark >> > > > > > > >> >> > > > > > > >> [1]: >> > > > > > > >> >> > > > > > > >> >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > >> > >> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table >> > > > > > > >> [2]: >> > > > > > > >> >> > > > > > > >> >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > >> > >> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table-function >> > > > > > > >> >> > > > > > > >> >> > > > > > > >> >> > > > > > > >> >> > > > > > > >> >> > > > > > > >> On Thu, 16 Apr 2020 at 23:04, Benchao Li < >> libenc...@gmail.com >> > > >> > > > > wrote: >> > > > > > > >> >> > > > > > > >>> Hi Konstantin, >> > > > > > > >>> >> > > > > > > >>> Thanks for bringing up this discussion. +1 for the idea. >> > > > > > > >>> We have met this in our company too, and I planned to >> support >> > > it >> > > > > > > recently >> > > > > > > >>> in our internal branch. >> > > > > > > >>> >> > > > > > > >>> regarding to your questions, >> > > > > > > >>> 1) I think it might be more a table/view than function, >> just >> > > like >> > > > > > > >> Temporal >> > > > > > > >>> Table (which is also known as >> > > > > > > >>> dimension table). Maybe we need a DDL like CREATE VIEW and >> > plus >> > > > > some >> > > > > > > >>> additional settings. >> > > > > > > >>> 2) If we design the DDL for it like view, then maybe >> > temporary >> > > is >> > > > > ok >> > > > > > > >>> enough. >> > > > > > > >>> >> > > > > > > >>> Konstantin Knauf <kna...@apache.org> 于2020年4月16日周四 >> 下午8:16写道: >> > > > > > > >>> >> > > > > > > >>>> Hi everyone, >> > > > > > > >>>> >> > > > > > > >>>> it would be very useful if temporal tables could be >> created >> > > via >> > > > > > DDL. >> > > > > > > >>>> Currently, users either need to do this in the Table API >> or >> > in >> > > > the >> > > > > > > >>>> environment file of the Flink CLI, which both require the >> > user >> > > > to >> > > > > > > >> switch >> > > > > > > >>>> the context of the SQL CLI/Editor. I recently created a >> > ticket >> > > > for >> > > > > > > this >> > > > > > > >>>> request [1]. >> > > > > > > >>>> >> > > > > > > >>>> I see two main questions: >> > > > > > > >>>> >> > > > > > > >>>> 1) What would be the DDL syntax? A Temporal Table is on >> the >> > > one >> > > > > > hand a >> > > > > > > >>> view >> > > > > > > >>>> and on the other a function depending on how you look at >> it. >> > > > > > > >>>> >> > > > > > > >>>> 2) Would this temporal table view/function be stored in >> the >> > > > > catalog >> > > > > > or >> > > > > > > >>> only >> > > > > > > >>>> be temporary? >> > > > > > > >>>> >> > > > > > > >>>> I personally do not have much experience in this area of >> > > Flink, >> > > > > so I >> > > > > > > am >> > > > > > > >>>> looking forward to hearing your thoughts on this. >> > > > > > > >>>> >> > > > > > > >>>> Best, >> > > > > > > >>>> >> > > > > > > >>>> Konstantin >> > > > > > > >>>> >> > > > > > > >>>> [1] https://issues.apache.org/jira/browse/FLINK-16824 >> > > > > > > >>>> >> > > > > > > >>>> -- >> > > > > > > >>>> >> > > > > > > >>>> Konstantin Knauf >> > > > > > > >>>> >> > > > > > > >>> >> > > > > > > >>> >> > > > > > > >>> -- >> > > > > > > >>> >> > > > > > > >>> Benchao Li >> > > > > > > >>> School of Electronics Engineering and Computer Science, >> > Peking >> > > > > > > University >> > > > > > > >>> Tel:+86-15650713730 >> > > > > > > >>> Email: libenc...@gmail.com; libenc...@pku.edu.cn >> > > > > > > >>> >> > > > > > > >> >> > > > > > > > >> > > > > > > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > >> > >> >> >> -- >> >> Konstantin Knauf >> >> https://twitter.com/snntrable >> >> https://github.com/knaufk >> >