[GitHub] flink pull request #4813: [FLINK-7821] [table] Deprecate Table.limit() and r...

2017-10-12 Thread fhueske
GitHub user fhueske opened a pull request:

https://github.com/apache/flink/pull/4813

[FLINK-7821] [table] Deprecate Table.limit() and replace it by 
Table.offset() and Table.fetch().

## What is the purpose of the change

- `Table.limit(n)` has unexpected semantics of returning all but the first 
`n` rows.
- Returning the first `n` rows from a sorted result requires to specify a 0 
offset (`table.orderBy(...).limit(0, n)`) which is more complicated than 
necessary.

## Brief change log

- deprecate `Table.limit()` and replace it by `Table.offset()` and 
`Table.fetch()`

## Verifying this change

- existing tests for `limit` were adapted for `offset` and `fetch`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): **no**
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
  - The serializers: **no**
  - The runtime per-record code paths (performance sensitive): **no**
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**

## Documentation

- Documentation was adapted.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fhueske/flink tableOffsetFetch

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4813.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4813


commit 7b1e3cb03a8e863699940b152f727551c2d43ea0
Author: Fabian Hueske 
Date:   2017-10-12T08:56:19Z

[FLINK-7821] [table] Deprecate Table.limit() and replace it by 
Table.offset() and Table.fetch().




---


[GitHub] flink pull request #4813: [FLINK-7821] [table] Deprecate Table.limit() and r...

2017-10-12 Thread hustfxj
Github user hustfxj commented on a diff in the pull request:

https://github.com/apache/flink/pull/4813#discussion_r144257631
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/SortValidationTest.scala
 ---
