sjwiesman commented on a change in pull request #14292:
URL: https://github.com/apache/flink/pull/14292#discussion_r538482958



##########
File path: docs/dev/table/streaming/temporal_tables.md
##########
@@ -57,134 +56,68 @@ SELECT * FROM product_changelog;
 -(DELETE)         18:00:00     p_001      scooter      12.99 
 {% endhighlight %}
 
-The table `product_changelog` represents an ever growing changelog of database table `products`,  for example, the initial price of product `scooter` is `11.11` at `00:01:00`, and the price increases to `12.99` at `12:00:00`,
- the product item is deleted from the table `products` at `18:00:00`.
+Given this set of changes, we track how the price of a scooter changes over time.
+It is initially $11.11 at `00:01:00` when added to the catalog.
+The price then rises to $12.99 at `12:00:00` before being deleted from the catalog at `18:00:00`.
 
-Given that we would like to output the version of `product_changelog` table of the time `10:00:00`, the following table shows the result.
-{% highlight sql %}
-update_time  product_id product_name price
-===========  ========== ============ ===== 
-00:01:00     p_001      scooter      11.11
-00:02:00     p_002      basketball   23.11
-{% endhighlight %}
+If we queried the table for various products' prices at different times, we would retrieve different results. At `10:00:00` the table would show one set of prices.
 
-Given that we would like to output the version of `product_changelog` table of the time `13:00:00`, the following table shows the result.
 {% highlight sql %}
 update_time  product_id product_name price
 ===========  ========== ============ ===== 
 12:00:00     p_001      scooter      12.99
 12:00:00     p_002      basketball   19.99
 {% endhighlight %}
 
-In above example, the specific version of the table is tracked by `update_time` and `product_id`,  the `product_id` would be a primary key for `product_changelog` table and `update_time` would be the event time.
-
-In Flink, this is represented by a [*versioned table*](#defining-versioned-table).
-
-### Correlate with a regular table
 
-On the other hand, some use cases require to join a regular table which is an external database table.
+At `13:00:00`, we would find another.
 
-Let's assume that `LatestRates` is a table (e.g. stored in HBase) which is materialized with the latest rates. The `LatestRates` always represents the latest content of hbase table `rates`.
- 
-Then the content of `LatestRates` table when we query at time `10:15:00` is:
 {% highlight sql %}
-10:15:00 > SELECT * FROM LatestRates;
-
-currency  rate
-========= ====
-US Dollar 102
-Euro      114
-Yen       1
-{% endhighlight %}
-
-Then the content of `LatestRates` table when we query at time `11:00:00` is:
-{% highlight sql %}
-11:00:00 > SELECT * FROM LatestRates;
-
-currency  rate
-========= ====
-US Dollar 102
-Euro      116
-Yen       1
+update_time  product_id product_name price
+===========  ========== ============ ===== 
+12:00:00     p_001      scooter      12.99
+12:00:00     p_002      basketball   19.99
 {% endhighlight %}
 
-In Flink, this is represented by a [*regular Table*](#defining-regular-table).
 
-Temporal Table
---------------
-<span class="label label-danger">Attention</span> This is only supported in Blink planner.
+## Versioned Table Sources
 
-Flink uses primary key constraint and event time to define both versioned table and versioned view.
+Versioned tables are defined implicitly for any table whose underlying source or format directly defines changelogs. Examples include the [upsert Kafka]({% link dev/table/connectors/upsert-kafka.md %}) source as well as database changelog formats such as [Debezium]({% link dev/table/connectors/formats/debezium.md %}) and [Canal]({% link dev/table/connectors/formats/canal.md %}). As discussed above, the only additional requirement is that the `CREATE TABLE` statement must contain a primary key and an event-time attribute.
 
-### Defining Versioned Table
-The table is a versioned table in Flink only is the table contains primary key constraint and event time.
 {% highlight sql %}
--- Define a versioned table
-CREATE TABLE product_changelog (
-  product_id STRING,
-  product_name STRING,
-  product_price DECIMAL(10, 4),
-  update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
-  PRIMARY KEY(product_id) NOT ENFORCED,      -- (1) defines the primary key constraint
-  WATERMARK FOR update_time AS update_time   -- (2) defines the event time by watermark
-) WITH (
-  'connector' = 'kafka',
-  'topic' = 'products',
-  'scan.startup.mode' = 'earliest-offset',
-  'properties.bootstrap.servers' = 'localhost:9092',
-  'value.format' = 'debezium-json'
-);
+CREATE TABLE products (
+       product_id    STRING,
+       product_name  STRING,
+       price         DECIMAL(32, 2),
+       update_time   TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
+       PRIMARY KEY (product_id) NOT ENFORCED,
+       WATERMARK FOR update_time AS update_time
+) WITH (...);
 {% endhighlight %}
 
-Line `(1)` defines the primary key constraint for table `product_changelog`, Line `(2)` defines the `update_time` as event time for table `product_changelog`,
-thus table `product_changelog` is a versioned table.
+## Versioned Table Views
 
-**Note**: The grammar `METADATA FROM 'value.source.timestamp' VIRTUAL` means extract the database
-operation execution time for every changelog, it's strongly recommended defines the database operation execution time
-as event time rather than ingestion-time or time in the record, otherwise the version extract from the changelog may
-mismatch with the version in database.
- 
-### Defining Versioned View
+Flink also supports defining versioned views if the underlying query contains a unique key constraint and an event-time attribute. Imagine an append-only table of currency rates.
 
-Flink also supports defining versioned view only if the view contains unique key constraint and event time.
- 
-Let’s assume that we have the following table `RatesHistory`:
 {% highlight sql %}
--- Define an append-only table
-CREATE TABLE RatesHistory (
-    currency_time TIMESTAMP(3),
-    currency STRING,
-    rate DECIMAL(38, 10),
-    WATERMARK FOR currency_time AS currency_time   -- defines the event time
+CREATE TABLE currency_rates (
+       currency      STRING,
+       rate          DECIMAL(32, 10),
+       update_time   TIMESTAMP(3),
+       WATERMARK FOR update_time AS update_time
 ) WITH (
-  'connector' = 'kafka',
-  'topic' = 'rates',
-  'scan.startup.mode' = 'earliest-offset',
-  'properties.bootstrap.servers' = 'localhost:9092',
-  'format' = 'json'                                -- this is an append only source
-)
+       'connector' = 'kafka',
+       'topic'     = 'rates',
+       'properties.bootstrap.servers' = 'localhost:9092',
+       'format'    = 'json'
+);
 {% endhighlight %}
 
-Table `RatesHistory` represents an ever growing append-only table of currency exchange rates with respect to
-Yen (which has a rate of 1). For example, the exchange rate for the period from 09:00 to 10:45 of Euro to Yen was 114.
-From 10:45 to 11:15 it was 116.
-
-{% highlight sql %}
-SELECT * FROM RatesHistory;
-
-currency_time currency  rate
-============= ========= ====
-09:00:00      US Dollar 102
-09:00:00      Euro      114
-09:00:00      Yen       1
-10:45:00      Euro      116
-11:15:00      Euro      119
-11:49:00      Pounds    108
-{% endhighlight %}
+The table `currency_rates` contains a row for each currency (with respect to USD) and receives a new row each time the rate changes.
+The JSON format does not support native changelog semantics, so Flink can only read this table as append-only.
+However, it is clear to us (the query developer) that this table has all the necessary information to define a versioned table.
 
-To define a versioned table on `RatesHistory`, Flink supports defining a versioned view
-by [deduplication query]({% link dev/table/sql/queries.md %}#deduplication) which produces an ordered changelog
-stream with an inferred primary key(`currency`) and event time(`currency_time`).
+Flink can reinterpret this table as a versioned table by defining a [deduplication query]({% link dev/table/sql/queries.md %}#deduplication) that produces an ordered changelog stream with an inferred primary key (`currency`) and event time (`update_time`).

Review comment:
   The real answer is that Flink has a special optimization step that turns any `OVER` window with exactly this shape into a versioned table keyed by the fields in the `PARTITION BY` clause.
   
   ```sql
   SELECT * FROM (
      SELECT *,
             ROW_NUMBER() OVER (PARTITION BY [key, ...] ORDER BY [time attribute] DESC) AS row_num
      FROM [table])
   WHERE
        row_num = 1
   ```
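   
   For example, applied to the `currency_rates` table defined in this section, the pattern might look roughly like the following. This is a sketch, not tested code; the view name `versioned_rates` is hypothetical.
   
   ```sql
   -- Sketch: deduplication over the append-only `currency_rates` table.
   -- `versioned_rates` is a hypothetical view name.
   CREATE VIEW versioned_rates AS
   SELECT currency, rate, update_time        -- update_time stays the event-time attribute
   FROM (
      SELECT *,
             ROW_NUMBER() OVER (PARTITION BY currency          -- becomes the inferred primary key
                                ORDER BY update_time DESC) AS row_num
      FROM currency_rates)
   WHERE row_num = 1;                        -- keep only the latest row per currency
   ```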
   
   In a batch query, this would return the most recent value for each currency. In a streaming query, it emits the new value for a partition each time a more recent value becomes available.
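   
   To sketch how such a view might then be consumed, assume the pattern above has been registered over `currency_rates` as a view named `versioned_rates`, and that a hypothetical append-only `orders` table declares `order_time` as its event-time attribute (via a `WATERMARK` clause). An event-time temporal join then looks up, for each order, the rate that was valid at that order's time. All of `orders`, its columns, and `versioned_rates` are illustrative names, not part of this PR.
   
   ```sql
   -- Sketch only: `orders`, `order_id`, `price`, `currency`, `order_time`
   -- and `versioned_rates` are hypothetical names.
   -- `order_time` is assumed to be the event-time attribute of `orders`.
   SELECT
        o.order_id,
        o.order_time,
        o.price * r.rate AS converted_price,   -- rate version valid at o.order_time
        r.update_time    AS rate_time
   FROM orders AS o
   JOIN versioned_rates FOR SYSTEM_TIME AS OF o.order_time AS r
     ON o.currency = r.currency;
   ```
   
   Because each order is matched against the rate version valid at its own `order_time`, a rate update arriving later should not alter results already produced for earlier orders.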




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

