This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ec4c0c3  [FLINK-13356][table][docs] Add documentation for TopN and Deduplication in blink planner
ec4c0c3 is described below

commit ec4c0c35a55253ee4776c88889359fc682d2feae
Author: Jark Wu <imj...@gmail.com>
AuthorDate: Thu Aug 22 20:00:28 2019 +0800

    [FLINK-13356][table][docs] Add documentation for TopN and Deduplication in blink planner
    
    This closes #9511
---
 docs/dev/table/sql.md    | 223 ++++++++++++++++++++++++++++++++++++++++++++++-
 docs/dev/table/sql.zh.md | 221 ++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 443 insertions(+), 1 deletion(-)

diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 0cfbdda..c3d0cd3 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -24,7 +24,7 @@ under the License.
 
 This is a complete list of Data Definition Language (DDL) and Data Manipulation Language (DML) constructs supported in Flink.
 * This will be replaced by the TOC
-{:toc} 
+{:toc}
 
 ## Query
 SQL queries are specified with the `sqlQuery()` method of the `TableEnvironment`. The method returns the result of the SQL query as a `Table`. A `Table` can be used in [subsequent SQL and Table API queries](common.html#mixing-table-api-and-sql), be [converted into a DataSet or DataStream](common.html#integration-with-datastream-and-dataset-api), or [written to a TableSink](common.html#emit-a-table)). SQL and Table API queries can be seamlessly mixed and are holistically optimized and tra [...]
@@ -835,6 +835,227 @@ LIMIT 3
 
 {% top %}
 
+#### Top-N
+
+<span class="label label-danger">Attention</span> Top-N is only supported in the Blink planner.
+
+Top-N queries ask for the N smallest or largest values ordered by columns. Both the smallest and the largest value sets are considered Top-N queries. Top-N queries are useful in cases where there is a need to display only the N bottom-most or the N top-most records from a batch/streaming table on a condition. This result set can be used for further analysis.
+
+Flink uses the combination of an OVER window clause and a filter condition to express a Top-N query. With the power of the OVER window `PARTITION BY` clause, Flink also supports per-group Top-N, for example, the top five products per category that have the maximum sales in real time. Top-N queries are supported for SQL on batch and streaming tables.
+
+The following shows the syntax of the Top-N statement:
+
+{% highlight sql %}
+SELECT [column_list]
+FROM (
+   SELECT [column_list],
+     ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
+       ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
+   FROM table_name)
+WHERE rownum <= N [AND conditions]
+{% endhighlight %}
+
+**Parameter Specification:**
+
+- `ROW_NUMBER()`: Assigns a unique, sequential number to each row, starting with one, according to the ordering of rows within the partition. Currently, we only support `ROW_NUMBER` as the over window function. In the future, we will support `RANK()` and `DENSE_RANK()`.
+- `PARTITION BY col1[, col2...]`: Specifies the partition columns. Each partition will have a Top-N result.
+- `ORDER BY col1 [asc|desc][, col2 [asc|desc]...]`: Specifies the ordering columns. The ordering directions can be different on different columns.
+- `WHERE rownum <= N`: The `rownum <= N` is required for Flink to recognize that this query is a Top-N query. N represents the number of smallest or largest records that will be retained.
+- `[AND conditions]`: You are free to add other conditions in the WHERE clause, but the other conditions can only be combined with `rownum <= N` using the `AND` conjunction.
+
+<span class="label label-danger">Attention in Streaming Mode</span> The Top-N query is <span class="label label-info">Result Updating</span>. Flink SQL will sort the input data stream according to the order key, so if the top N records change, the changed records will be sent downstream as retraction/update records.
+It is recommended to use a storage system that supports updating as the sink of a Top-N query. In addition, if the top N records need to be stored in external storage, the result table should have the same unique key as the Top-N query.
+
+The unique key of a Top-N query is the combination of the partition columns and the rownum column. A Top-N query can also derive the unique key of its upstream. Take the following job as an example: say `product_id` is the unique key of `ShopSales`; then the unique keys of the Top-N query are [`category`, `rownum`] and [`product_id`].
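+
+As an illustration, the following sketch annotates where each unique key of the result comes from (assuming, as above, that `product_id` is the unique key of `ShopSales`):
+
+{% highlight sql %}
+-- Top five products per category, ordered by sales.
+SELECT *
+FROM (
+   SELECT *,
+     ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) AS rownum
+   FROM ShopSales)
+WHERE rownum <= 5
+-- Derived unique keys of the result:
+--   [category, rownum]: at most one row per rank within each partition
+--   [product_id]: inherited from the upstream ShopSales table
+{% endhighlight %}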
+
+The following examples show how to specify SQL queries with Top-N on streaming tables. This implements the "top five products per category that have the maximum sales in real time" example mentioned above.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+// ingest a DataStream from an external source
+DataStream<Tuple4<String, String, String, Long>> ds = env.addSource(...);
+// register the DataStream as table "ShopSales"
+tableEnv.registerDataStream("ShopSales", ds, "product_id, category, product_name, sales");
+
+// select top-5 products per category which have the maximum sales.
+Table result1 = tableEnv.sqlQuery(
+  "SELECT * " +
+  "FROM (" +
+  "   SELECT *," +
+  "       ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) as 
row_num" +
+  "   FROM ShopSales)" +
+  "WHERE row_num <= 5");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tableEnv = TableEnvironment.getTableEnvironment(env)
+
+// read a DataStream from an external source
+val ds: DataStream[(String, String, String, Long)] = env.addSource(...)
+// register the DataStream under the name "ShopSales"
+tableEnv.registerDataStream("ShopSales", ds, 'product_id, 'category, 
'product_name, 'sales)
+
+
+// select top-5 products per category which have the maximum sales.
+val result1 = tableEnv.sqlQuery(
+    """
+      |SELECT *
+      |FROM (
+      |   SELECT *,
+      |       ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) as row_num
+      |   FROM ShopSales)
+      |WHERE row_num <= 5
+    """.stripMargin)
+{% endhighlight %}
+</div>
+</div>
+
+##### No Ranking Output Optimization
+
+As described above, the `rownum` field will be written into the result table as one field of the unique key, which may lead to a lot of records being written to the result table. For example, when the record ranked 9 (say `product-1001`) is updated and its rank is upgraded to 1, all the records ranked 1 through 9 will be output to the result table as update messages. If the result table receives too much data, it will become the bottleneck of the SQL job.
+
+The way to optimize this is to omit the `rownum` field in the outer SELECT clause of the Top-N query. This is reasonable because the number of top N records is usually not large, so the consumers can quickly sort the records themselves. Without the `rownum` field, in the example above, only the changed record (`product-1001`) needs to be sent downstream, which can greatly reduce the I/O to the result table.
+
+The following example shows how to optimize the above Top-N example in this way:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+// ingest a DataStream from an external source
+DataStream<Tuple4<String, String, String, Long>> ds = env.addSource(...);
+// register the DataStream as table "ShopSales"
+tableEnv.registerDataStream("ShopSales", ds, "product_id, category, product_name, sales");
+
+// select top-5 products per category which have the maximum sales.
+Table result1 = tableEnv.sqlQuery(
+  "SELECT product_id, category, product_name, sales " + // omit row_num field 
in the output
+  "FROM (" +
+  "   SELECT *," +
+  "       ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) as 
row_num" +
+  "   FROM ShopSales)" +
+  "WHERE row_num <= 5");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tableEnv = TableEnvironment.getTableEnvironment(env)
+
+// read a DataStream from an external source
+val ds: DataStream[(String, String, String, Long)] = env.addSource(...)
+// register the DataStream under the name "ShopSales"
+tableEnv.registerDataStream("ShopSales", ds, 'product_id, 'category, 
'product_name, 'sales)
+
+
+// select top-5 products per category which have the maximum sales.
+val result1 = tableEnv.sqlQuery(
+    """
+      |SELECT product_id, category, product_name, sales  -- omit row_num field in the output
+      |FROM (
+      |   SELECT *,
+      |       ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) as row_num
+      |   FROM ShopSales)
+      |WHERE row_num <= 5
+    """.stripMargin)
+{% endhighlight %}
+</div>
+</div>
+
+<span class="label label-danger">Attention in Streaming Mode</span> In order to output the above query to external storage and have a correct result, the external storage must have the same unique key as the Top-N query. In the above example query, if `product_id` is the unique key of the query, then the external table should also have `product_id` as the unique key.
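+
+For example, a sink table in MySQL for the optimized query above might be declared as follows. This is a hypothetical sketch: the table and column names simply mirror the example, and the exact DDL depends on the external storage in use.
+
+{% highlight sql %}
+-- Hypothetical MySQL sink table: product_id is the unique key,
+-- matching the unique key of the Top-N query above.
+CREATE TABLE top_five_products (
+  product_id   VARCHAR(32) NOT NULL,
+  category     VARCHAR(32),
+  product_name VARCHAR(64),
+  sales        BIGINT,
+  PRIMARY KEY (product_id)
+);
+{% endhighlight %}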
+
+{% top %}
+
+#### Deduplication
+
+<span class="label label-danger">Attention</span> Deduplication is only supported in the Blink planner.
+
+Deduplication removes rows that duplicate over a set of columns, keeping only the first or the last one. In some cases, the upstream ETL jobs are not end-to-end exactly-once, which may result in duplicate records in the sink in case of failover. Since duplicate records affect the correctness of downstream analytical jobs (e.g. `SUM`, `COUNT`), deduplication is needed before further analysis.
+
+Flink uses `ROW_NUMBER()` to remove duplicates, just like in a Top-N query. In theory, deduplication is a special case of Top-N in which N is one and the rows are ordered by the processing time or the event time.
+
+The following shows the syntax of the Deduplication statement:
+
+{% highlight sql %}
+SELECT [column_list]
+FROM (
+   SELECT [column_list],
+     ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
+       ORDER BY time_attr [asc|desc]) AS rownum
+   FROM table_name)
+WHERE rownum = 1
+{% endhighlight %}
+
+**Parameter Specification:**
+
+- `ROW_NUMBER()`: Assigns a unique, sequential number to each row, starting with one.
+- `PARTITION BY col1[, col2...]`: Specifies the partition columns, i.e., the deduplication key.
+- `ORDER BY time_attr [asc|desc]`: Specifies the ordering column; it must be a [time attribute](streaming/time_attributes.html). Currently, only the [proctime attribute](streaming/time_attributes.html#processing-time) is supported; the [rowtime attribute](streaming/time_attributes.html#event-time) will be supported in the future. Ordering by ASC means keeping the first row, while ordering by DESC means keeping the last row (see the worked example after this list).
+- `WHERE rownum = 1`: The `rownum = 1` is required for Flink to recognize that this query is a deduplication.
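+
+To make the ordering semantics concrete, here is a small worked example on hypothetical data:
+
+{% highlight sql %}
+-- Input rows for order_id = 'A' (proctime increases downwards):
+--   ('A', 'bob', 'beer', 2)   -- arrives first
+--   ('A', 'bob', 'beer', 3)   -- duplicate, arrives later
+--
+-- ORDER BY proctime ASC  keeps the first row: ('A', 'bob', 'beer', 2)
+-- ORDER BY proctime DESC keeps the last row:  ('A', 'bob', 'beer', 3)
+{% endhighlight %}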
+
+The following examples show how to specify SQL queries with Deduplication on streaming tables.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+// ingest a DataStream from an external source
+DataStream<Tuple4<String, String, String, Integer>> ds = env.addSource(...);
+// register the DataStream as table "Orders"
+tableEnv.registerDataStream("Orders", ds, "order_id, user, product, number, proctime.proctime");
+
+// remove duplicate rows on order_id and keep the first occurrence row,
+// because there shouldn't be two orders with the same order_id.
+Table result1 = tableEnv.sqlQuery(
+  "SELECT order_id, user, product, number " +
+  "FROM (" +
+  "   SELECT *," +
+  "       ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY proctime ASC) as 
row_num" +
+  "   FROM Orders)" +
+  "WHERE row_num = 1");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tableEnv = TableEnvironment.getTableEnvironment(env)
+
+// read a DataStream from an external source
+val ds: DataStream[(String, String, String, Int)] = env.addSource(...)
+// register the DataStream under the name "Orders"
+tableEnv.registerDataStream("Orders", ds, 'order_id, 'user, 'product, 'number, 
'proctime.proctime)
+
+// remove duplicate rows on order_id and keep the first occurrence row,
+// because there shouldn't be two orders with the same order_id.
+val result1 = tableEnv.sqlQuery(
+    """
+      |SELECT order_id, user, product, number
+      |FROM (
+      |   SELECT *,
+      |       ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY proctime ASC) as row_num
+      |   FROM Orders)
+      |WHERE row_num = 1
+    """.stripMargin)
+{% endhighlight %}
+</div>
+</div>
+
+{% top %}
+
 #### Insert
 
 <div markdown="1">
diff --git a/docs/dev/table/sql.zh.md b/docs/dev/table/sql.zh.md
index dd3869e..642402a 100644
--- a/docs/dev/table/sql.zh.md
+++ b/docs/dev/table/sql.zh.md
@@ -835,6 +835,227 @@ LIMIT 3
 
 {% top %}
 
+#### Top-N
+
+<span class="label label-danger">Attention</span> Top-N is only supported in the Blink planner.
+
+Top-N queries ask for the N smallest or largest values ordered by columns. Both the smallest and the largest value sets are considered Top-N queries. Top-N queries are useful in cases where there is a need to display only the N bottom-most or the N top-most records from a batch/streaming table on a condition. This result set can be used for further analysis.
+
+Flink uses the combination of an OVER window clause and a filter condition to express a Top-N query. With the power of the OVER window `PARTITION BY` clause, Flink also supports per-group Top-N, for example, the top five products per category that have the maximum sales in real time. Top-N queries are supported for SQL on batch and streaming tables.
+
+The following shows the syntax of the Top-N statement:
+
+{% highlight sql %}
+SELECT [column_list]
+FROM (
+   SELECT [column_list],
+     ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
+       ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
+   FROM table_name)
+WHERE rownum <= N [AND conditions]
+{% endhighlight %}
+
+**Parameter Specification:**
+
+- `ROW_NUMBER()`: Assigns a unique, sequential number to each row, starting with one, according to the ordering of rows within the partition. Currently, we only support `ROW_NUMBER` as the over window function. In the future, we will support `RANK()` and `DENSE_RANK()`.
+- `PARTITION BY col1[, col2...]`: Specifies the partition columns. Each partition will have a Top-N result.
+- `ORDER BY col1 [asc|desc][, col2 [asc|desc]...]`: Specifies the ordering columns. The ordering directions can be different on different columns.
+- `WHERE rownum <= N`: The `rownum <= N` is required for Flink to recognize that this query is a Top-N query. N represents the number of smallest or largest records that will be retained.
+- `[AND conditions]`: You are free to add other conditions in the WHERE clause, but the other conditions can only be combined with `rownum <= N` using the `AND` conjunction.
+
+<span class="label label-danger">Attention in Streaming Mode</span> The Top-N query is <span class="label label-info">Result Updating</span>. Flink SQL will sort the input data stream according to the order key, so if the top N records change, the changed records will be sent downstream as retraction/update records.
+It is recommended to use a storage system that supports updating as the sink of a Top-N query. In addition, if the top N records need to be stored in external storage, the result table should have the same unique key as the Top-N query.
+
+The unique key of a Top-N query is the combination of the partition columns and the rownum column. A Top-N query can also derive the unique key of its upstream. Take the following job as an example: say `product_id` is the unique key of `ShopSales`; then the unique keys of the Top-N query are [`category`, `rownum`] and [`product_id`].
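+
+As an illustration, the following sketch annotates where each unique key of the result comes from (assuming, as above, that `product_id` is the unique key of `ShopSales`):
+
+{% highlight sql %}
+-- Top five products per category, ordered by sales.
+SELECT *
+FROM (
+   SELECT *,
+     ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) AS rownum
+   FROM ShopSales)
+WHERE rownum <= 5
+-- Derived unique keys of the result:
+--   [category, rownum]: at most one row per rank within each partition
+--   [product_id]: inherited from the upstream ShopSales table
+{% endhighlight %}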
+
+The following examples show how to specify SQL queries with Top-N on streaming tables. This implements the "top five products per category that have the maximum sales in real time" example mentioned above.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+// ingest a DataStream from an external source
+DataStream<Tuple4<String, String, String, Long>> ds = env.addSource(...);
+// register the DataStream as table "ShopSales"
+tableEnv.registerDataStream("ShopSales", ds, "product_id, category, product_name, sales");
+
+// select top-5 products per category which have the maximum sales.
+Table result1 = tableEnv.sqlQuery(
+  "SELECT * " +
+  "FROM (" +
+  "   SELECT *," +
+  "       ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) as 
row_num" +
+  "   FROM ShopSales)" +
+  "WHERE row_num <= 5");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tableEnv = TableEnvironment.getTableEnvironment(env)
+
+// read a DataStream from an external source
+val ds: DataStream[(String, String, String, Long)] = env.addSource(...)
+// register the DataStream under the name "ShopSales"
+tableEnv.registerDataStream("ShopSales", ds, 'product_id, 'category, 
'product_name, 'sales)
+
+
+// select top-5 products per category which have the maximum sales.
+val result1 = tableEnv.sqlQuery(
+    """
+      |SELECT *
+      |FROM (
+      |   SELECT *,
+      |       ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) as row_num
+      |   FROM ShopSales)
+      |WHERE row_num <= 5
+    """.stripMargin)
+{% endhighlight %}
+</div>
+</div>
+
+##### No Ranking Output Optimization
+
+As described above, the `rownum` field will be written into the result table as one field of the unique key, which may lead to a lot of records being written to the result table. For example, when the record ranked 9 (say `product-1001`) is updated and its rank is upgraded to 1, all the records ranked 1 through 9 will be output to the result table as update messages. If the result table receives too much data, it will become the bottleneck of the SQL job.
+
+The way to optimize this is to omit the `rownum` field in the outer SELECT clause of the Top-N query. This is reasonable because the number of top N records is usually not large, so the consumers can quickly sort the records themselves. Without the `rownum` field, in the example above, only the changed record (`product-1001`) needs to be sent downstream, which can greatly reduce the I/O to the result table.
+
+The following example shows how to optimize the above Top-N example in this way:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+// ingest a DataStream from an external source
+DataStream<Tuple4<String, String, String, Long>> ds = env.addSource(...);
+// register the DataStream as table "ShopSales"
+tableEnv.registerDataStream("ShopSales", ds, "product_id, category, product_name, sales");
+
+// select top-5 products per category which have the maximum sales.
+Table result1 = tableEnv.sqlQuery(
+  "SELECT product_id, category, product_name, sales " + // omit row_num field 
in the output
+  "FROM (" +
+  "   SELECT *," +
+  "       ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) as 
row_num" +
+  "   FROM ShopSales)" +
+  "WHERE row_num <= 5");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tableEnv = TableEnvironment.getTableEnvironment(env)
+
+// read a DataStream from an external source
+val ds: DataStream[(String, String, String, Long)] = env.addSource(...)
+// register the DataStream under the name "ShopSales"
+tableEnv.registerDataStream("ShopSales", ds, 'product_id, 'category, 
'product_name, 'sales)
+
+
+// select top-5 products per category which have the maximum sales.
+val result1 = tableEnv.sqlQuery(
+    """
+      |SELECT product_id, category, product_name, sales  -- omit row_num field in the output
+      |FROM (
+      |   SELECT *,
+      |       ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) as row_num
+      |   FROM ShopSales)
+      |WHERE row_num <= 5
+    """.stripMargin)
+{% endhighlight %}
+</div>
+</div>
+
+<span class="label label-danger">Attention in Streaming Mode</span> In order to output the above query to external storage and have a correct result, the external storage must have the same unique key as the Top-N query. In the above example query, if `product_id` is the unique key of the query, then the external table should also have `product_id` as the unique key.
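+
+For example, a sink table in MySQL for the optimized query above might be declared as follows. This is a hypothetical sketch: the table and column names simply mirror the example, and the exact DDL depends on the external storage in use.
+
+{% highlight sql %}
+-- Hypothetical MySQL sink table: product_id is the unique key,
+-- matching the unique key of the Top-N query above.
+CREATE TABLE top_five_products (
+  product_id   VARCHAR(32) NOT NULL,
+  category     VARCHAR(32),
+  product_name VARCHAR(64),
+  sales        BIGINT,
+  PRIMARY KEY (product_id)
+);
+{% endhighlight %}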
+
+{% top %}
+
+#### Deduplication
+
+<span class="label label-danger">Attention</span> Deduplication is only supported in the Blink planner.
+
+Deduplication removes rows that duplicate over a set of columns, keeping only the first or the last one. In some cases, the upstream ETL jobs are not end-to-end exactly-once, which may result in duplicate records in the sink in case of failover. Since duplicate records affect the correctness of downstream analytical jobs (e.g. `SUM`, `COUNT`), deduplication is needed before further analysis.
+
+Flink uses `ROW_NUMBER()` to remove duplicates, just like in a Top-N query. In theory, deduplication is a special case of Top-N in which N is one and the rows are ordered by the processing time or the event time.
+
+The following shows the syntax of the Deduplication statement:
+
+{% highlight sql %}
+SELECT [column_list]
+FROM (
+   SELECT [column_list],
+     ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
+       ORDER BY time_attr [asc|desc]) AS rownum
+   FROM table_name)
+WHERE rownum = 1
+{% endhighlight %}
+
+**Parameter Specification:**
+
+- `ROW_NUMBER()`: Assigns a unique, sequential number to each row, starting with one.
+- `PARTITION BY col1[, col2...]`: Specifies the partition columns, i.e., the deduplication key.
+- `ORDER BY time_attr [asc|desc]`: Specifies the ordering column; it must be a [time attribute](streaming/time_attributes.html). Currently, only the [proctime attribute](streaming/time_attributes.html#processing-time) is supported; the [rowtime attribute](streaming/time_attributes.html#event-time) will be supported in the future. Ordering by ASC means keeping the first row, while ordering by DESC means keeping the last row (see the worked example after this list).
+- `WHERE rownum = 1`: The `rownum = 1` is required for Flink to recognize that this query is a deduplication.
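+
+To make the ordering semantics concrete, here is a small worked example on hypothetical data:
+
+{% highlight sql %}
+-- Input rows for order_id = 'A' (proctime increases downwards):
+--   ('A', 'bob', 'beer', 2)   -- arrives first
+--   ('A', 'bob', 'beer', 3)   -- duplicate, arrives later
+--
+-- ORDER BY proctime ASC  keeps the first row: ('A', 'bob', 'beer', 2)
+-- ORDER BY proctime DESC keeps the last row:  ('A', 'bob', 'beer', 3)
+{% endhighlight %}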
+
+The following examples show how to specify SQL queries with Deduplication on streaming tables.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+// ingest a DataStream from an external source
+DataStream<Tuple4<String, String, String, Integer>> ds = env.addSource(...);
+// register the DataStream as table "Orders"
+tableEnv.registerDataStream("Orders", ds, "order_id, user, product, number, proctime.proctime");
+
+// remove duplicate rows on order_id and keep the first occurrence row,
+// because there shouldn't be two orders with the same order_id.
+Table result1 = tableEnv.sqlQuery(
+  "SELECT order_id, user, product, number " +
+  "FROM (" +
+  "   SELECT *," +
+  "       ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY proctime ASC) as 
row_num" +
+  "   FROM Orders)" +
+  "WHERE row_num = 1");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tableEnv = TableEnvironment.getTableEnvironment(env)
+
+// read a DataStream from an external source
+val ds: DataStream[(String, String, String, Int)] = env.addSource(...)
+// register the DataStream under the name "Orders"
+tableEnv.registerDataStream("Orders", ds, 'order_id, 'user, 'product, 'number, 
'proctime.proctime)
+
+// remove duplicate rows on order_id and keep the first occurrence row,
+// because there shouldn't be two orders with the same order_id.
+val result1 = tableEnv.sqlQuery(
+    """
+      |SELECT order_id, user, product, number
+      |FROM (
+      |   SELECT *,
+      |       ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY proctime ASC) as row_num
+      |   FROM Orders)
+      |WHERE row_num = 1
+    """.stripMargin)
+{% endhighlight %}
+</div>
+</div>
+
+{% top %}
+
 #### Insert
 
 <div markdown="1">