@@ -27,10 +27,26 @@ import org.junit._
 class SortValidationTest extends TableTestBase {
--- End diff --

It had better add the following two validation tests:
```
 ds.orderBy('a.asc).offset(10).offset(10)
 ds.orderBy('a.asc).fetch(5)
```
Now` fetch` can only be used after `offset`, maybe the `fetch` be used 
after `orderBy`.  `ds.orderBy('a.asc).fetch(5)` is equivalent with 
`ds.orderBy('a.asc).offset(0)fetch(5)`


---


[GitHub] flink pull request #4813: [FLINK-7821] [table] Deprecate Table.limit() and r...

2017-10-12 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4813#discussion_r144268057
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/SortValidationTest.scala
 ---
@@ -27,10 +27,26 @@ import org.junit._
 class SortValidationTest extends TableTestBase {
--- End diff --

Right, I'll add a test that validates that 
`table.orderBy('a.asc).offset(10).offset(10)` is rejected.

`table.orderBy('a.asc).fetch(5)` is already supported as you suggested (see 
the modified `SortITCase`).


---


[GitHub] flink pull request #4813: [FLINK-7821] [table] Deprecate Table.limit() and r...

2017-10-23 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4813#discussion_r146424180
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ---
@@ -745,12 +748,65 @@ class Table(
 *
 * @param offset number of records to skip
 * @param fetch number of records to be returned
+*
+* @deprecated Please use [[Table.offset()]] and [[Table.fetch()]] 
instead.
 */
+  @deprecated(message = "deprecated in favor of Table.offset() and 
Table.fetch()", since = "1.4.0")
   def limit(offset: Int, fetch: Int): Table = {
 new Table(tableEnv, Limit(offset, fetch, 
logicalPlan).validate(tableEnv))
   }
 
   /**
+* Limits a sorted result from an offset position.
+* Similar to a SQL OFFSET clause. Offset is technically part of the 
Order By operator and
+* thus must be preceded by it.
+*
+* [[Table.offset(o)]] can be combined with a subsequent 
[[Table.fetch(n)]] call to return the
+* first n rows starting from the offset position o.
+*
+* {{{
+*   // returns unlimited number of records beginning with the 4th 
record
+*   tab.orderBy('name.desc).offset(3)
+*   // return the first 5 records starting from the 10th record
+*   tab.orderBy('name.desc).offset(10).fetch(5)
+* }}}
+*
+* @param offset number of records to skip
+*/
+  def offset(offset: Int): Table = {
+new Table(tableEnv, Limit(offset, -1, logicalPlan).validate(tableEnv))
+  }
+
+  /**
+* Limits a sorted result to the first n rows.
+* Similar to a SQL FETCH clause. Fetch is technically part of the 
Order By operator and
+* thus must be preceded by it.
+*
+* [[Table.fetch(n)]] can be combined with a preceding 
[[Table.offset(o)]] call to return the
+* first n rows starting from the offset position o.
+*
+* {{{
+*   // returns the first 3 records.
+*   tab.orderBy('name.desc).fetch(3)
+*   // return the first 5 records starting from the 10th record
--- End diff --

11th?


---


[GitHub] flink pull request #4813: [FLINK-7821] [table] Deprecate Table.limit() and r...

2017-10-23 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4813#discussion_r146423916
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ---
@@ -745,12 +748,65 @@ class Table(
 *
 * @param offset number of records to skip
 * @param fetch number of records to be returned
+*
+* @deprecated Please use [[Table.offset()]] and [[Table.fetch()]] 
instead.
 */
+  @deprecated(message = "deprecated in favor of Table.offset() and 
Table.fetch()", since = "1.4.0")
   def limit(offset: Int, fetch: Int): Table = {
 new Table(tableEnv, Limit(offset, fetch, 
logicalPlan).validate(tableEnv))
   }
 
   /**
+* Limits a sorted result from an offset position.
+* Similar to a SQL OFFSET clause. Offset is technically part of the 
Order By operator and
+* thus must be preceded by it.
+*
+* [[Table.offset(o)]] can be combined with a subsequent 
[[Table.fetch(n)]] call to return the
+* first n rows starting from the offset position o.
--- End diff --

If I understand correctly, "from the offset position o" means "from the 
(o+1)th record", right?


---


[GitHub] flink pull request #4813: [FLINK-7821] [table] Deprecate Table.limit() and r...

2017-10-23 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4813#discussion_r146423953
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ---
@@ -745,12 +748,65 @@ class Table(
 *
 * @param offset number of records to skip
 * @param fetch number of records to be returned
+*
+* @deprecated Please use [[Table.offset()]] and [[Table.fetch()]] 
instead.
 */
+  @deprecated(message = "deprecated in favor of Table.offset() and 
Table.fetch()", since = "1.4.0")
   def limit(offset: Int, fetch: Int): Table = {
 new Table(tableEnv, Limit(offset, fetch, 
logicalPlan).validate(tableEnv))
   }
 
   /**
+* Limits a sorted result from an offset position.
+* Similar to a SQL OFFSET clause. Offset is technically part of the 
Order By operator and
+* thus must be preceded by it.
+*
+* [[Table.offset(o)]] can be combined with a subsequent 
[[Table.fetch(n)]] call to return the
+* first n rows starting from the offset position o.
+*
+* {{{
+*   // returns unlimited number of records beginning with the 4th 
record
+*   tab.orderBy('name.desc).offset(3)
+*   // return the first 5 records starting from the 10th record
--- End diff --

11th?


---


[GitHub] flink pull request #4813: [FLINK-7821] [table] Deprecate Table.limit() and r...

2017-10-23 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4813#discussion_r146423134
  
--- Diff: docs/dev/table/tableApi.md ---
@@ -1031,19 +1034,22 @@ val result = in.orderBy('a.asc);
 
 
   
-Limit
+Offset & Fetch
 Batch
   
   
-Similar to a SQL LIMIT clause. Limits a sorted result to a 
specified number of records from an offset position. Limit is technically part 
of the Order By operator and thus must be preceded by it.
+Similar to the SQL OFFSET and FETCH clauses. Offset and Fetch 
limit the number of records returned from a sorted result. Offset and Fetch are 
technically part of the Order By operator and thus must be preceded by it.
 {% highlight scala %}
-val in = ds.toTable(tableEnv, 'a, 'b, 'c);
-val result = in.orderBy('a.asc).limit(3); // returns unlimited number of 
records beginning with the 4th record
-{% endhighlight %}
-or
-{% highlight scala %}
-val in = ds.toTable(tableEnv, 'a, 'b, 'c);
-val result = in.orderBy('a.asc).limit(3, 5); // returns 5 records 
beginning with the 4th record
+val in = ds.toTable(tableEnv, 'a, 'b, 'c)
+
+// returns the first 5 records from the sorted result
+val result1: Table = in.orderBy('a.asc).fetch(5)
+
+// returns all records beginning with the 4th record from the sorted result
+val result2: Table = in.orderBy('a.asc).offset(3)
+
+// returns the first 5 records beginning with the 10th record from the 
sorted result
--- End diff --

Should be 11th?


---


[GitHub] flink pull request #4813: [FLINK-7821] [table] Deprecate Table.limit() and r...

2017-10-23 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4813#discussion_r146423039
  
--- Diff: docs/dev/table/tableApi.md ---
@@ -985,19 +985,22 @@ Table result = in.orderBy("a.asc");
 
 
   
-Limit
+Offset & Fetch
 Batch
   
   
-Similar to a SQL LIMIT clause. Limits a sorted result to a 
specified number of records from an offset position. Limit is technically part 
of the Order By operator and thus must be preceded by it.
+Similar to the SQL OFFSET and FETCH clauses. Offset and Fetch 
limit the number of records returned from a sorted result. Offset and Fetch are 
technically part of the Order By operator and thus must be preceded by it.
 {% highlight java %}
 Table in = tableEnv.fromDataSet(ds, "a, b, c");
-Table result = in.orderBy("a.asc").limit(3); // returns unlimited number 
of records beginning with the 4th record
-{% endhighlight %}
-or
-{% highlight java %}
-Table in = tableEnv.fromDataSet(ds, "a, b, c");
-Table result = in.orderBy("a.asc").limit(3, 5); // returns 5 records 
beginning with the 4th record
+
+// returns the first 5 records from the sorted result
+Table result1 = in.orderBy("a.asc").fetch(5); 
+
+// returns all records beginning with the 4th record from the sorted result
+Table result2 = in.orderBy("a.asc").offset(3);
+
+// returns the first 5 records beginning with the 10th record from the 
sorted result
--- End diff --

Should be 11th?


---


[GitHub] flink pull request #4813: [FLINK-7821] [table] Deprecate Table.limit() and r...

2017-10-23 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4813#discussion_r146426403
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ---
@@ -745,12 +748,65 @@ class Table(
 *
 * @param offset number of records to skip
 * @param fetch number of records to be returned
+*
+* @deprecated Please use [[Table.offset()]] and [[Table.fetch()]] 
instead.
 */
+  @deprecated(message = "deprecated in favor of Table.offset() and 
Table.fetch()", since = "1.4.0")
   def limit(offset: Int, fetch: Int): Table = {
 new Table(tableEnv, Limit(offset, fetch, 
logicalPlan).validate(tableEnv))
   }
 
   /**
+* Limits a sorted result from an offset position.
+* Similar to a SQL OFFSET clause. Offset is technically part of the 
Order By operator and
+* thus must be preceded by it.
+*
+* [[Table.offset(o)]] can be combined with a subsequent 
[[Table.fetch(n)]] call to return the
+* first n rows starting from the offset position o.
+*
+* {{{
+*   // returns unlimited number of records beginning with the 4th 
record
+*   tab.orderBy('name.desc).offset(3)
+*   // return the first 5 records starting from the 10th record
+*   tab.orderBy('name.desc).offset(10).fetch(5)
+* }}}
+*
+* @param offset number of records to skip
+*/
+  def offset(offset: Int): Table = {
+new Table(tableEnv, Limit(offset, -1, logicalPlan).validate(tableEnv))
+  }
+
+  /**
+* Limits a sorted result to the first n rows.
+* Similar to a SQL FETCH clause. Fetch is technically part of the 
Order By operator and
+* thus must be preceded by it.
+*
+* [[Table.fetch(n)]] can be combined with a preceding 
[[Table.offset(o)]] call to return the
+* first n rows starting from the offset position o.
+*
+* {{{
+*   // returns the first 3 records.
+*   tab.orderBy('name.desc).fetch(3)
+*   // return the first 5 records starting from the 10th record
+*   tab.orderBy('name.desc).offset(10).fetch(5)
+* }}}
+*
+* @param fetch the number of records to return
+*/
+  def fetch(fetch: Int): Table = {
--- End diff --

Since `fetch: Int < 0` is allowed, maybe we should explicitly define the 
semantics of it?


---


[GitHub] flink pull request #4813: [FLINK-7821] [table] Deprecate Table.limit() and r...

2017-10-24 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4813#discussion_r146490303
  
--- Diff: docs/dev/table/tableApi.md ---
@@ -985,19 +985,22 @@ Table result = in.orderBy("a.asc");
 
 
   
-Limit
+Offset & Fetch
 Batch
   
   
-Similar to a SQL LIMIT clause. Limits a sorted result to a 
specified number of records from an offset position. Limit is technically part 
of the Order By operator and thus must be preceded by it.
+Similar to the SQL OFFSET and FETCH clauses. Offset and Fetch 
limit the number of records returned from a sorted result. Offset and Fetch are 
technically part of the Order By operator and thus must be preceded by it.
 {% highlight java %}
 Table in = tableEnv.fromDataSet(ds, "a, b, c");
-Table result = in.orderBy("a.asc").limit(3); // returns unlimited number 
of records beginning with the 4th record
-{% endhighlight %}
-or
-{% highlight java %}
-Table in = tableEnv.fromDataSet(ds, "a, b, c");
-Table result = in.orderBy("a.asc").limit(3, 5); // returns 5 records 
beginning with the 4th record
+
+// returns the first 5 records from the sorted result
+Table result1 = in.orderBy("a.asc").fetch(5); 
+
+// returns all records beginning with the 4th record from the sorted result
+Table result2 = in.orderBy("a.asc").offset(3);
+
+// returns the first 5 records beginning with the 10th record from the 
sorted result
--- End diff --

No it must be 10th, because SQL indices start at 1.


---


[GitHub] flink pull request #4813: [FLINK-7821] [table] Deprecate Table.limit() and r...

2017-10-24 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4813#discussion_r146490693
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ---
@@ -745,12 +748,65 @@ class Table(
 *
 * @param offset number of records to skip
 * @param fetch number of records to be returned
+*
+* @deprecated Please use [[Table.offset()]] and [[Table.fetch()]] 
instead.
 */
+  @deprecated(message = "deprecated in favor of Table.offset() and 
Table.fetch()", since = "1.4.0")
   def limit(offset: Int, fetch: Int): Table = {
 new Table(tableEnv, Limit(offset, fetch, 
logicalPlan).validate(tableEnv))
   }
 
   /**
+* Limits a sorted result from an offset position.
+* Similar to a SQL OFFSET clause. Offset is technically part of the 
Order By operator and
+* thus must be preceded by it.
+*
+* [[Table.offset(o)]] can be combined with a subsequent 
[[Table.fetch(n)]] call to return the
+* first n rows starting from the offset position o.
--- End diff --

Maybe you can add a notice about the SQL indices here, as it might confuses 
users. Or you add `skips 3 rows and begins with the 4th record`.


---


[GitHub] flink pull request #4813: [FLINK-7821] [table] Deprecate Table.limit() and r...

2017-10-24 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4813#discussion_r146489748
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ---
@@ -726,7 +726,10 @@ class Table(
 * }}}
 *
 * @param offset number of records to skip
+*
+* @deprecated Please use [[Table.offset()]] and [[Table.fetch()]] 
instead.
 */
+  @deprecated(message = "Deprecated in favor of Table.offset() and 
Table.fetch()", since = "1.4.0")
--- End diff --

This kind of deprecation does not work for Java. Please add the Java 
annotation as well.


---


[GitHub] flink pull request #4813: [FLINK-7821] [table] Deprecate Table.limit() and r...

2017-10-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4813#discussion_r146540313
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ---
@@ -745,12 +748,65 @@ class Table(
 *
 * @param offset number of records to skip
 * @param fetch number of records to be returned
+*
+* @deprecated Please use [[Table.offset()]] and [[Table.fetch()]] 
instead.
 */
+  @deprecated(message = "deprecated in favor of Table.offset() and 
Table.fetch()", since = "1.4.0")
   def limit(offset: Int, fetch: Int): Table = {
 new Table(tableEnv, Limit(offset, fetch, 
logicalPlan).validate(tableEnv))
   }
 
   /**
+* Limits a sorted result from an offset position.
+* Similar to a SQL OFFSET clause. Offset is technically part of the 
Order By operator and
+* thus must be preceded by it.
+*
+* [[Table.offset(o)]] can be combined with a subsequent 
[[Table.fetch(n)]] call to return the
+* first n rows starting from the offset position o.
--- End diff --

will do


---


[GitHub] flink pull request #4813: [FLINK-7821] [table] Deprecate Table.limit() and r...

2017-10-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4813#discussion_r146542343
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ---
@@ -745,12 +748,65 @@ class Table(
 *
 * @param offset number of records to skip
 * @param fetch number of records to be returned
+*
+* @deprecated Please use [[Table.offset()]] and [[Table.fetch()]] 
instead.
 */
+  @deprecated(message = "deprecated in favor of Table.offset() and 
Table.fetch()", since = "1.4.0")
   def limit(offset: Int, fetch: Int): Table = {
 new Table(tableEnv, Limit(offset, fetch, 
logicalPlan).validate(tableEnv))
   }
 
   /**
+* Limits a sorted result from an offset position.
+* Similar to a SQL OFFSET clause. Offset is technically part of the 
Order By operator and
+* thus must be preceded by it.
+*
+* [[Table.offset(o)]] can be combined with a subsequent 
[[Table.fetch(n)]] call to return the
+* first n rows starting from the offset position o.
+*
+* {{{
+*   // returns unlimited number of records beginning with the 4th 
record
+*   tab.orderBy('name.desc).offset(3)
+*   // return the first 5 records starting from the 10th record
+*   tab.orderBy('name.desc).offset(10).fetch(5)
+* }}}
+*
+* @param offset number of records to skip
+*/
+  def offset(offset: Int): Table = {
+new Table(tableEnv, Limit(offset, -1, logicalPlan).validate(tableEnv))
+  }
+
+  /**
+* Limits a sorted result to the first n rows.
+* Similar to a SQL FETCH clause. Fetch is technically part of the 
Order By operator and
+* thus must be preceded by it.
+*
+* [[Table.fetch(n)]] can be combined with a preceding 
[[Table.offset(o)]] call to return the
+* first n rows starting from the offset position o.
+*
+* {{{
+*   // returns the first 3 records.
+*   tab.orderBy('name.desc).fetch(3)
+*   // return the first 5 records starting from the 10th record
+*   tab.orderBy('name.desc).offset(10).fetch(5)
+* }}}
+*
+* @param fetch the number of records to return
+*/
+  def fetch(fetch: Int): Table = {
--- End diff --

Good point, thanks @xccui


---


[GitHub] flink pull request #4813: [FLINK-7821] [table] Deprecate Table.limit() and r...

2017-10-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4813#discussion_r146605323
  
--- Diff: docs/dev/table/tableApi.md ---
@@ -985,19 +985,22 @@ Table result = in.orderBy("a.asc");
 
 
   
-Limit
+Offset & Fetch
 Batch
   
   
-Similar to a SQL LIMIT clause. Limits a sorted result to a 
specified number of records from an offset position. Limit is technically part 
of the Order By operator and thus must be preceded by it.
+Similar to the SQL OFFSET and FETCH clauses. Offset and Fetch 
limit the number of records returned from a sorted result. Offset and Fetch are 
technically part of the Order By operator and thus must be preceded by it.
 {% highlight java %}
 Table in = tableEnv.fromDataSet(ds, "a, b, c");
-Table result = in.orderBy("a.asc").limit(3); // returns unlimited number 
of records beginning with the 4th record
-{% endhighlight %}
-or
-{% highlight java %}
-Table in = tableEnv.fromDataSet(ds, "a, b, c");
-Table result = in.orderBy("a.asc").limit(3, 5); // returns 5 records 
beginning with the 4th record
+
+// returns the first 5 records from the sorted result
+Table result1 = in.orderBy("a.asc").fetch(5); 
+
+// returns all records beginning with the 4th record from the sorted result
+Table result2 = in.orderBy("a.asc").offset(3);
+
+// returns the first 5 records beginning with the 10th record from the 
sorted result
--- End diff --

I think @xccui is right. An offset of 10 means that the first 10 rows are 
omitted from the result. 
Otherwise offset 1 would not offset.
I'll update the docs.


---


[GitHub] flink pull request #4813: [FLINK-7821] [table] Deprecate Table.limit() and r...

2017-10-24 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4813


---