Hi everyone,

Every table with a primary key and an event-time attribute provides what is needed for an event-time temporal table join. I agree that, from a technical point of view, the TEMPORAL keyword is not required.

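To illustrate what such a join has to do, here is a minimal Python sketch (plain Python, not Flink code). It assumes the version of a row is given by a single, explicitly named attribute, and it ignores watermarks, deletes, and out-of-order data. The helper names build_versions and lookup_as_of and the sample rows are hypothetical and only for illustration.

```python
from bisect import bisect_right
from collections import defaultdict


def build_versions(rows, key, version):
    """Group rows by primary key and sort each group by the version attribute."""
    versions = defaultdict(list)
    for row in rows:
        versions[row[key]].append(row)
    for history in versions.values():
        history.sort(key=lambda r: r[version])
    return versions


def lookup_as_of(versions, key_value, as_of, version):
    """Return the latest row for key_value whose version time is <= as_of, or None."""
    history = versions.get(key_value, [])
    times = [r[version] for r in history]
    idx = bisect_right(times, as_of)
    return history[idx - 1] if idx > 0 else None


# Hypothetical sample data: integer timestamps stand in for event time.
rates = [
    {"currency": "EUR", "rate": 1.10, "rowtime": 10},
    {"currency": "EUR", "rate": 1.12, "rowtime": 20},
    {"currency": "USD", "rate": 1.00, "rowtime": 5},
]
orders = [
    {"id": 1, "currency": "EUR", "ordertime": 15},
    {"id": 2, "currency": "EUR", "ordertime": 25},
    {"id": 3, "currency": "EUR", "ordertime": 5},
]

versions = build_versions(rates, key="currency", version="rowtime")

# For each order, pick the rate version that was valid at the order's time.
joined = []
for o in orders:
    match = lookup_as_of(versions, o["currency"], o["ordertime"], "rowtime")
    joined.append((o["id"], match["rate"] if match else None))

print(joined)
```

With the sample data, the order at time 15 sees the EUR rate valid since time 10, the order at time 25 sees the one valid since time 20, and the order at time 5 finds no version at all. The sketch only needs two things from the table: a key to group by and an attribute to order the versions by.
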
I'm more sceptical about implicitly deriving the versioning information of a (temporal) table from the table's only event-time attribute. In the query

    SELECT *
    FROM orders o, rates r FOR SYSTEM_TIME AS OF o.ordertime
    WHERE o.currency = r.currency

the syntax of the temporal table join does not explicitly reference the version of the temporal rates table. Hence, the system needs a way to derive the version of the temporal table.

Implicitly using the (only) event-time attribute of a temporal table (rates in the example above) to identify the right version works in most cases, but probably not in all.

* What if a table has more than one event-time attribute? (TableSchema is designed to support multiple watermarks; queries with interval joins produce tables with multiple event-time attributes, ...)
* What if the table does not have an event-time attribute in its schema but the version should only be provided as metadata?

We could add a clause to define the version of a table, such as:

    CREATE TABLE rates (
       currency CHAR(3) NOT NULL PRIMARY KEY,
       rate DOUBLE,
       rowtime TIMESTAMP,
       WATERMARK FOR rowtime AS rowtime - INTERVAL '5' MINUTE),
    VERSION (rowtime)
    WITH (...);

The presence of the VERSION clause (or whatever syntax) would explicitly define the version of a (temporal) table. It would also make the TEMPORAL keyword superfluous because there would be another indicator that a table can be used in a temporal table join.

I'm OK with not adding the TEMPORAL keyword, but I recommend that we think again about the proposed implicit definition of a table's version and how it might limit use in the future.

Cheers,
Fabian

On Mon, 22 June 2020 at 16:14, Jark Wu <imj...@gmail.com> wrote:

> I'm also +1 for not adding the TEMPORAL keyword.
>
> +1 to make the PRIMARY KEY semantics clear for sources.
>
> From my point of view:
>
> 1) PRIMARY KEY on changelog source:
> It means that when the changelogs (INSERT/UPDATE/DELETE) are materialized,
> the materialized table should be unique on the primary key columns.
> Flink assumes messages are in order on the primary key. Flink doesn't
> validate or enforce the key integrity, but simply trusts it (thus NOT
> ENFORCED).
> Flink will use the PRIMARY KEY for some optimizations, e.g. using the PRIMARY
> KEY to update the materialized state by key in the temporal join operator.
>
> 2) PRIMARY KEY on insert-only source:
> I prefer to have the same semantics as for the batch source and changelog
> source, i.e. it implies that records are not duplicated on the primary key.
> Flink simply trusts the primary key constraint and doesn't validate it.
> If there are duplicate primary keys with INSERT changeflag, then the result of
> the Flink query might be wrong.
>
> If this is a TEMPORAL TABLE FUNCTION scenario, where the source emits duplicate
> primary keys with INSERT changeflag, then when we migrate this case to temporal
> table DDL,
> I think this source should emit INSERT/UPDATE (UPSERT) messages instead of
> INSERT-only messages, e.g. a Kafka compacted topic source?
>
> Best,
> Jark
>
>
> On Mon, 22 Jun 2020 at 17:04, Konstantin Knauf <kna...@apache.org> wrote:
>
> > Hi everyone,
> >
> > I also agree with Leonard/Kurt's proposal for CREATE TEMPORAL TABLE.
> >
> > Best,
> >
> > Konstantin
> >
> > On Mon, Jun 22, 2020 at 10:53 AM Kurt Young <ykt...@gmail.com> wrote:
> >
> > > I agree with Timo, the semantics of the primary key need more thought and
> > > discussion, especially after FLIP-95 and FLIP-105.
> > >
> > > Best,
> > > Kurt
> > >
> > >
> > > On Mon, Jun 22, 2020 at 4:45 PM Timo Walther <twal...@apache.org> wrote:
> > >
> > > > Hi Leonard,
> > > >
> > > > thanks for the summary.
> > > >
> > > > After reading all of the previous arguments and working on FLIP-95, I
> > > > would also lean towards the conclusion of not adding the TEMPORAL
> > > > keyword.
> > > >
> > > > After FLIP-95, what we considered as a CREATE TEMPORAL TABLE can be
> > > > represented as a CREATE TABLE with PRIMARY KEY and WATERMARK. The FOR
> > > > SYSTEM_TIME AS OF t would trigger the internal materialization and
> > > > "temporal" logic.
> > > >
> > > > However, we should discuss the meaning of PRIMARY KEY again in this
> > > > case. In a TEMPORAL TABLE scenario, the source would emit duplicate
> > > > primary keys with INSERT changeflag but at different points in time.
> > > > Currently, we require a PRIMARY KEY NOT ENFORCED declaration. The
> > > > changelog semantics of FLIP-95 and FLIP-105 don't work well with a
> > > > primary key declaration.
> > > >
> > > > Regards,
> > > > Timo
> > > >
> > > >
> > > > On 20.06.20 17:08, Leonard Xu wrote:
> > > > > Hi everyone,
> > > > >
> > > > > Thanks for the nice discussion. I'd like to move the work forward, so please let me briefly summarize the main opinions and current divergences.
> > > > >
> > > > > 1. The agreements that have been reached:
> > > > >
> > > > > 1.1 The motivation for discussing the temporal table DDL is just to create temporal tables in pure SQL, replacing the pre-processed temporal tables in YAML/Table API, for usability.
> > > > > 1.2 The reason we use the "TEMPORAL" keyword rather than "PERIOD FOR SYSTEM_TIME" is to make it easy for users to understand.
> > > > > 1.3 An append-only table can be converted to a changelog table, which has been discussed in FLIP-105; we assume the following temporal table comes from a changelog (Jark, Fabian, Timo).
> > > > > 1.4 For the temporal join syntax, we have agreed on using "FOR SYSTEM_TIME AS OF x" instead of the current `LATERAL TABLE(rates(x))` (Fabian, Timo, Seth, Konstantin, Kurt).
> > > > >
> > > > > 2. The small divergence:
> > > > >
> > > > > About the definition syntax of the temporal table,
> > > > >
> > > > > CREATE [TEMPORAL] TABLE rates (
> > > > >    currency CHAR(3) NOT NULL PRIMARY KEY,
> > > > >    rate DOUBLE,
> > > > >    rowtime TIMESTAMP,
> > > > >    WATERMARK FOR rowtime AS rowtime - INTERVAL '5' MINUTE)
> > > > > WITH (...);
> > > > >
> > > > > there is a small divergence on whether to add the "TEMPORAL" keyword or not.
> > > > >
> > > > > 2.1 One opinion is to use "CREATE TEMPORAL TABLE" (Timo, Fabian, Seth). The main advantages are:
> > > > > (1) The "TEMPORAL" keyword is intuitive and indicates the history-tracking semantics.
> > > > > (2) The "TEMPORAL" keyword illustrates that queries can visit previous versions of a table, like other DBMSs use the "PERIOD FOR SYSTEM_TIME" keyword.
> > > > >
> > > > > 2.2 The other is to use "CREATE TABLE" (Kurt). The main advantages are:
> > > > > (1) A primary key and a time attribute alone can track previous versions of a table well.
> > > > > (2) The temporal behavior is triggered by the temporal join syntax rather than in DDL; all Flink DDL tables are logically dynamic tables, including temporal tables. If we decide to use the "TEMPORAL" keyword and treat changelogs as temporal tables, other tables backed by a queue like Kafka should also use the "TEMPORAL" keyword.
> > > > >
> > > > >
> > > > > IMO, the statement "CREATE TEMPORARY TEMPORAL TABLE..." that follows from 2.1 may confuse users a lot. If we take a second to think about it, for source/sink tables which may be backed by a queue (like Kafka) or a DB (like MySQL), we did not add any keyword in DDL to specify whether they are sources or sinks, and it works well.
> > > > > I think the temporal table is the third one: a Kafka data source and a DB data source can play the role of a source/sink/temporal table depending on the position/syntax in which the user puts them in the query.
> > > > > The above rates table
> > > > >      - can be a source table if the user puts it in `SELECT * FROM rates;`
> > > > >      - can be a temporal table if the user puts it in `SELECT * FROM orders JOIN rates FOR SYSTEM_TIME AS OF orders.proctime ON orders.currency = rates.currency;`
> > > > >      - can be a sink table if the user puts it in `INSERT INTO rates SELECT * FROM …;`
> > > > > From these cases, we find that all tables defined in Flink should be logically dynamic tables; the source/sink/temporal role depends on the position/syntax in the user's query.
> > > > > In fact, we have used similar syntax for the current lookup table: we didn't add a "LOOKUP" or "TEMPORAL" keyword for lookup tables, and we trigger the temporal join from the position/syntax ("FOR SYSTEM_TIME AS OF x") in the query.
> > > > >
> > > > > So, I prefer to resolve the small divergence with "CREATE TABLE", which
> > > > > (1) is more unified with our source/sink/temporal dynamic table concept,
> > > > > (2) is aligned with the current lookup table,
> > > > > (3) also makes users learn fewer keywords.
> > > > >
> > > > > WDYT?
> > > > >
> > > > > Best,
> > > > > Leonard Xu
> > > > >
> > > >
> >
> > --
> > Konstantin Knauf
> > https://twitter.com/snntrable
> > https://github.com/knaufk