[GitHub] [flink] sjwiesman commented on a change in pull request #14292: [FLINK-20456][docs] Make streaming SQL concepts more approachable

2020-12-09 Thread GitBox


sjwiesman commented on a change in pull request #14292:
URL: https://github.com/apache/flink/pull/14292#discussion_r539660475



##
File path: docs/redirects/temporal_table.md
##
@@ -0,0 +1,24 @@
+---
+title: Temporal Tables
+layout: redirect
+redirect: /dev/table/streaming/versioned_table.html

Review comment:
   - Now that you say it, I think legacy makes more sense. That's the 
content people are expecting to find. And that page already cross-links to 
versioned_tables. 
   - The `zh` redirect happens automatically 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org





2020-12-09 Thread GitBox


sjwiesman commented on a change in pull request #14292:
URL: https://github.com/apache/flink/pull/14292#discussion_r539658939



##
File path: docs/redirects/temporal_table.md
##
@@ -0,0 +1,24 @@
+---
+title: Temporal Tables
+layout: redirect
+redirect: /dev/table/streaming/versioned_table.html

Review comment:
   damn, thank you 










2020-12-08 Thread GitBox


sjwiesman commented on a change in pull request #14292:
URL: https://github.com/apache/flink/pull/14292#discussion_r538482958



##
File path: docs/dev/table/streaming/temporal_tables.md
##
@@ -57,134 +56,68 @@ SELECT * FROM product_changelog;
 -(DELETE) 18:00:00 p_001  scooter  12.99 
 {% endhighlight %}
 
-The table `product_changelog` represents an ever growing changelog of database table `products`,  for example, the initial price of product `scooter` is `11.11` at `00:01:00`, and the price increases to `12.99` at `12:00:00`,
- the product item is deleted from the table `products` at `18:00:00`.
+Given this set of changes, we track how the price of a scooter changes over time.
+It is initially $11.11 at `00:01:00` when added to the catalog.
+The price then rises to $12.99 at `12:00:00` before being deleted from the catalog at `18:00:00`.
 
-Given that we would like to output the version of `product_changelog` table of the time `10:00:00`, the following table shows the result.
-{% highlight sql %}
-update_time  product_id product_name price
-===========  ========== ============ =====
-00:01:00     p_001      scooter      11.11
-00:02:00     p_002      basketball   23.11
-{% endhighlight %}
+If we queried the table for various products' prices at different times, we would retrieve different results. At `10:00:00` the table would show one set of prices.
 
-Given that we would like to output the version of `product_changelog` table of the time `13:00:00`, the following table shows the result.
 {% highlight sql %}
 update_time  product_id product_name price
 ===========  ========== ============ =====
 12:00:00     p_001      scooter      12.99
 12:00:00     p_002      basketball   19.99
 {% endhighlight %}
 
-In above example, the specific version of the table is tracked by `update_time` and `product_id`,  the `product_id` would be a primary key for `product_changelog` table and `update_time` would be the event time.
-
-In Flink, this is represented by a [*versioned table*](#defining-versioned-table).
-
-### Correlate with a regular table
 
-On the other hand, some use cases require to join a regular table which is an external database table.
+While at `13:00:00`, we would find another set of prices.
 
-Let's assume that `LatestRates` is a table (e.g. stored in HBase) which is materialized with the latest rates. The `LatestRates` always represents the latest content of hbase table `rates`.
- 
-Then the content of `LatestRates` table when we query at time `10:15:00` is:
 {% highlight sql %}
-10:15:00 > SELECT * FROM LatestRates;
-
-currency  rate
-========= ====
-US Dollar 102
-Euro      114
-Yen       1
-{% endhighlight %}
-
-Then the content of `LatestRates` table when we query at time `11:00:00` is:
-{% highlight sql %}
-11:00:00 > SELECT * FROM LatestRates;
-
-currency  rate
-========= ====
-US Dollar 102
-Euro      116
-Yen       1
+update_time  product_id product_name price
+===========  ========== ============ =====
+12:00:00     p_001      scooter      12.99
+12:00:00     p_002      basketball   19.99
 {% endhighlight %}
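To make the point-in-time semantics described above concrete, here is a minimal Python sketch (not Flink's implementation) that replays a changelog and returns the version of the table valid at a given event time. Assumptions: the changelog rows are reconstructed from the scooter and basketball prices quoted in this hunk. `UPDATE_BEFORE` retractions are omitted, so only `INSERT`, `UPDATE_AFTER` and `DELETE` row kinds appear. Times are zero-padded `HH:MM:SS` strings so that string order matches time order. `Change`, `CHANGELOG` and `version_as_of` are hypothetical names.

```python
from decimal import Decimal
from typing import Dict, List, NamedTuple, Tuple


class Change(NamedTuple):
    kind: str          # simplified row kind: "INSERT", "UPDATE_AFTER" or "DELETE"
    update_time: str   # zero-padded "HH:MM:SS", so string order == time order
    product_id: str    # primary key of the versioned table
    product_name: str
    price: Decimal


# Hypothetical changelog reconstructed from the prices quoted in the hunk above.
CHANGELOG: List[Change] = [
    Change("INSERT", "00:01:00", "p_001", "scooter", Decimal("11.11")),
    Change("INSERT", "00:02:00", "p_002", "basketball", Decimal("23.11")),
    Change("UPDATE_AFTER", "12:00:00", "p_001", "scooter", Decimal("12.99")),
    Change("UPDATE_AFTER", "12:00:00", "p_002", "basketball", Decimal("19.99")),
    Change("DELETE", "18:00:00", "p_001", "scooter", Decimal("12.99")),
]


def version_as_of(changelog: List[Change], t: str) -> Dict[str, Tuple[str, str, Decimal]]:
    """Replay the changelog up to event time t and return one row per primary key."""
    state: Dict[str, Tuple[str, str, Decimal]] = {}
    # sorted() is stable, so changes sharing a timestamp keep their changelog order.
    for change in sorted(changelog, key=lambda c: c.update_time):
        if change.update_time > t:
            break  # every remaining change happens after t, so it is not yet visible
        if change.kind == "DELETE":
            state.pop(change.product_id, None)
        else:
            # INSERT and UPDATE_AFTER both overwrite the row for this key.
            state[change.product_id] = (change.update_time, change.product_name, change.price)
    return state


if __name__ == "__main__":
    for t in ("10:00:00", "13:00:00", "18:30:00"):
        print(t, version_as_of(CHANGELOG, t))
```

Keying the state by `product_id` mirrors the role the primary key plays in a versioned table. Each key has at most one visible row at any event time, and a `DELETE` removes the key from every later version.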
 
-In Flink, this is represented by a [*regular Table*](#defining-regular-table).
 
-Temporal Table
---
-Attention This is only supported in Blink planner.
+## Versioned Table Sources
 
-Flink uses primary key constraint and event time to define both versioned table and versioned view.
+Versioned tables are defined implicitly for any table whose underlying source or format directly defines changelogs. Examples include the [upsert Kafka]({% link dev/table/connectors/upsert-kafka.md %}) source as well as database changelog formats such as [Debezium]({% link dev/table/connectors/formats/debezium.md %}) and [Canal]({% link dev/table/connectors/formats/canal.md %}). As discussed above, the only additional requirement is that the `CREATE TABLE` statement must contain a primary key and an event-time attribute.
 
-### Defining Versioned Table
-The table is a versioned table in Flink only is the table contains primary key constraint and event time.
 {% highlight sql %}
--- Define a versioned table
-CREATE TABLE product_changelog (
-  product_id STRING,
-  product_name STRING,
-  product_price DECIMAL(10, 4),
-  update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
-  PRIMARY KEY(product_id) NOT ENFORCED,  -- (1) defines the primary key constraint
-  WATERMARK FOR update_time AS update_time   -- (2) defines the event time by watermark
-) WITH (
-  'connector' = 'kafka',
-  'topic' = 'products',
-  'scan.startup.mode' = 'earliest-offset',
-  'properties.bootstrap.servers' = 'localhost:9092',
-  'value.format' = 'debezium-json'
-);
+CREATE TABLE products (
+   product_id    STRING,
+   product_name  STRING,
+   price         DECIMAL(32, 2),
+   update_time   TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
+   PRIMARY KEY (product_id) NOT ENFORCED
+