This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new ec4c0c3 [FLINK-13356][table][docs] Add documentation for TopN and Deduplication in blink planner ec4c0c3 is described below commit ec4c0c35a55253ee4776c88889359fc682d2feae Author: Jark Wu <imj...@gmail.com> AuthorDate: Thu Aug 22 20:00:28 2019 +0800 [FLINK-13356][table][docs] Add documentation for TopN and Deduplication in blink planner This closes #9511 --- docs/dev/table/sql.md | 223 ++++++++++++++++++++++++++++++++++++++++++++++- docs/dev/table/sql.zh.md | 221 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 443 insertions(+), 1 deletion(-) diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md index 0cfbdda..c3d0cd3 100644 --- a/docs/dev/table/sql.md +++ b/docs/dev/table/sql.md @@ -24,7 +24,7 @@ under the License. This is a complete list of Data Definition Language (DDL) and Data Manipulation Language (DML) constructs supported in Flink. * This will be replaced by the TOC -{:toc} +{:toc} ## Query SQL queries are specified with the `sqlQuery()` method of the `TableEnvironment`. The method returns the result of the SQL query as a `Table`. A `Table` can be used in [subsequent SQL and Table API queries](common.html#mixing-table-api-and-sql), be [converted into a DataSet or DataStream](common.html#integration-with-datastream-and-dataset-api), or [written to a TableSink](common.html#emit-a-table). SQL and Table API queries can be seamlessly mixed and are holistically optimized and tra [...] @@ -835,6 +835,227 @@ LIMIT 3 {% top %} +#### Top-N + +<span class="label label-danger">Attention</span> Top-N is only supported in Blink planner. + +Top-N queries ask for the N smallest or largest values ordered by columns. Both the smallest and the largest value sets are considered Top-N queries. Top-N queries are useful in cases where the need is to display only the N bottom-most or the N top-most records from a batch/streaming table on a condition.
This result set can be used for further analysis. + +Flink uses the combination of an OVER window clause and a filter condition to express a Top-N query. With the power of the OVER window `PARTITION BY` clause, Flink also supports per-group Top-N, for example, the top five products per category that have the maximum sales in real time. Top-N queries are supported for SQL on batch and streaming tables. + +The following shows the syntax of the TOP-N statement: + +{% highlight sql %} +SELECT [column_list] +FROM ( + SELECT [column_list], + ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]] + ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum + FROM table_name) +WHERE rownum <= N [AND conditions] +{% endhighlight %} + +**Parameter Specification:** + +- `ROW_NUMBER()`: Assigns a unique, sequential number to each row, starting with one, according to the ordering of rows within the partition. Currently, we only support `ROW_NUMBER` as the over window function. In the future, we will support `RANK()` and `DENSE_RANK()`. +- `PARTITION BY col1[, col2...]`: Specifies the partition columns. Each partition will have a Top-N result. +- `ORDER BY col1 [asc|desc][, col2 [asc|desc]...]`: Specifies the ordering columns. The ordering directions can be different on different columns. +- `WHERE rownum <= N`: The `rownum <= N` is required for Flink to recognize that this query is a Top-N query. N represents that the N smallest or largest records will be retained. +- `[AND conditions]`: It is free to add other conditions in the WHERE clause, but the other conditions can only be combined with `rownum <= N` using an `AND` conjunction. + +<span class="label label-danger">Attention in Streaming Mode</span> The TopN query is <span class="label label-info">Result Updating</span>. Flink SQL will sort the input data stream according to the order key, so if the top N records have changed, the changed records will be sent downstream as retraction/update records.
+It is recommended to use a storage that supports updating as the sink of a Top-N query. In addition, if the top N records need to be stored in external storage, the result table should have the same unique key as the Top-N query. + +The unique keys of a Top-N query are the combination of the partition columns and the rownum column. A Top-N query can also derive the unique key of its upstream. Take the following job as an example: say `product_id` is the unique key of `ShopSales`; then the unique keys of the Top-N query are [`category`, `rownum`] and [`product_id`]. + +The following examples show how to specify SQL queries with Top-N on streaming tables. This is an example to get "the top five products per category that have the maximum sales in real time" mentioned above. + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); + +// ingest a DataStream from an external source +DataStream<Tuple4<String, String, String, Long>> ds = env.addSource(...); +// register the DataStream as table "ShopSales" +tableEnv.registerDataStream("ShopSales", ds, "product_id, category, product_name, sales"); + +// select top-5 products per category which have the maximum sales. +Table result1 = tableEnv.sqlQuery( + "SELECT * " + + "FROM (" + + "   SELECT *," + + "       ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) as row_num" + + "   FROM ShopSales) " + + "WHERE row_num <= 5"); +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tableEnv = TableEnvironment.getTableEnvironment(env) + +// read a DataStream from an external source +val ds: DataStream[(String, String, String, Long)] = env.addSource(...)
+// register the DataStream under the name "ShopSales" +tableEnv.registerDataStream("ShopSales", ds, 'product_id, 'category, 'product_name, 'sales) + + +// select top-5 products per category which have the maximum sales. +val result1 = tableEnv.sqlQuery( + """ + |SELECT * + |FROM ( + |   SELECT *, + |       ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) as row_num + |   FROM ShopSales) + |WHERE row_num <= 5 + """.stripMargin) +{% endhighlight %} +</div> +</div> + +##### No Ranking Output Optimization + +As described above, the `rownum` field will be written into the result table as one field of the unique key, which may lead to a lot of records being written to the result table. For example, when the record ranked 9 (say `product-1001`) is updated and its rank is upgraded to 1, all the records ranked 1 ~ 9 will be output to the result table as update messages. If the result table receives too much data, it will become the bottleneck of the SQL job. + +The way to optimize is to omit the rownum field in the outer SELECT clause of the Top-N query. This is reasonable because the number of top N records is usually not large, so the consumers can sort the records themselves quickly. Without the rownum field, in the example above, only the changed record (`product-1001`) needs to be sent downstream, which can greatly reduce IO to the result table.
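The IO saving described above can be sketched with a small, self-contained model (plain Java, not the Flink API; the class and method names are hypothetical, used only to illustrate the counting argument):

```java
// Hypothetical back-of-the-envelope model (not Flink API): when a record
// jumps from rank oldRank to rank newRank, count how many update messages
// reach the result table with and without rownum in the emitted key.
public class RankUpdateSketch {

    // With rownum in the output, rownum is part of the unique key, so every
    // row whose rank shifts (ranks newRank..oldRank) is re-emitted as an update.
    static int updatesWithRownum(int oldRank, int newRank) {
        return oldRank - newRank + 1;
    }

    // Without rownum, the key is the derived upstream key (e.g. product_id),
    // so only the record whose value actually changed is re-emitted.
    static int updatesWithoutRownum(int oldRank, int newRank) {
        return 1;
    }

    public static void main(String[] args) {
        // product-1001 is upgraded from rank 9 to rank 1
        System.out.println(updatesWithRownum(9, 1));    // 9 update messages
        System.out.println(updatesWithoutRownum(9, 1)); // 1 update message
    }
}
```

Under this model, one rank change near the bottom of the list costs O(N) updates when rownum is emitted, but O(1) when it is omitted.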
+ +The following example shows how to optimize the above Top-N example in this way: + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); + +// ingest a DataStream from an external source +DataStream<Tuple4<String, String, String, Long>> ds = env.addSource(...); +// register the DataStream as table "ShopSales" +tableEnv.registerDataStream("ShopSales", ds, "product_id, category, product_name, sales"); + +// select top-5 products per category which have the maximum sales. +Table result1 = tableEnv.sqlQuery( + "SELECT product_id, category, product_name, sales " + // omit row_num field in the output + "FROM (" + + "   SELECT *," + + "       ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) as row_num" + + "   FROM ShopSales) " + + "WHERE row_num <= 5"); +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tableEnv = TableEnvironment.getTableEnvironment(env) + +// read a DataStream from an external source +val ds: DataStream[(String, String, String, Long)] = env.addSource(...) +// register the DataStream under the name "ShopSales" +tableEnv.registerDataStream("ShopSales", ds, 'product_id, 'category, 'product_name, 'sales) + + +// select top-5 products per category which have the maximum sales.
+val result1 = tableEnv.sqlQuery( + """ + |SELECT product_id, category, product_name, sales -- omit row_num field in the output + |FROM ( + |   SELECT *, + |       ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) as row_num + |   FROM ShopSales) + |WHERE row_num <= 5 + """.stripMargin) +{% endhighlight %} +</div> +</div> + +<span class="label label-danger">Attention in Streaming Mode</span> In order to output the above query to external storage and get a correct result, the external storage must have the same unique key as the Top-N query. In the above example query, if `product_id` is the unique key of the query, then the external table should also have `product_id` as the unique key. + +{% top %} + +#### Deduplication + +<span class="label label-danger">Attention</span> Deduplication is only supported in Blink planner. + +Deduplication removes rows that duplicate over a set of columns, keeping only the first or the last one. In some cases, the upstream ETL jobs are not end-to-end exactly-once, which may result in duplicate records in the sink in case of failover. However, the duplicate records will affect the correctness of downstream analytical jobs (e.g. `SUM`, `COUNT`), so deduplication is needed before further analysis. + +Flink uses `ROW_NUMBER()` to remove duplicates, just like in a Top-N query. In theory, deduplication is a special case of Top-N in which N is one and records are ordered by processing time or event time. + +The following shows the syntax of the Deduplication statement: + +{% highlight sql %} +SELECT [column_list] +FROM ( + SELECT [column_list], + ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]] + ORDER BY time_attr [asc|desc]) AS rownum + FROM table_name) +WHERE rownum = 1 +{% endhighlight %} + +**Parameter Specification:** + +- `ROW_NUMBER()`: Assigns a unique, sequential number to each row, starting with one. +- `PARTITION BY col1[, col2...]`: Specifies the partition columns, i.e. the deduplication key.
+- `ORDER BY time_attr [asc|desc]`: Specifies the ordering column; it must be a [time attribute](streaming/time_attributes.html). Currently, only the [proctime attribute](streaming/time_attributes.html#processing-time) is supported; the [rowtime attribute](streaming/time_attributes.html#event-time) will be supported in the future. Ordering by ASC means keeping the first row; ordering by DESC means keeping the last row. +- `WHERE rownum = 1`: The `rownum = 1` is required for Flink to recognize that this query is a deduplication. + +The following examples show how to specify SQL queries with Deduplication on streaming tables. + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); + +// ingest a DataStream from an external source +DataStream<Tuple4<String, String, String, Integer>> ds = env.addSource(...); +// register the DataStream as table "Orders" +tableEnv.registerDataStream("Orders", ds, "order_id, user, product, number, proctime.proctime"); + +// remove duplicate rows on order_id and keep the first occurrence row, +// because there shouldn't be two orders with the same order_id. +Table result1 = tableEnv.sqlQuery( + "SELECT order_id, user, product, number " + + "FROM (" + + "   SELECT *," + + "       ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY proctime ASC) as row_num" + + "   FROM Orders) " + + "WHERE row_num = 1"); +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tableEnv = TableEnvironment.getTableEnvironment(env) + +// read a DataStream from an external source +val ds: DataStream[(String, String, String, Int)] = env.addSource(...)
+// register the DataStream under the name "Orders" +tableEnv.registerDataStream("Orders", ds, 'order_id, 'user, 'product, 'number, 'proctime.proctime) + +// remove duplicate rows on order_id and keep the first occurrence row, +// because there shouldn't be two orders with the same order_id. +val result1 = tableEnv.sqlQuery( + """ + |SELECT order_id, user, product, number + |FROM ( + |   SELECT *, + |       ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY proctime ASC) as row_num + |   FROM Orders) + |WHERE row_num = 1 + """.stripMargin) +{% endhighlight %} +</div> +</div> + +{% top %} + #### Insert <div markdown="1"> diff --git a/docs/dev/table/sql.zh.md b/docs/dev/table/sql.zh.md index dd3869e..642402a 100644 --- a/docs/dev/table/sql.zh.md +++ b/docs/dev/table/sql.zh.md @@ -835,6 +835,227 @@ LIMIT 3 {% top %} +#### Top-N + +<span class="label label-danger">Attention</span> Top-N is only supported in Blink planner. + +Top-N queries ask for the N smallest or largest values ordered by columns. Both the smallest and the largest value sets are considered Top-N queries. Top-N queries are useful in cases where the need is to display only the N bottom-most or the N top-most records from a batch/streaming table on a condition. This result set can be used for further analysis. + +Flink uses the combination of an OVER window clause and a filter condition to express a Top-N query. With the power of the OVER window `PARTITION BY` clause, Flink also supports per-group Top-N, for example, the top five products per category that have the maximum sales in real time. Top-N queries are supported for SQL on batch and streaming tables.
+ +The following shows the syntax of the TOP-N statement: + +{% highlight sql %} +SELECT [column_list] +FROM ( + SELECT [column_list], + ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]] + ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum + FROM table_name) +WHERE rownum <= N [AND conditions] +{% endhighlight %} + +**Parameter Specification:** + +- `ROW_NUMBER()`: Assigns a unique, sequential number to each row, starting with one, according to the ordering of rows within the partition. Currently, we only support `ROW_NUMBER` as the over window function. In the future, we will support `RANK()` and `DENSE_RANK()`. +- `PARTITION BY col1[, col2...]`: Specifies the partition columns. Each partition will have a Top-N result. +- `ORDER BY col1 [asc|desc][, col2 [asc|desc]...]`: Specifies the ordering columns. The ordering directions can be different on different columns. +- `WHERE rownum <= N`: The `rownum <= N` is required for Flink to recognize that this query is a Top-N query. N represents that the N smallest or largest records will be retained. +- `[AND conditions]`: It is free to add other conditions in the WHERE clause, but the other conditions can only be combined with `rownum <= N` using an `AND` conjunction. + +<span class="label label-danger">Attention in Streaming Mode</span> The TopN query is <span class="label label-info">Result Updating</span>. Flink SQL will sort the input data stream according to the order key, so if the top N records have changed, the changed records will be sent downstream as retraction/update records. +It is recommended to use a storage that supports updating as the sink of a Top-N query. In addition, if the top N records need to be stored in external storage, the result table should have the same unique key as the Top-N query. + +The unique keys of a Top-N query are the combination of the partition columns and the rownum column. A Top-N query can also derive the unique key of its upstream.
Take the following job as an example: say `product_id` is the unique key of `ShopSales`; then the unique keys of the Top-N query are [`category`, `rownum`] and [`product_id`]. + +The following examples show how to specify SQL queries with Top-N on streaming tables. This is an example to get "the top five products per category that have the maximum sales in real time" mentioned above. + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); + +// ingest a DataStream from an external source +DataStream<Tuple4<String, String, String, Long>> ds = env.addSource(...); +// register the DataStream as table "ShopSales" +tableEnv.registerDataStream("ShopSales", ds, "product_id, category, product_name, sales"); + +// select top-5 products per category which have the maximum sales. +Table result1 = tableEnv.sqlQuery( + "SELECT * " + + "FROM (" + + "   SELECT *," + + "       ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) as row_num" + + "   FROM ShopSales) " + + "WHERE row_num <= 5"); +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tableEnv = TableEnvironment.getTableEnvironment(env) + +// read a DataStream from an external source +val ds: DataStream[(String, String, String, Long)] = env.addSource(...) +// register the DataStream under the name "ShopSales" +tableEnv.registerDataStream("ShopSales", ds, 'product_id, 'category, 'product_name, 'sales) + + +// select top-5 products per category which have the maximum sales.
+val result1 = tableEnv.sqlQuery( + """ + |SELECT * + |FROM ( + |   SELECT *, + |       ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) as row_num + |   FROM ShopSales) + |WHERE row_num <= 5 + """.stripMargin) +{% endhighlight %} +</div> +</div> + +##### No Ranking Output Optimization + +As described above, the `rownum` field will be written into the result table as one field of the unique key, which may lead to a lot of records being written to the result table. For example, when the record ranked 9 (say `product-1001`) is updated and its rank is upgraded to 1, all the records ranked 1 ~ 9 will be output to the result table as update messages. If the result table receives too much data, it will become the bottleneck of the SQL job. + +The way to optimize is to omit the rownum field in the outer SELECT clause of the Top-N query. This is reasonable because the number of top N records is usually not large, so the consumers can sort the records themselves quickly. Without the rownum field, in the example above, only the changed record (`product-1001`) needs to be sent downstream, which can greatly reduce IO to the result table. + +The following example shows how to optimize the above Top-N example in this way: + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); + +// ingest a DataStream from an external source +DataStream<Tuple4<String, String, String, Long>> ds = env.addSource(...); +// register the DataStream as table "ShopSales" +tableEnv.registerDataStream("ShopSales", ds, "product_id, category, product_name, sales"); + +// select top-5 products per category which have the maximum sales.
+Table result1 = tableEnv.sqlQuery( + "SELECT product_id, category, product_name, sales " + // omit row_num field in the output + "FROM (" + + "   SELECT *," + + "       ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) as row_num" + + "   FROM ShopSales) " + + "WHERE row_num <= 5"); +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tableEnv = TableEnvironment.getTableEnvironment(env) + +// read a DataStream from an external source +val ds: DataStream[(String, String, String, Long)] = env.addSource(...) +// register the DataStream under the name "ShopSales" +tableEnv.registerDataStream("ShopSales", ds, 'product_id, 'category, 'product_name, 'sales) + + +// select top-5 products per category which have the maximum sales. +val result1 = tableEnv.sqlQuery( + """ + |SELECT product_id, category, product_name, sales -- omit row_num field in the output + |FROM ( + |   SELECT *, + |       ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) as row_num + |   FROM ShopSales) + |WHERE row_num <= 5 + """.stripMargin) +{% endhighlight %} +</div> +</div> + +<span class="label label-danger">Attention in Streaming Mode</span> In order to output the above query to external storage and get a correct result, the external storage must have the same unique key as the Top-N query. In the above example query, if `product_id` is the unique key of the query, then the external table should also have `product_id` as the unique key. + +{% top %} + +#### Deduplication + +<span class="label label-danger">Attention</span> Deduplication is only supported in Blink planner. + +Deduplication removes rows that duplicate over a set of columns, keeping only the first or the last one. In some cases, the upstream ETL jobs are not end-to-end exactly-once, which may result in duplicate records in the sink in case of failover.
However, the duplicate records will affect the correctness of downstream analytical jobs (e.g. `SUM`, `COUNT`), so deduplication is needed before further analysis. + +Flink uses `ROW_NUMBER()` to remove duplicates, just like in a Top-N query. In theory, deduplication is a special case of Top-N in which N is one and records are ordered by processing time or event time. + +The following shows the syntax of the Deduplication statement: + +{% highlight sql %} +SELECT [column_list] +FROM ( + SELECT [column_list], + ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]] + ORDER BY time_attr [asc|desc]) AS rownum + FROM table_name) +WHERE rownum = 1 +{% endhighlight %} + +**Parameter Specification:** + +- `ROW_NUMBER()`: Assigns a unique, sequential number to each row, starting with one. +- `PARTITION BY col1[, col2...]`: Specifies the partition columns, i.e. the deduplication key. +- `ORDER BY time_attr [asc|desc]`: Specifies the ordering column; it must be a [time attribute](streaming/time_attributes.html). Currently, only the [proctime attribute](streaming/time_attributes.html#processing-time) is supported; the [rowtime attribute](streaming/time_attributes.html#event-time) will be supported in the future. Ordering by ASC means keeping the first row; ordering by DESC means keeping the last row. +- `WHERE rownum = 1`: The `rownum = 1` is required for Flink to recognize that this query is a deduplication. + +The following examples show how to specify SQL queries with Deduplication on streaming tables.
+ +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); + +// ingest a DataStream from an external source +DataStream<Tuple4<String, String, String, Integer>> ds = env.addSource(...); +// register the DataStream as table "Orders" +tableEnv.registerDataStream("Orders", ds, "order_id, user, product, number, proctime.proctime"); + +// remove duplicate rows on order_id and keep the first occurrence row, +// because there shouldn't be two orders with the same order_id. +Table result1 = tableEnv.sqlQuery( + "SELECT order_id, user, product, number " + + "FROM (" + + "   SELECT *," + + "       ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY proctime ASC) as row_num" + + "   FROM Orders) " + + "WHERE row_num = 1"); +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tableEnv = TableEnvironment.getTableEnvironment(env) + +// read a DataStream from an external source +val ds: DataStream[(String, String, String, Int)] = env.addSource(...) +// register the DataStream under the name "Orders" +tableEnv.registerDataStream("Orders", ds, 'order_id, 'user, 'product, 'number, 'proctime.proctime) + +// remove duplicate rows on order_id and keep the first occurrence row, +// because there shouldn't be two orders with the same order_id. +val result1 = tableEnv.sqlQuery( + """ + |SELECT order_id, user, product, number + |FROM ( + |   SELECT *, + |       ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY proctime ASC) as row_num + |   FROM Orders) + |WHERE row_num = 1 + """.stripMargin) +{% endhighlight %} +</div> +</div> + +{% top %} + #### Insert <div markdown="1">