sjwiesman commented on a change in pull request #14292:
URL: https://github.com/apache/flink/pull/14292#discussion_r538482958
##
File path: docs/dev/table/streaming/temporal_tables.md
##
@@ -57,134 +56,68 @@ SELECT * FROM product_changelog;
-(DELETE) 18:00:00 p_001 scooter 12.99
{% endhighlight %}
-The table `product_changelog` represents an ever growing changelog of database
table `products`, for example, the initial price of product `scooter` is
`11.11` at `00:01:00`, and the price increases to `12.99` at `12:00:00`,
- the product item is deleted from the table `products` at `18:00:00`.
+Given this set of changes, we track how the price of a scooter changes over
time.
+It is initially $11.11 at `00:01:00` when added to the catalog.
+The price then rises to $12.99 at `12:00:00` before the product is deleted from the
catalog at `18:00:00`.
-Given that we would like to output the version of `product_changelog` table of
the time `10:00:00`, the following table shows the result.
-{% highlight sql %}
-update_time product_id product_name price
-=== == =
-00:01:00 p_001 scooter 11.11
-00:02:00 p_002 basketball 23.11
-{% endhighlight %}
+If we queried the table for various products' prices at different times, we
would retrieve different results. At `10:00:00` the table would show one set of
prices.
-Given that we would like to output the version of `product_changelog` table of
the time `13:00:00`, the following table shows the result.
{% highlight sql %}
update_time product_id product_name price
=== == =
12:00:00 p_001 scooter 12.99
12:00:00 p_002 basketball 19.99
{% endhighlight %}
-In above example, the specific version of the table is tracked by
`update_time` and `product_id`, the `product_id` would be a primary key for
`product_changelog` table and `update_time` would be the event time.
-
-In Flink, this is represented by a [*versioned
table*](#defining-versioned-table).
-
-### Correlate with a regular table
-On the other hand, some use cases require to join a regular table which is an
external database table.
+While at `13:00:00`, we would find another.
-Let's assume that `LatestRates` is a table (e.g. stored in HBase) which is
materialized with the latest rates. The `LatestRates` always represents the
latest content of hbase table `rates`.
-
-Then the content of `LatestRates` table when we query at time `10:15:00` is:
{% highlight sql %}
-10:15:00 > SELECT * FROM LatestRates;
-
-currency rate
-=
-US Dollar 102
-Euro 114
-Yen 1
-{% endhighlight %}
-
-Then the content of `LatestRates` table when we query at time `11:00:00` is:
-{% highlight sql %}
-11:00:00 > SELECT * FROM LatestRates;
-
-currency rate
-=
-US Dollar 102
-Euro 116
-Yen 1
+update_time product_id product_name price
+=== == =
+12:00:00 p_001 scooter 12.99
+12:00:00 p_002 basketball 19.99
{% endhighlight %}
-In Flink, this is represented by a [*regular Table*](#defining-regular-table).
-Temporal Table
---
-Attention This is only supported in
Blink planner.
+## Versioned Table Sources
-Flink uses primary key constraint and event time to define both versioned
table and versioned view.
+Versioned tables are defined implicitly for any table whose underlying source
or format directly defines changelogs. Examples include the [upsert Kafka]({%
link dev/table/connectors/upsert-kafka.md %}) source as well as database
changelog formats such as [debezium]({% link
dev/table/connectors/formats/debezium.md %}) and [canal]({% link
dev/table/connectors/formats/canal.md %}). As discussed above, the only
additional requirement is that the `CREATE TABLE` statement must contain a primary
key and an event-time attribute.
-### Defining Versioned Table
-The table is a versioned table in Flink only is the table contains primary key
constraint and event time.
{% highlight sql %}
--- Define a versioned table
-CREATE TABLE product_changelog (
- product_id STRING,
- product_name STRING,
- product_price DECIMAL(10, 4),
- update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
- PRIMARY KEY(product_id) NOT ENFORCED, -- (1) defines the primary key
constraint
- WATERMARK FOR update_time AS update_time -- (2) defines the event time by
watermark
-) WITH (
- 'connector' = 'kafka',
- 'topic' = 'products',
- 'scan.startup.mode' = 'earliest-offset',
- 'properties.bootstrap.servers' = 'localhost:9092',
- 'value.format' = 'debezium-json'
-);
+CREATE TABLE products (
+ product_id STRING,
+ product_name STRING,
+ price DECIMAL(32, 2),
+ update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp'
VIRTUAL,
+ PRIMARY KEY (product_id) NOT ENFORCED
+