Hi, everyone

Thanks Fabian,Kurt for making the multiple version(event time) clear, I also 
like the 'PERIOD FOR SYSTEM' syntax which supported in SQL standard. I think we 
can add some explanation of the multiple version support in the future section 
of FLIP.

For the PRIMARY KEY semantic, I agree with Jark's point that the semantic 
should unify both on changelog source and insert-only source.

Currently, Flink supports PRIMARY KEY after FLIP-87, Flink uses PRIMARY KEY NOT 
ENFORCED because Flink does not own the data like other DBMS therefore Flink 
won't validate/enforce the key integrity and only trusts the external systems. 
It is  expected user and external system/application should make sure no 
deduplicated records happened when using NOT ENFORCED.

(a) For PRIMARY KEY NOT ENFORCED semantic on changelog source:
It means the materialized changelogs (INSERT/UPDATE/DELETE) should be unique on 
the primary key constraints.Flink assumes messages are in order on the primary 
key. Flink will use the PRIMARY KEY for some optimization, e.g. use the PRIMARY 
KEY to update the materialized state by key in temporal join operator. 
 
(b) For PRIMARY KEY NOT ENFORCED semantic on insert-only source:
It means records should be unique on the primary key constraints. If there are 
INSERT records with duplicate primary key columns, the result of SQL query 
might be nondeterministic because it broken the PRIMARY KEY constraints.

Cheers,
Leonard


