sjwiesman commented on a change in pull request #14292:
URL: https://github.com/apache/flink/pull/14292#discussion_r538482958
########## File path: docs/dev/table/streaming/temporal_tables.md ##########
@@ -57,134 +56,68 @@ SELECT * FROM product_changelog;
-(DELETE) 18:00:00   p_001      scooter      12.99
 {% endhighlight %}

-The table `product_changelog` represents an ever growing changelog of database table `products`, for example, the initial price of product `scooter` is `11.11` at `00:01:00`, and the price increases to `12.99` at `12:00:00`,
-the product item is deleted from the table `products` at `18:00:00`.
+Given this set of changes, we track how the price of a scooter changes over time.
+It is initially $11.11 at `00:01:00` when added to the catalog.
+The price then rises to $12.99 at `12:00:00` before being deleted from the catalog at `18:00:00`.

-Given that we would like to output the version of `product_changelog` table of the time `10:00:00`, the following table shows the result.
+If we queried the table for various products' prices at different times, we would retrieve different results. At `10:00:00` the table would show one set of prices.

 {% highlight sql %}
 update_time  product_id product_name price
 ===========  ========== ============ =====
 00:01:00     p_001      scooter      11.11
 00:02:00     p_002      basketball   23.11
 {% endhighlight %}

-Given that we would like to output the version of `product_changelog` table of the time `13:00:00`, the following table shows the result.
+While at `13:00:00`, we would find another.

 {% highlight sql %}
 update_time  product_id product_name price
 ===========  ========== ============ =====
 12:00:00     p_001      scooter      12.99
 12:00:00     p_002      basketball   19.99
 {% endhighlight %}

-In above example, the specific version of the table is tracked by `update_time` and `product_id`, the `product_id` would be a primary key for `product_changelog` table and `update_time` would be the event time.
-
-In Flink, this is represented by a [*versioned table*](#defining-versioned-table).
-
-### Correlate with a regular table
-
-On the other hand, some use cases require to join a regular table which is an external database table.
-
-Let's assume that `LatestRates` is a table (e.g. stored in HBase) which is materialized with the latest rates. The `LatestRates` always represents the latest content of hbase table `rates`.
-
-Then the content of `LatestRates` table when we query at time `10:15:00` is:
-{% highlight sql %}
-10:15:00 > SELECT * FROM LatestRates;
-
-currency   rate
-=========  ====
-US Dollar  102
-Euro       114
-Yen        1
-{% endhighlight %}
-
-Then the content of `LatestRates` table when we query at time `11:00:00` is:
-{% highlight sql %}
-11:00:00 > SELECT * FROM LatestRates;
-
-currency   rate
-=========  ====
-US Dollar  102
-Euro       116
-Yen        1
-{% endhighlight %}
-
-In Flink, this is represented by a [*regular Table*](#defining-regular-table).
-
-Temporal Table
---------------
-<span class="label label-danger">Attention</span> This is only supported in Blink planner.
+## Versioned Table Sources

-Flink uses primary key constraint and event time to define both versioned table and versioned view.
+Versioned tables are defined implicitly for any table whose underlying source or format directly defines changelogs. Examples include the [upsert Kafka]({% link dev/table/connectors/upsert-kafka.md %}) source as well as database changelog formats such as [debezium]({% link dev/table/connectors/formats/debezium.md %}) and [canal]({% link dev/table/connectors/formats/canal.md %}).
+As discussed above, the only additional requirement is that the `CREATE` table statement contains a primary key and an event-time attribute.

-### Defining Versioned Table
-
-The table is a versioned table in Flink only is the table contains primary key constraint and event time.

 {% highlight sql %}
--- Define a versioned table
-CREATE TABLE product_changelog (
-  product_id STRING,
-  product_name STRING,
-  product_price DECIMAL(10, 4),
-  update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
-  PRIMARY KEY(product_id) NOT ENFORCED,      -- (1) defines the primary key constraint
-  WATERMARK FOR update_time AS update_time   -- (2) defines the event time by watermark
-) WITH (
-  'connector' = 'kafka',
-  'topic' = 'products',
-  'scan.startup.mode' = 'earliest-offset',
-  'properties.bootstrap.servers' = 'localhost:9092',
-  'value.format' = 'debezium-json'
-);
+CREATE TABLE products (
+  product_id   STRING,
+  product_name STRING,
+  price        DECIMAL(32, 2),
+  update_time  TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
+  PRIMARY KEY (product_id) NOT ENFORCED,
+  WATERMARK FOR update_time AS update_time
+) WITH (...);
 {% endhighlight %}

-Line `(1)` defines the primary key constraint for table `product_changelog`, Line `(2)` defines the `update_time` as event time for table `product_changelog`,
-thus table `product_changelog` is a versioned table.
-
-**Note**: The grammar `METADATA FROM 'value.source.timestamp' VIRTUAL` means extract the database
-operation execution time for every changelog, it's strongly recommended defines the database operation execution time
-as event time rather than ingestion-time or time in the record, otherwise the version extract from the changelog may
-mismatch with the version in database.
-
-### Defining Versioned View
+## Versioned Table Views

-Flink also supports defining versioned view only if the view contains unique key constraint and event time.
-
-Let's assume that we have the following table `RatesHistory`:
+Flink also supports defining versioned views if the underlying query contains a unique key constraint and an event-time attribute. Imagine an append-only table of currency rates.

 {% highlight sql %}
--- Define an append-only table
-CREATE TABLE RatesHistory (
-  currency_time TIMESTAMP(3),
-  currency STRING,
-  rate DECIMAL(38, 10),
-  WATERMARK FOR currency_time AS currency_time   -- defines the event time
+CREATE TABLE currency_rates (
+  currency    STRING,
+  rate        DECIMAL(32, 10),
+  update_time TIMESTAMP(3),
+  WATERMARK FOR update_time AS update_time
 ) WITH (
-  'connector' = 'kafka',
-  'topic' = 'rates',
-  'scan.startup.mode' = 'earliest-offset',
-  'properties.bootstrap.servers' = 'localhost:9092',
-  'format' = 'json'   -- this is an append only source
-)
+  'connector' = 'kafka',
+  'topic' = 'rates',
+  'properties.bootstrap.servers' = 'localhost:9092',
+  'format' = 'json'
+);
 {% endhighlight %}

-Table `RatesHistory` represents an ever growing append-only table of currency exchange rates with respect to
-Yen (which has a rate of 1). For example, the exchange rate for the period from 09:00 to 10:45 of Euro to Yen was 114.
-From 10:45 to 11:15 it was 116.
-
-{% highlight sql %}
-SELECT * FROM RatesHistory;
-
-currency_time currency  rate
-============= ========= ====
-09:00:00      US Dollar 102
-09:00:00      Euro      114
-09:00:00      Yen       1
-10:45:00      Euro      116
-11:15:00      Euro      119
-11:49:00      Pounds    108
-{% endhighlight %}
+The table `currency_rates` contains a row for each currency - with respect to USD - and receives a new row each time the rate changes.
+The `JSON` format does not support native changelog semantics, and so Flink can only read this table as append-only.
+However, it is clear to us (the query developer) that this table has all the necessary information to define a versioned table.

-To define a versioned table on `RatesHistory`, Flink supports defining a versioned view
-by [deduplication query]({% link dev/table/sql/queries.md %}#deduplication) which produces an ordered changelog
-stream with an inferred primary key(`currency`) and event time(`currency_time`).
+Flink can reinterpret this table as a versioned table by defining a [deduplication query]({% link dev/table/sql/queries.md %}#deduplication) which produces an ordered changelog stream with an inferred primary key (`currency`) and event time (`update_time`).

Review comment:
The real answer is that Flink has a special optimization step that turns any `OVER WINDOW` with exactly this format into a versioned table keyed by the fields in the `PARTITION BY` clause.

```sql
SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY [key] ORDER BY [time attribute] DESC) AS row_num
  FROM [table])
WHERE row_num = 1
```

In a batch query, this would return the most recent value for each currency. The streaming semantic is that it emits the new value for each partition each time a more recent value becomes available.
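For concreteness, here is a minimal sketch (not part of the PR) that applies this deduplication pattern to the `currency_rates` table from the diff above; the view name `versioned_rates` is a hypothetical choice.

```sql
-- Sketch: declare a versioned view over the append-only currency_rates table.
-- The PARTITION BY column becomes the inferred primary key, and update_time
-- (the event-time attribute) becomes the version of each row.
CREATE VIEW versioned_rates AS
SELECT currency, rate, update_time
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY currency ORDER BY update_time DESC) AS row_num
  FROM currency_rates)
WHERE row_num = 1;  -- keep only the most recent rate per currency
```

Because the query matches the pattern exactly, the planner can key the view by `currency` and version it by `update_time`, yielding the ordered changelog stream the new docs describe rather than a materialized "latest row" table.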