[FLINK-3940] [table] Add support for ORDER BY OFFSET FETCH

This closes #2282.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0472cb9b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0472cb9b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0472cb9b

Branch: refs/heads/master
Commit: 0472cb9ba44de7223fffbc0eca0232d558730772
Parents: dc50625
Author: gallenvara <gallenv...@126.com>
Authored: Fri Jul 22 11:39:46 2016 +0800
Committer: twalthr <twal...@apache.org>
Committed: Thu Aug 11 10:27:11 2016 +0200

----------------------------------------------------------------------
 docs/apis/table.md                              | 17 +++++++-
 .../api/table/plan/logical/operators.scala      | 23 +++++++++-
 .../table/plan/nodes/dataset/DataSetSort.scala  | 33 ++++++++++++---
 .../plan/rules/dataSet/DataSetSortRule.scala    | 25 ++---------
 .../table/runtime/CountPartitionFunction.scala  | 38 +++++++++++++++++
 .../api/table/runtime/LimitFilterFunction.scala | 44 ++++++++++++++++++++
 .../org/apache/flink/api/table/table.scala      | 43 +++++++++++++++++++
 .../flink/api/scala/batch/sql/SortITCase.scala  | 34 +++++++++++----
 .../api/scala/batch/table/SortITCase.scala      | 36 ++++++++++++++++
 .../api/scala/batch/utils/SortTestUtils.scala   | 13 +++++-
 10 files changed, 267 insertions(+), 39 deletions(-)
----------------------------------------------------------------------

----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0472cb9b/docs/apis/table.md
----------------------------------------------------------------------
diff --git a/docs/apis/table.md b/docs/apis/table.md
index cb56656..6793fde 100644
--- a/docs/apis/table.md
+++ b/docs/apis/table.md
@@ -687,6 +687,22 @@ Table result = in.orderBy("a.asc");
       </td>
     </tr>

+    <tr>
+      <td><strong>Limit</strong></td>
+      <td>
+        <p>Similar to a SQL LIMIT clause. Returns a specified number of rows starting from an offset position. LIMIT is technically part of the ORDER BY clause and can therefore only be applied to a sorted table.</p>
+{% highlight java %}
+Table in = tableEnv.fromDataSet(ds, "a, b, c");
+Table result = in.orderBy("a.asc").limit(3);
+{% endhighlight %}
+or
+{% highlight java %}
+Table in = tableEnv.fromDataSet(ds, "a, b, c");
+Table result = in.orderBy("a.asc").limit(3, 5);
+{% endhighlight %}
+      </td>
+    </tr>
+
   </tbody>
 </table>

@@ -1009,7 +1025,6 @@ Among others, the following SQL features are not supported, yet:
 - Timestamps are limited to milliseconds precision
 - Distinct aggregates (e.g., `COUNT(DISTINCT name)`)
 - Non-equi joins and Cartesian products
-- Result selection by order position (`ORDER BY OFFSET FETCH`)
 - Grouping sets

 *Note: Tables are joined in the order in which they are specified in the `FROM` clause.
 In some cases the table order must be manually tweaked to resolve Cartesian products.*
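For readers following the docs change above, a minimal sketch of how the new Table API calls line up with their SQL counterparts (Scala; `tableEnv` and `ds` are assumed to exist, as in the Java snippets above):

    // Hedged sketch -- `tableEnv` (a BatchTableEnvironment) and `ds` are assumed, as above.
    val in = tableEnv.fromDataSet(ds, 'a, 'b, 'c)

    // Skip 3 rows, return all remaining rows.
    val tail = in.orderBy('a.asc).limit(3)

    // Skip 3 rows, then return the next 5.
    val page = in.orderBy('a.asc).limit(3, 5)

    // SQL equivalent of limit(3, 5), assuming ds is registered as "MyTable".
    val sqlPage = tableEnv.sql(
      "SELECT a, b, c FROM MyTable ORDER BY a OFFSET 3 ROWS FETCH NEXT 5 ROWS ONLY")
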
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0472cb9b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
index ad8618c..0d4cf2c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
@@ -19,8 +19,8 @@ package org.apache.flink.api.table.plan.logical

 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.logical.LogicalProject
-import org.apache.calcite.rex.{RexInputRef, RexNode}
+import org.apache.calcite.rel.logical.{LogicalSort, LogicalProject}
+import org.apache.calcite.rex.{RexLiteral, RexInputRef, RexNode}
 import org.apache.calcite.tools.RelBuilder
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -150,6 +150,25 @@ case class Sort(order: Seq[Ordering], child: LogicalNode) extends UnaryNode {
   }
 }

+case class Limit(offset: Int, fetch: Int, child: LogicalNode) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+
+  override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
+    child.construct(relBuilder)
+    relBuilder.limit(offset, fetch)
+  }
+
+  override def validate(tableEnv: TableEnvironment): LogicalNode = {
+    if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
+      throw new TableException("Limit on stream tables is currently not supported.")
+    }
+    if (!child.validate(tableEnv).isInstanceOf[Sort]) {
+      throw new TableException("Limit operator must be preceded by an OrderBy operator.")
+    }
+    super.validate(tableEnv)
+  }
+}
+
 case class Filter(condition: Expression, child: LogicalNode) extends UnaryNode {

   override def output: Seq[Attribute] = child.output
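One subtlety worth spelling out: the offset-only `limit(offset)` overload added in `table.scala` below passes `fetch = -1`, which Calcite's `RelBuilder.limit(offset, fetch)` treats as "no fetch", so it surfaces as a null `fetch` on the resulting `LogicalSort`. A compact, dependency-free sketch of the window arithmetic this produces (hypothetical helper, mirroring what `DataSetSort` does in the next section):

    // Hypothetical helper: None stands in for a null offset/fetch RexNode.
    def limitWindow(offset: Option[Int], fetch: Option[Int]): (Int, Int) = {
      val limitStart = offset.getOrElse(0)
      val limitEnd = fetch.map(_ + limitStart).getOrElse(Int.MaxValue)
      (limitStart, limitEnd)
    }

    assert(limitWindow(Some(2), None) == (2, Int.MaxValue)) // OFFSET 2 ROWS
    assert(limitWindow(Some(2), Some(5)) == (2, 7))         // OFFSET 2 ROWS FETCH NEXT 5 ROWS ONLY
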
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0472cb9b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
index 1af03d8..ef3005c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
@@ -24,10 +24,12 @@ import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.RelFieldCollation.Direction
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.{RelCollation, RelNode, RelWriter, SingleRel}
+import org.apache.calcite.rex.{RexLiteral, RexNode}
 import org.apache.flink.api.common.operators.Order
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.api.table.BatchTableEnvironment
+import org.apache.flink.api.table.runtime.{LimitFilterFunction, CountPartitionFunction}
 import org.apache.flink.api.table.typeutils.TypeConverter._

 import scala.collection.JavaConverters._
@@ -37,7 +39,9 @@ class DataSetSort(
     traitSet: RelTraitSet,
     inp: RelNode,
     collations: RelCollation,
-    rowType2: RelDataType)
+    rowType2: RelDataType,
+    offset: RexNode,
+    fetch: RexNode)
   extends SingleRel(cluster, traitSet, inp)
   with DataSetRel {

@@ -47,7 +51,9 @@ class DataSetSort(
       traitSet,
       inputs.get(0),
       collations,
-      rowType2
+      rowType2,
+      offset,
+      fetch
     )
   }

@@ -71,11 +77,28 @@ class DataSetSort(
       partitionedDs = partitionedDs.sortPartition(fieldCollation._1, fieldCollation._2)
     }

+    val limitedDs = if (offset == null && fetch == null) {
+      partitionedDs
+    } else {
+      // Translate the RexNode literals into a zero-based global row window.
+      val limitStart = if (offset != null) RexLiteral.intValue(offset) else 0
+      val limitEnd = if (fetch != null) RexLiteral.intValue(fetch) + limitStart else Int.MaxValue
+
+      val countFunction = new CountPartitionFunction[Any]
+      val partitionCount = partitionedDs.mapPartition(countFunction)
+
+      val limitFunction = new LimitFilterFunction[Any](
+        limitStart,
+        limitEnd,
+        "countPartition")
+      partitionedDs.filter(limitFunction).withBroadcastSet(partitionCount, "countPartition")
+    }
+
     val inputType = partitionedDs.getType
     expectedType match {

       case None if config.getEfficientTypeUsage =>
-        partitionedDs
+        limitedDs

       case _ =>
         val determinedType = determineReturnType(
@@ -96,11 +119,11 @@ class DataSetSort(
             getRowType.getFieldNames.asScala
           )

-          partitionedDs.map(mapFunc)
+          limitedDs.map(mapFunc)
         }
         // no conversion necessary, forward
         else {
-          partitionedDs
+          limitedDs
         }
     }
   }
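The translation above avoids a second shuffle: after `sortPartition`, each partition counts its rows (`CountPartitionFunction`), the counts are broadcast, and every filter subtask derives which slice of the global [limitStart, limitEnd) window it owns. A standalone, dependency-free sketch of that arithmetic with hypothetical counts:

    // Hypothetical per-partition row counts: (partitionIndex, elementCount).
    val counts = Seq(0 -> 4, 1 -> 3, 2 -> 5)

    // Prefix sums, as in LimitFilterFunction.open: countList(i) is the number
    // of rows in all partitions before partition i.
    val countList = counts.sortBy(_._1).map(_._2).scanLeft(0)(_ + _)

    val limitStart = 2     // OFFSET 2 ROWS
    val limitEnd = 2 + 5   // FETCH NEXT 5 ROWS ONLY

    // A row is kept iff its 1-based position inside its partition lands in the
    // global window shifted by the rows of all earlier partitions.
    def keep(partition: Int, localPos: Int): Boolean =
      limitStart - countList(partition) < localPos &&
        limitEnd - countList(partition) >= localPos

    assert((1 to 4).filter(keep(0, _)) == Seq(3, 4))    // partition 0: global rows 1..4
    assert((1 to 3).filter(keep(1, _)) == Seq(1, 2, 3)) // partition 1: global rows 5..7
    assert((1 to 5).filter(keep(2, _)) == Seq())        // partition 2: global rows 8..12
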
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0472cb9b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala
index b26d1de..5c1fb53 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala
@@ -18,12 +18,10 @@

 package org.apache.flink.api.table.plan.rules.dataSet

-import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.core.JoinRelType
 import org.apache.calcite.rel.logical.{LogicalJoin, LogicalSort}
-import org.apache.flink.api.table.TableException
 import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetSort}

 class DataSetSortRule
@@ -33,23 +31,6 @@ class DataSetSortRule
     DataSetConvention.INSTANCE,
     "DataSetSortRule") {

-  /**
-    * Only translate when no OFFSET or LIMIT specified
-    */
-  override def matches(call: RelOptRuleCall): Boolean = {
-    val sort = call.rel(0).asInstanceOf[LogicalSort]
-
-    if (sort.offset != null) {
-      throw new TableException("ORDER BY OFFSET is currently not supported.")
-    }
-
-    if (sort.fetch != null) {
-      throw new TableException("ORDER BY FETCH is currently not supported.")
-    }
-
-    sort.offset == null && sort.fetch == null
-  }
-
   override def convert(rel: RelNode): RelNode = {

     val sort: LogicalSort = rel.asInstanceOf[LogicalSort]
@@ -61,7 +42,9 @@ class DataSetSortRule
       traitSet,
       convInput,
       sort.getCollation,
-      rel.getRowType
+      rel.getRowType,
+      sort.offset,
+      sort.fetch
     )
   }
 }

----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0472cb9b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/CountPartitionFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/CountPartitionFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/CountPartitionFunction.scala
new file mode 100644
index 0000000..79b8623
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/CountPartitionFunction.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.runtime
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.RichMapPartitionFunction
+import org.apache.flink.util.Collector
+
+/** Emits one (partitionIndex, elementCount) pair per input partition. */
+class CountPartitionFunction[IN] extends RichMapPartitionFunction[IN, (Int, Int)] {
+
+  var elementCount = 0
+
+  override def mapPartition(value: Iterable[IN], out: Collector[(Int, Int)]): Unit = {
+    val partitionIndex = getRuntimeContext.getIndexOfThisSubtask
+    // Count the elements of this partition without materializing them.
+    val iterator = value.iterator()
+    while (iterator.hasNext) {
+      elementCount += 1
+      iterator.next()
+    }
+    out.collect((partitionIndex, elementCount))
+  }
+}
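To see what the new `CountPartitionFunction` emits, here is a hedged, self-contained sketch against the same Flink 1.1-era Scala DataSet API used elsewhere in this commit (the 3-way parallelism and input values are made up):

    import org.apache.flink.api.scala._
    import org.apache.flink.api.table.runtime.CountPartitionFunction

    object CountPartitionDemo {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(3)

        // Each subtask emits exactly one (partitionIndex, elementCount) pair,
        // e.g. (0,4), (1,3), (2,3) for ten elements spread over three partitions.
        env.fromCollection(1 to 10)
          .mapPartition(new CountPartitionFunction[Int])
          .print()
      }
    }
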
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0472cb9b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/LimitFilterFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/LimitFilterFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/LimitFilterFunction.scala
new file mode 100644
index 0000000..311b616
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/LimitFilterFunction.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.runtime
+
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.configuration.Configuration
+
+import scala.collection.mutable
+import scala.collection.JavaConverters._
+
+/** Keeps only the rows whose zero-based global position falls into
+  * [limitStart, limitEnd), using broadcast per-partition counts to translate
+  * local positions into global ones. */
+class LimitFilterFunction[T](
+    limitStart: Int,
+    limitEnd: Int,
+    broadcast: String)
+  extends RichFilterFunction[T] {
+
+  var elementCount = 0
+  var countList = mutable.Buffer[Int]()
+
+  override def open(config: Configuration) {
+    // Prefix sums over the per-partition counts: countList(i) is the number
+    // of elements in all partitions preceding partition i.
+    countList = getRuntimeContext.getBroadcastVariable[(Int, Int)](broadcast).asScala
+      .sortWith(_._1 < _._1).map(_._2).scanLeft(0)(_ + _)
+  }
+
+  override def filter(value: T): Boolean = {
+    val partitionIndex = getRuntimeContext.getIndexOfThisSubtask
+    // elementCount is the 1-based position of `value` within this partition.
+    elementCount += 1
+    limitStart - countList(partitionIndex) < elementCount &&
+      limitEnd - countList(partitionIndex) >= elementCount
+  }
+}

----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0472cb9b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
index cbb9a07..c9fd78c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
@@ -572,6 +572,49 @@ class Table(
   }

   /**
+    * Limits a sorted result from an offset position. LIMIT is technically part of the
+    * ORDER BY clause and can therefore only be applied on a sorted [[Table]]. All rows
+    * from the offset position onwards are returned.
+    *
+    * Example:
+    *
+    * {{{
+    *   tab.orderBy('name.desc).limit(3)
+    * }}}
+    *
+    * @param offset The number of rows to skip before returning rows.
+    */
+  def limit(offset: Int): Table = {
+    if (offset < 0) {
+      throw new ValidationException("Offset should be greater than or equal to zero.")
+    }
+    new Table(tableEnv, Limit(offset, -1, logicalPlan).validate(tableEnv))
+  }
+
+  /**
+    * Limits a sorted result to a specified number of rows from an offset position.
+    * LIMIT is technically part of the ORDER BY clause and can therefore only be applied
+    * on a sorted [[Table]].
+    *
+    * Example:
+    *
+    * {{{
+    *   tab.orderBy('name.desc).limit(3, 5)
+    * }}}
+    *
+    * @param offset The number of rows to skip before returning rows.
+    * @param fetch The number of rows to return.
+    */
+  def limit(offset: Int, fetch: Int): Table = {
+    if (offset < 0 || fetch < 1) {
+      throw new ValidationException(
+        "Offset should be greater than or equal to zero and" +
+          " fetch should be greater than or equal to one.")
+    }
+    new Table(tableEnv, Limit(offset, fetch, logicalPlan).validate(tableEnv))
+  }
+
+  /**
     * Writes the [[Table]] to a [[TableSink]]. A [[TableSink]] defines an external storage location.
     *
     * A batch [[Table]] can only be written to a
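An end-to-end sketch of the two new methods on a toy batch job (hypothetical example; the API calls match the Flink 1.1 Table API used in the tests below):

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.table._
    import org.apache.flink.api.table.{Row, TableEnvironment}

    object LimitDemo {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val tEnv = TableEnvironment.getTableEnvironment(env)

        val ds = env.fromElements((3, "c"), (1, "a"), (2, "b"), (4, "d"))
        val t = ds.toTable(tEnv, 'id, 'name)

        // Sorted ascending by id, skip 1 row, return the next 2: (2,b) and (3,c).
        t.orderBy('id.asc).limit(1, 2).toDataSet[Row].print()

        // Offset-only variant: skip 1 row, return everything else.
        t.orderBy('id.asc).limit(1).toDataSet[Row].print()
      }
    }
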
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0472cb9b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SortITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SortITCase.scala
index 858f75a..7c18e14 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SortITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SortITCase.scala
@@ -60,28 +60,46 @@ class SortITCase(
     TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
   }

-  @Test(expected = classOf[TableException])
-  def testOrderByOffset(): Unit = {
+  @Test
+  def testOrderByWithOffset(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)

-    val sqlQuery = "SELECT * FROM MyTable ORDER BY _1 OFFSET 2 ROWS"
+    val sqlQuery = "SELECT * FROM MyTable ORDER BY _1 DESC OFFSET 2 ROWS"
+
+    implicit def rowOrdering[T <: Product] = Ordering.by((x: T) =>
+      -x.productElement(0).asInstanceOf[Int])

     val ds = CollectionDataSets.get3TupleDataSet(env)
     tEnv.registerDataSet("MyTable", ds)
-    tEnv.sql(sqlQuery).toDataSet[Row]
+
+    val expected = sortExpectedly(tupleDataSetStrings, 2, 21)
+    val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+
+    val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _)
+
+    TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
   }

-  @Test(expected = classOf[TableException])
-  def testOrderByFirst(): Unit = {
+  @Test
+  def testOrderByWithOffsetAndFetch(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)

-    val sqlQuery = "SELECT * FROM MyTable ORDER BY _1 FETCH NEXT 2 ROWS ONLY"
+    val sqlQuery = "SELECT * FROM MyTable ORDER BY _1 OFFSET 2 ROWS FETCH NEXT 5 ROWS ONLY"
+
+    implicit def rowOrdering[T <: Product] = Ordering.by((x: T) =>
+      x.productElement(0).asInstanceOf[Int])

     val ds = CollectionDataSets.get3TupleDataSet(env)
     tEnv.registerDataSet("MyTable", ds)
-    tEnv.sql(sqlQuery).toDataSet[Row]
+
+    val expected = sortExpectedly(tupleDataSetStrings, 2, 7)
+    val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+
+    val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _)
+
+    TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
   }
 }
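The assertion pattern shared by the tests above and below deserves a note: the result is range-partitioned and sorted within each partition, so each partition is collected as one sequence and the partitions are stitched back together by their first element before comparison. The same logic, distilled into plain Scala:

    // Dependency-free distillation of the reassembly used in these tests:
    // partitions arrive in arbitrary order, each internally sorted.
    def stitch[T: Ordering](partitions: Seq[Seq[T]]): Seq[T] =
      partitions.filterNot(_.isEmpty).sortBy(_.head).reduceLeft(_ ++ _)

    assert(stitch(Seq(Seq(4, 5), Seq(1, 2, 3), Seq.empty)) == Seq(1, 2, 3, 4, 5))
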
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0472cb9b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala
index 235fc45..c4a5a74 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala
@@ -99,4 +99,40 @@ class SortITCase(
     TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
   }

+  @Test
+  def testOrderByOffset(): Unit = {
+    val env = getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    val t = ds.toTable(tEnv).orderBy('_1.asc).limit(3)
+
+    implicit def rowOrdering[T <: Product] = Ordering.by((x: T) =>
+      x.productElement(0).asInstanceOf[Int])
+
+    val expected = sortExpectedly(tupleDataSetStrings, 3, 21)
+    val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+
+    val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _)
+
+    TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
+  }
+
+  @Test
+  def testOrderByOffsetAndFetch(): Unit = {
+    val env = getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    val t = ds.toTable(tEnv).orderBy('_1.asc).limit(3, 5)
+
+    implicit def rowOrdering[T <: Product] = Ordering.by((x: T) =>
+      x.productElement(0).asInstanceOf[Int])
+
+    val expected = sortExpectedly(tupleDataSetStrings, 3, 8)
+    val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+
+    val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _)
+
+    TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
+  }
+
 }

----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0472cb9b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/utils/SortTestUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/utils/SortTestUtils.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/utils/SortTestUtils.scala
index 07765fd..8d1f653 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/utils/SortTestUtils.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/utils/SortTestUtils.scala
@@ -42,8 +42,17 @@ object SortTestUtils {
     ,(20, 6L, "Comment#14")
     ,(21, 6L, "Comment#15"))

-  def sortExpectedly(dataSet: List[Product])(implicit ordering: Ordering[Product]): String = {
-    dataSet.sorted(ordering).mkString("\n").replaceAll("[\\(\\)]", "")
+  def sortExpectedly(dataSet: List[Product])
+                    (implicit ordering: Ordering[Product]): String =
+    sortExpectedly(dataSet, 0, dataSet.length)
+
+  def sortExpectedly(dataSet: List[Product], start: Int, end: Int)
+                    (implicit ordering: Ordering[Product]): String = {
+    dataSet
+      .sorted(ordering)
+      .slice(start, end)
+      .mkString("\n")
+      .replaceAll("[\\(\\)]", "")
   }
 }
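For completeness, the slicing semantics of the new `sortExpectedly` overload, checked on a four-row stand-in dataset instead of the 21-row list above (hedged sketch, plain Scala):

    // sortExpectedly(data, start, end): sort first, then keep rows [start, end)
    // of the sorted output, matching OFFSET start / FETCH (end - start).
    val data = List((3, "c"), (1, "a"), (2, "b"), (4, "d"))
    val expected = data.sorted
      .slice(1, 3)
      .mkString("\n")
      .replaceAll("[\\(\\)]", "")

    assert(expected == "2,b\n3,c")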