This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin-on-parquet-v2 in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push: new 01036af KYLIN-4905 Support limit .. offset ... in spark query engine 01036af is described below commit 01036afd898e313a7d3b31f326dc780fd47f8fd0 Author: zhengshengjun <shengjun_zh...@sina.com> AuthorDate: Fri Feb 19 08:11:09 2021 +0800 KYLIN-4905 Support limit .. offset ... in spark query engine --- .../src/test/resources/query/sql_limit/query00.sql | 24 ++++++++++++++++++++ .../sql_limit/query00.sql.expected/._SUCCESS.crc | Bin 0 -> 8 bytes ...90bf53-2025-46e5-8133-2a6bf503206c-c000.csv.crc | Bin 0 -> 12 bytes .../query/sql_limit/query00.sql.expected/_SUCCESS | 0 ...0-a290bf53-2025-46e5-8133-2a6bf503206c-c000.csv | 6 +++++ .../kylin/query/runtime/plans/LimitPlan.scala | 2 ++ .../kylin/engine/spark2/NBuildAndQueryTest.java | 1 + .../org/apache/kylin/query/exec/SparkExec.java | 18 +++++++++++++-- .../apache/kylin/query/relnode/OLAPTableScan.java | 25 +++++++++++++++++++++ 9 files changed, 74 insertions(+), 2 deletions(-) diff --git a/kylin-it/src/test/resources/query/sql_limit/query00.sql b/kylin-it/src/test/resources/query/sql_limit/query00.sql new file mode 100644 index 0000000..da7af83 --- /dev/null +++ b/kylin-it/src/test/resources/query/sql_limit/query00.sql @@ -0,0 +1,24 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +select slr_segment_cd, sum(price) +from test_kylin_fact +group by slr_segment_cd +order by sum(price) desc +limit 4 offset 2 +;{"scanRowCount":300,"scanBytes":0,"scanFiles":1,"cuboidId":14336} \ No newline at end of file diff --git a/kylin-it/src/test/resources/query/sql_limit/query00.sql.expected/._SUCCESS.crc b/kylin-it/src/test/resources/query/sql_limit/query00.sql.expected/._SUCCESS.crc new file mode 100644 index 0000000..3b7b044 Binary files /dev/null and b/kylin-it/src/test/resources/query/sql_limit/query00.sql.expected/._SUCCESS.crc differ diff --git a/kylin-it/src/test/resources/query/sql_limit/query00.sql.expected/.part-00000-a290bf53-2025-46e5-8133-2a6bf503206c-c000.csv.crc b/kylin-it/src/test/resources/query/sql_limit/query00.sql.expected/.part-00000-a290bf53-2025-46e5-8133-2a6bf503206c-c000.csv.crc new file mode 100644 index 0000000..8134a78 Binary files /dev/null and b/kylin-it/src/test/resources/query/sql_limit/query00.sql.expected/.part-00000-a290bf53-2025-46e5-8133-2a6bf503206c-c000.csv.crc differ diff --git a/kylin-it/src/test/resources/query/sql_limit/query00.sql.expected/_SUCCESS b/kylin-it/src/test/resources/query/sql_limit/query00.sql.expected/_SUCCESS new file mode 100644 index 0000000..e69de29 diff --git a/kylin-it/src/test/resources/query/sql_limit/query00.sql.expected/part-00000-a290bf53-2025-46e5-8133-2a6bf503206c-c000.csv b/kylin-it/src/test/resources/query/sql_limit/query00.sql.expected/part-00000-a290bf53-2025-46e5-8133-2a6bf503206c-c000.csv new file mode 100644 index 0000000..cc69342 --- /dev/null +++ b/kylin-it/src/test/resources/query/sql_limit/query00.sql.expected/part-00000-a290bf53-2025-46e5-8133-2a6bf503206c-c000.csv @@ -0,0 +1,6 @@ +13,597780.2600 +11,592133.2300 +12,570202.3700 +14,570158.2000 +5,567907.4600 +16,556006.0200 diff --git 
a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/LimitPlan.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/LimitPlan.scala index beb4e91..f5ff285 100644 --- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/LimitPlan.scala +++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/LimitPlan.scala @@ -41,6 +41,8 @@ object LimitPlan { val offset = BigDecimal(rel.localOffset.accept(visitor).toString).toInt inputs .get(0) + // TODO KYLIN-4905: Spark does not currently support LIMIT ... OFFSET, so fetch (offset + limit) rows here; the offset itself is handled on the Kylin server side + .limit(offset + limit) //.limitRange(offset, offset + limit) } } diff --git a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java index 537980f..dc84886 100644 --- a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java +++ b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java @@ -200,6 +200,7 @@ public class NBuildAndQueryTest extends LocalWithSparkSessionTest { tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_unionall")); tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_values")); tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_window")); + tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_limit")); } logger.info("Total {} tasks.", tasks.size()); return tasks; diff --git a/query/src/main/java/org/apache/kylin/query/exec/SparkExec.java b/query/src/main/java/org/apache/kylin/query/exec/SparkExec.java index 03c5837..8571ab1 100644 --- a/query/src/main/java/org/apache/kylin/query/exec/SparkExec.java +++ b/query/src/main/java/org/apache/kylin/query/exec/SparkExec.java 
@@ -22,8 +22,10 @@ import org.apache.calcite.DataContext; import org.apache.calcite.linq4j.Enumerable; import org.apache.calcite.linq4j.Linq4j; import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexLiteral; import org.apache.kylin.common.QueryContextFacade; import org.apache.kylin.common.debug.BackdoorToggles; +import org.apache.kylin.query.relnode.OLAPLimitRel; import org.apache.kylin.query.relnode.OLAPRel; public class SparkExec { @@ -37,7 +39,13 @@ public class SparkExec { RelDataType rowType = (RelDataType) QueryContextFacade.current().getResultType(); try { Enumerable<Object[]> computer = QueryEngineFactory.compute(dataContext, olapRel, rowType); - return computer; + // TODO KYLIN-4905: Spark does not currently support LIMIT ... OFFSET, so the offset rows are skipped here on the Kylin server side + if (olapRel instanceof OLAPLimitRel && ((OLAPLimitRel) olapRel).localOffset != null) { + RexLiteral literal = (RexLiteral) ((OLAPLimitRel) olapRel).localOffset; + return computer.skip(Integer.valueOf(literal.getValue().toString())); + } else { + return computer; + } } catch (Exception e) { throw new RuntimeException(e); } @@ -52,7 +60,13 @@ public class SparkExec { RelDataType rowType = (RelDataType) QueryContextFacade.current().getResultType(); try { Enumerable<Object> objects = QueryEngineFactory.computeSCALA(dataContext, olapRel, rowType); - return objects; + // TODO KYLIN-4905: Spark does not currently support LIMIT ... OFFSET, so the offset rows are skipped here on the Kylin server side + if (olapRel instanceof OLAPLimitRel && ((OLAPLimitRel) olapRel).localOffset != null) { + RexLiteral literal = (RexLiteral) ((OLAPLimitRel) olapRel).localOffset; + return objects.skip(Integer.valueOf(literal.getValue().toString())); + } else { + return objects; + } } catch (Exception e) { throw new RuntimeException(e); diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPTableScan.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPTableScan.java index f7a363f..bbb4042 100644 --- 
a/query/src/main/java/org/apache/kylin/query/relnode/OLAPTableScan.java +++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPTableScan.java @@ -62,6 +62,7 @@ import org.apache.calcite.rel.rules.ReduceExpressionsRule; import org.apache.calcite.rel.rules.SemiJoinRule; import org.apache.calcite.rel.rules.SortJoinTransposeRule; import org.apache.calcite.rel.rules.SortUnionTransposeRule; +import org.apache.calcite.rel.rules.SortProjectTransposeRule; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.rel.type.RelDataTypeField; @@ -211,6 +212,30 @@ public class OLAPTableScan extends TableScan implements OLAPRel, EnumerableRel { // see Dec 26th email @ http://mail-archives.apache.org/mod_mbox/calcite-dev/201412.mbox/browser planner.removeRule(ExpandConversionRule.INSTANCE); + + /*** TODO KYLIN-4905 + * Spark doesn't support LIMIT ... OFFSET, so we implement it in the Kylin query server. + * This relies on OLAPLimitRel always being the root RelNode, so that the rows indexed from (offset) to (offset + limit) can be taken from the result. + * SortProjectTransposeRule, which transposes sort and project, breaks this invariant. + * e.g.: select sum(price), seller_id from kylin_sales group by seller_id order by sum(price) limit 10 offset 3 + + 1. Calcite optimized plan with SortProjectTransposeRule enabled: + OLAPProjectRel + |_OLAPLimitRel (offset=3,fetch=10) + |_OLAPSortRel + |_OLAPAggregateRel + |_OLAPProjectRel + |_OLAPTableScan + + 2. Calcite optimized plan with SortProjectTransposeRule removed: + OLAPLimitRel (offset=3,fetch=10) + |_OLAPSortRel + |_ OLAPAggregateRel + |_ OLAPProjectRel + |_OLAPTableScan + + * ***/ + planner.removeRule(SortProjectTransposeRule.INSTANCE); } protected void addRules(final RelOptPlanner planner, List<String> rules) {