> 在 2020年6月23日,23:35,Fabian Hueske <fhue...@gmail.com> 写道:
> 
> Thanks Kurt,
> 
> Yes, you are right.
> The `PERIOD FOR SYSTEM_TIME` that you linked before corresponds to the
> VERSION clause that I used and would explicitly define the versioning of a
> table.
> I didn't know that the `PERIOD FOR SYSTEM_TIME` cause is already defined by
> the SQL standard.
> I think we would need a slightly different syntax though because (so far)
> the validity of a row is determined by its own timestamp and the timestamp
> of the next row.
> 
> Adding a clause later solves the ambiguity issue for tables with multiple
> event-time attributes.
> However, I'd feel more comfortable having such a cause and an explicit
> definition of the temporal property from the beginning.
> I guess this is a matter of personal preference so I'll go with the
> majority if we decide that every table that has a primary key and an
> event-time attribute should be usable in an event-time temporal table join.
> 
> Thanks, Fabian
> 
> 
> Am Di., 23. Juni 2020 um 16:58 Uhr schrieb Kurt Young <ykt...@gmail.com>:
> 
>> Hi Fabian,
>> 
>> I agree with you that implicitly letting event time to be the version of
>> the table will
>> work in most cases, but not for all. That's the reason I mentioned `PERIOD
>> FOR` [1]
>> syntax in my first email, which is already in sql standard to represent the
>> validity of
>> each row in the table.
>> 
>> If the event time can't be used, or multiple event time are defined, we
>> could still add
>> this syntax in the future.
>> 
>> What do you think?
>> 
>> [1]
>> 
>> https://docs.microsoft.com/en-us/sql/relational-databases/tables/temporal-tables?view=sql-server-ver15
>> Best,
>> Kurt
>> 
>> 
>> On Tue, Jun 23, 2020 at 9:12 PM Fabian Hueske <fhue...@gmail.com> wrote:
>> 
>>> Hi everyone,
>>> 
>>> Every table with a primary key and an event-time attribute provides what
>> is
>>> needed for an event-time temporal table join.
>>> I agree that, from a technical point of view, the TEMPORAL keyword is not
>>> required.
>>> 
>>> I'm more sceptical about implicitly deriving the versioning information
>> of
>>> a (temporal) table as the table's only event-time attribute.
>>> In the query
>>> 
>>> SELECT *
>>> FROM orders o, rates r FOR SYSTEM_TIME AS OF o.ordertime
>>> WHERE o.currency = r.currency
>>> 
>>> the syntax of the temporal table join does not explicitly reference the
>>> version of the temporal rates table.
>>> Hence, the system needs a way to derive the version of temporal table.
>>> 
>>> Implicitly using the (only) event-time attribute of a temporal table
>> (rates
>>> in the example above) to identify the right version works in most cases,
>>> but probably not in all.
>>> * What if a table has more than one event-time attribute? (TableSchema is
>>> designed to support multiple watermarks; queries with interval joins
>>> produce tables with multiple event-time attributes, ...)
>>> * What if the table does not have an event-time attribute in its schema
>> but
>>> the version should only be provided as meta data?
>>> 
>>> We could add a clause to define the version of a table, such as:
>>> 
>>> CREATE TABLE rates (
>>>   currency CHAR(3) NOT NULL PRIMARY KEY,
>>>   rate DOUBLE,
>>>   rowtime TIMESTAMP,
>>>   WATERMARK FOR rowtime AS rowtime - INTERVAL '5' MINUTE),
>>> VERSION (rowtime)
>>> WITH (...);
>>> 
>>> The presence of a the VERSION clause (or whatever syntax) would
>> explicitly
>>> define the version of a (temporal) table.
>>> It would also render the need for the TEMPORAL keyword superfluous
>> because
>>> there would be another indicator that a table can be used in a temporal
>>> table join.
>>> 
>>> I'm OK with not adding the TEMPORAL keyword, but I recommend that we
>> think
>>> again about the proposed implicit definition of a table's version and how
>>> it might limit use in the future.
>>> 
>>> Cheers,
>>> Fabian
>>> 
>>> Am Mo., 22. Juni 2020 um 16:14 Uhr schrieb Jark Wu <imj...@gmail.com>:
>>> 
>>>> I'm also +1 for not adding the TEMPORAL keyword.
>>>> 
>>>> +1 to make the PRIMARY KEY semantic clear for sources.
>>>> From my point of view:
>>>> 
>>>> 1) PRIMARY KEY on changelog souruce:
>>>> It means that when the changelogs (INSERT/UPDATE/DELETE) are
>>> materialized,
>>>> the materialized table should be unique on the primary key columns.
>>>> Flink assumes messages are in order on the primary key. Flink doesn't
>>>> validate/enforces the key integrity, but simply trust it (thus NOT
>>>> ENFORCED).
>>>> Flink will use the PRIMARY KEY for some optimization, e.g. use the
>>> PRIMARY
>>>> KEY to update the materilized state by key in temporal join operator.
>>>> 
>>>> 2) PRIMARY KEY on insert-only source:
>>>> I prefer to have the same semantic to the batch source and changelog
>>>> source, that it implies that records are not duplicate on the primary
>>> key.
>>>> Flink just simply trust the primary key constraint, and doesn't valid
>> it.
>>>> If there is duplicate primary keys with INSERT changeflag, then result
>> of
>>>> Flink query might be wrong.
>>>> 
>>>> If this is a TEMPORAL TABLE FUNCTION scenario, that source emits
>>> duplicate
>>>> primary keys with INSERT changeflag, when we migrate this case to
>>> temporal
>>>> table DDL,
>>>> I think this source should emit INSERT/UPDATE (UPSERT) messages instead
>>> of
>>>> INSERT-only messages,  e.g. a Kafka compacted topic source?
>>>> 
>>>> Best,
>>>> Jark
>>>> 
>>>> 
>>>> On Mon, 22 Jun 2020 at 17:04, Konstantin Knauf <kna...@apache.org>
>>> wrote:
>>>> 
>>>>> Hi everyone,
>>>>> 
>>>>> I also agree with Leonard/Kurt's proposal for CREATE TEMPORAL TABLE.
>>>>> 
>>>>> Best,
>>>>> 
>>>>> Konstantin
>>>>> 
>>>>> On Mon, Jun 22, 2020 at 10:53 AM Kurt Young <ykt...@gmail.com>
>> wrote:
>>>>> 
>>>>>> I agree with Timo, semantic about primary key needs more thought
>> and
>>>>>> discussion, especially after FLIP-95 and FLIP-105.
>>>>>> 
>>>>>> Best,
>>>>>> Kurt
>>>>>> 
>>>>>> 
>>>>>> On Mon, Jun 22, 2020 at 4:45 PM Timo Walther <twal...@apache.org>
>>>> wrote:
>>>>>> 
>>>>>>> Hi Leonard,
>>>>>>> 
>>>>>>> thanks for the summary.
>>>>>>> 
>>>>>>> After reading all of the previous arguments and working on
>>> FLIP-95. I
>>>>>>> would also lean towards the conclusion of not adding the TEMPORAL
>>>>>> keyword.
>>>>>>> 
>>>>>>> After FLIP-95, what we considered as a CREATE TEMPORAL TABLE can
>> be
>>>>>>> represented as a CREATE TABLE with PRIMARY KEY and WATERMARK. The
>>> FOR
>>>>>>> SYSTEM_TIME AS OF t would trigger the internal materialization
>> and
>>>>>>> "temporal" logic.
>>>>>>> 
>>>>>>> However, we should discuss the meaning of PRIMARY KEY again in
>> this
>>>>>>> case. In a TEMPORAL TABLE scenario, the source would emit
>> duplicate
>>>>>>> primary keys with INSERT changeflag but at different point in
>> time.
>>>>>>> Currently, we require a PRIMARY KEY NOT ENFORCED declaration. The
>>>>>>> changelog semantics of FLIP-95 and FLIP-105 don't work well with
>> a
>>>>>>> primary key declaration.
>>>>>>> 
>>>>>>> Regards,
>>>>>>> Timo
>>>>>>> 
>>>>>>> 
>>>>>>> On 20.06.20 17:08, Leonard Xu wrote:
>>>>>>>> Hi everyone,
>>>>>>>> 
>>>>>>>> Thanks for the nice discussion. I’d like to move forward the
>>> work,
>>>>>>> please let me simply summarize the main opinion and current
>>>>> divergences.
>>>>>>>> 
>>>>>>>> 1. The agreements have been achieved:
>>>>>>>> 
>>>>>>>> 1.1 The motivation we're discussing temporal table DDL is just
>>> for
>>>>>>> creating temporal table in pure SQL to replace pre-process
>> temporal
>>>>> table
>>>>>>> in YAML/Table API for usability.
>>>>>>>> 1.2 The reason we use "TEMPORAL" keyword rather than “PERIOD
>> FOR
>>>>>>> SYSTEM_TIME” is to make user understand easily.
>>>>>>>> 1.3 For append-only table, it can convert to changelog table
>>> which
>>>>> has
>>>>>>> been discussed in FLIP-105, we assume the following temporal
>> table
>>> is
>>>>>> comes
>>>>>>> from changelog (Jark, fabian, Timo).
>>>>>>>> 1.4 For temporal join syntax, using "FOR SYSTEM_TIME AS OF x"
>>>> instead
>>>>>> of
>>>>>>> the current `LATERAL TABLE(rates(x))`  has come to an
>>>> agreement(Fabian,
>>>>>>> Timo, Seth, Konstantin, Kurt).
>>>>>>>> 
>>>>>>>> 2. The small divergence :
>>>>>>>> 
>>>>>>>> About the definition syntax of the temporal table,
>>>>>>>> 
>>>>>>>> CREATE [TEMPORAL] TABLE rates (
>>>>>>>>    currency CHAR(3) NOT NULL PRIMARY KEY,
>>>>>>>>    rate DOUBLE,
>>>>>>>>    rowtime TIMESTAMP,
>>>>>>>>    WATERMARK FOR rowtime AS rowtime - INTERVAL '5' MINUTE)
>>>>>>>> WITH (...);
>>>>>>>> 
>>>>>>>> there is small divergence whether add "TEMPORAL" keyword or
>> not.
>>>>>>>> 
>>>>>>>> 2.1  one opinion is using "CREATE TEMPORAL TABLE" (Timo,
>> Fabian,
>>>>> Seth),
>>>>>>> the main advantages are:
>>>>>>>> (1)"TEMPORAL" keyword is intuitive to indicate the history
>>> tracking
>>>>>>> semantics.
>>>>>>>> (2)"TEMPORAL" keyword illustrates that queries can visit the
>>>> previous
>>>>>>> versions of a table like other DBMS use "PERIOD FOR SYSTEM_TIME"
>>>>> keyword.
>>>>>>>> 
>>>>>>>> 2.2 the other is using "CREATE TABLE"(Kurt), the main
>> advantages
>>>> are:
>>>>>>>> (1)Just primary key and time attribute can track previous
>>> versions
>>>>> of a
>>>>>>> table well.
>>>>>>>> (2)The temporal behavior is triggered by temporal join syntax
>>>> rather
>>>>>>> than in DDL, all Flink DDL table are dynamic table logically
>>>> including
>>>>>>> temporal table. If we decide to use "TEMPORAL" keyword and treats
>>>>>> changelog
>>>>>>> as temporal table, other tables backed queue like Kafka should
>> also
>>>> use
>>>>>>> "TEMPORAL" keyword.
>>>>>>>> 
>>>>>>>> 
>>>>>>>> IMO, the statement “CREATE TEMPORARY TEMPORAL TABLE...” follows
>>>> with
>>>>>> 2.1
>>>>>>> may confuse users much. If we take a second to think about, for
>>>>>> source/sink
>>>>>>> table which may backed queue (like kafka) or DB (like MySQL), we
>>> did
>>>>> not
>>>>>>> add any keyword in DDL to specify they are source or sinks, it
>>> works
>>>>>> well.
>>>>>>>> I think temporal table is the third one,  kafka data source and
>>> DB
>>>>> data
>>>>>>> source can play as a source/sink/temporal table depends on the
>>>>>>> position/syntax that user put them in the query. The above rates
>>>> table
>>>>>>>>     - can be a source table if user put it at `SELECT * FROM
>>>> rates;`
>>>>>>>>     - can be a temporal table if user put it at `SELECT * FROM
>>>>> orders
>>>>>>> JOIN rates FOR SYSTEM_TIME AS OF orders.proctime
>>>>>>>>              ON orders.currency = rates.currency;`
>>>>>>>>     - can be sink table if user put is at `INSERT INTO rates
>>>> SELECT
>>>>> *
>>>>>>> FROM …; `
>>>>>>>> From these cases, we found all tables defined in Flink should
>> be
>>>>>>> dynamic table logically, the source/sink/temporal role depends on
>>> the
>>>>>>> position/syntax in user’s query.
>>>>>>>>       In fact we have used similar syntax for current lookup
>>>> table,
>>>>> we
>>>>>>> didn’t add “LOOKUP" or “TEMPORAL" keyword for lookup table and
>>>> trigger
>>>>>> the
>>>>>>> temporal join from the position/syntax(“FOR SYSTEM_TIME AS OF x")
>>> in
>>>>>> query.
>>>>>>>> 
>>>>>>>> So, I prefer to resolve the small divergence with “CREATE
>> TABLE”
>>>>> which
>>>>>>>> (1) is more unified with our source/sink/temporal dynamic table
>>>>>>> conceptually,
>>>>>>>> (2) is aligned with current lookup table,
>>>>>>>> (3) also make users learn less keyword.
>>>>>>>> 
>>>>>>>> WDYT?
>>>>>>>> 
>>>>>>>> Best,
>>>>>>>> Leonard Xu
>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>>> 
>>>>> --
>>>>> 
>>>>> Konstantin Knauf
>>>>> 
>>>>> https://twitter.com/snntrable
>>>>> 
>>>>> https://github.com/knaufk
>>>>> 
>>>> 
>>> 
>> 

Reply via email to