AltSort: In the generated physical plan, replace Sort with ExternalSort, and replace Limit + Sort with Limit + TopN.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/ecd64102 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/ecd64102 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/ecd64102 Branch: refs/heads/master Commit: ecd64102f7514113c22b5286e6258495b5ec2707 Parents: 4f98a4f Author: Jinfeng Ni <[email protected]> Authored: Wed Apr 9 12:33:31 2014 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Sat Apr 19 18:07:12 2014 -0700 ---------------------------------------------------------------------- .../exec/planner/common/DrillLimitRelBase.java | 9 +++ .../exec/planner/logical/DrillRuleSets.java | 4 +- .../physical/DrillDistributionTrait.java | 2 +- .../physical/HashToRandomExchangePrel.java | 12 +++ .../exec/planner/physical/PushLimitToTopN.java | 52 ++++++++++++ .../physical/SingleMergeExchangePrel.java | 18 ++++- .../drill/exec/planner/physical/SortPrel.java | 10 ++- .../drill/exec/planner/physical/TopNPrel.java | 78 ++++++++++++++++++ .../org/apache/drill/TestAltSortQueries.java | 83 ++++++++++++++++++++ .../org/apache/drill/TestExampleQueries.java | 2 +- .../drill/jdbc/test/TestJdbcDistQuery.java | 2 + 11 files changed, 267 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecd64102/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillLimitRelBase.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillLimitRelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillLimitRelBase.java index c64b79b..b62eb9e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillLimitRelBase.java +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillLimitRelBase.java @@ -27,6 +27,7 @@ import org.apache.drill.exec.planner.logical.DrillRel; import org.apache.drill.exec.planner.torel.ConversionContext; import org.eigenbase.rel.InvalidRelException; import org.eigenbase.rel.RelNode; +import org.eigenbase.rel.RelWriter; import org.eigenbase.rel.SingleRel; import org.eigenbase.relopt.RelOptCluster; import org.eigenbase.relopt.RelTraitSet; @@ -54,4 +55,12 @@ public abstract class DrillLimitRelBase extends SingleRel implements DrillRelNod public RexNode getFetch() { return this.fetch; } + + public RelWriter explainTerms(RelWriter pw) { + super.explainTerms(pw); + pw.itemIf("offset", offset, offset != null); + pw.itemIf("fetch", fetch, fetch != null); + return pw; + } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecd64102/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java index 5bd8581..e5cc730 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java @@ -25,6 +25,7 @@ import org.apache.drill.exec.planner.physical.FilterPrule; import org.apache.drill.exec.planner.physical.LimitPrule; import org.apache.drill.exec.planner.physical.MergeJoinPrule; import org.apache.drill.exec.planner.physical.ProjectPrule; +import org.apache.drill.exec.planner.physical.PushLimitToTopN; import org.apache.drill.exec.planner.physical.ScanPrule; import org.apache.drill.exec.planner.physical.ScreenPrule; import org.apache.drill.exec.planner.physical.SortConvertPrule; @@ -114,7 +115,8 @@ public class DrillRuleSets { 
StreamAggPrule.INSTANCE, MergeJoinPrule.INSTANCE, FilterPrule.INSTANCE, - LimitPrule.INSTANCE + LimitPrule.INSTANCE, + PushLimitToTopN.INSTANCE // ExpandConversionRule.INSTANCE, // SwapJoinRule.INSTANCE, http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecd64102/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTrait.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTrait.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTrait.java index 018f548..8573fb2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTrait.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTrait.java @@ -138,7 +138,7 @@ public class DrillDistributionTrait implements RelTrait { } public String toString() { - return new Integer(fieldId).toString(); + return String.format("[$%s]", this.fieldId); } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecd64102/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java index c2b6c68..82f21eb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java @@ -20,12 +20,15 @@ package org.apache.drill.exec.planner.physical; import java.io.IOException; import java.util.List; +import net.hydromatic.linq4j.Ord; + import org.apache.drill.exec.physical.base.PhysicalOperator; import 
org.apache.drill.exec.physical.config.HashToRandomExchange; import org.apache.drill.exec.physical.config.SelectionVectorRemover; import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.eigenbase.rel.RelNode; +import org.eigenbase.rel.RelWriter; import org.eigenbase.rel.SingleRel; import org.eigenbase.relopt.RelOptCluster; import org.eigenbase.relopt.RelOptCost; @@ -74,4 +77,13 @@ public class HashToRandomExchangePrel extends SingleRel implements Prel { return this.fields; } + @Override + public RelWriter explainTerms(RelWriter pw) { + super.explainTerms(pw); + for (Ord<DistributionField> ord : Ord.zip(fields)) { + pw.item("dist" + ord.i, ord.e); + } + return pw; + } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecd64102/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PushLimitToTopN.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PushLimitToTopN.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PushLimitToTopN.java new file mode 100644 index 0000000..4c5cf33 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PushLimitToTopN.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.planner.physical; + +import org.apache.drill.exec.planner.logical.RelOptHelper; +import org.eigenbase.relopt.RelOptRule; +import org.eigenbase.relopt.RelOptRuleCall; +import org.eigenbase.rex.RexLiteral; + +public class PushLimitToTopN extends RelOptRule{ + + public static final RelOptRule INSTANCE = new PushLimitToTopN(); + + private PushLimitToTopN() { + super(RelOptHelper.some(LimitPrel.class, RelOptHelper.some(SingleMergeExchangePrel.class, RelOptHelper.any(SortPrel.class))), "PushLimitToTopN"); + } + + @Override + public void onMatch(RelOptRuleCall call) { + final LimitPrel limit = (LimitPrel) call.rel(0); + final SingleMergeExchangePrel smex = (SingleMergeExchangePrel) call.rel(1); + final SortPrel sort = (SortPrel) call.rel(2); + + // First offset to include into results (inclusive). Null implies it is starting from offset 0 + int offset = limit.getOffset() != null ? Math.max(0, RexLiteral.intValue(limit.getOffset())) : 0; + int fetch = limit.getFetch() != null? 
Math.max(0, RexLiteral.intValue(limit.getFetch())) : 0; + + final TopNPrel topN = new TopNPrel(limit.getCluster(), sort.getTraitSet(), sort.getChild(), offset + fetch, sort.getCollation()); + final LimitPrel newLimit = new LimitPrel(limit.getCluster(), limit.getTraitSet(), + new SingleMergeExchangePrel(smex.getCluster(), smex.getTraitSet(), topN, sort.getCollation()), + limit.getOffset(), limit.getFetch()); + + call.transformTo(newLimit); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecd64102/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SingleMergeExchangePrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SingleMergeExchangePrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SingleMergeExchangePrel.java index 115d623..0d41a71 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SingleMergeExchangePrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SingleMergeExchangePrel.java @@ -21,6 +21,8 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import net.hydromatic.linq4j.Ord; + import org.apache.drill.common.expression.ExpressionPosition; import org.apache.drill.common.expression.FieldReference; import org.apache.drill.common.logical.data.Order.Ordering; @@ -32,6 +34,7 @@ import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.eigenbase.rel.RelCollation; import org.eigenbase.rel.RelFieldCollation; import org.eigenbase.rel.RelNode; +import org.eigenbase.rel.RelWriter; import org.eigenbase.rel.SingleRel; import org.eigenbase.relopt.RelOptCluster; import org.eigenbase.relopt.RelOptCost; @@ -76,5 +79,18 @@ public class SingleMergeExchangePrel extends SingleRel implements Prel { creator.addPhysicalOperator(g); return g; } - + + @Override + public RelWriter 
explainTerms(RelWriter pw) { + super.explainTerms(pw); + if (pw.nest()) { + pw.item("collation", collation); + } else { + for (Ord<RelFieldCollation> ord : Ord.zip(collation.getFieldCollations())) { + pw.item("sort" + ord.i, ord.e); + } + } + return pw; + } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecd64102/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java index 06f6e8b..9c73d77 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.planner.physical; import java.io.IOException; import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.config.ExternalSort; import org.apache.drill.exec.physical.config.SingleMergeExchange; import org.apache.drill.exec.physical.config.Sort; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; @@ -27,6 +28,8 @@ import org.eigenbase.rel.RelCollation; import org.eigenbase.rel.RelNode; import org.eigenbase.rel.SortRel; import org.eigenbase.relopt.RelOptCluster; +import org.eigenbase.relopt.RelOptCost; +import org.eigenbase.relopt.RelOptPlanner; import org.eigenbase.relopt.RelTraitSet; import org.eigenbase.rex.RexNode; @@ -52,7 +55,7 @@ public class SortPrel extends SortRel implements Prel { throw new UnsupportedOperationException(); } - Sort g = new Sort(childPOP, PrelUtil.getOrdering(this.collation, getChild().getRowType()), false); + Sort g = new ExternalSort(childPOP, PrelUtil.getOrdering(this.collation, getChild().getRowType()), false); creator.addPhysicalOperator(g); @@ -68,5 +71,10 @@ public class SortPrel 
extends SortRel implements Prel { return new SortPrel(getCluster(), traitSet, newInput, newCollation); } + @Override + public RelOptCost computeSelfCost(RelOptPlanner planner) { + //We use multiplier 0.05 for TopN operator, and 0.1 for Sort, to make TopN a preferred choice. + return super.computeSelfCost(planner).multiplyBy(0.1); + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecd64102/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java new file mode 100644 index 0000000..7c811f7 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.planner.physical; + +import java.io.IOException; +import java.util.List; + +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.config.TopN; +import org.eigenbase.rel.RelCollation; +import org.eigenbase.rel.RelNode; +import org.eigenbase.rel.RelWriter; +import org.eigenbase.rel.SingleRel; +import org.eigenbase.relopt.RelOptCluster; +import org.eigenbase.relopt.RelOptCost; +import org.eigenbase.relopt.RelOptPlanner; +import org.eigenbase.relopt.RelTraitSet; +import org.eigenbase.rex.RexLiteral; +import org.eigenbase.rex.RexNode; + +public class TopNPrel extends SingleRel implements Prel { + + protected int limit; + protected final RelCollation collation; + + public TopNPrel(RelOptCluster cluster, RelTraitSet traitSet, RelNode child, int limit, RelCollation collation) { + super(cluster, traitSet, child); + this.limit = limit; + this.collation = collation; + } + + @Override + public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { + return new TopNPrel(getCluster(), traitSet, sole(inputs), this.limit, this.collation); + } + + @Override + public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException { + Prel child = (Prel) this.getChild(); + + PhysicalOperator childPOP = child.getPhysicalOperator(creator); + + TopN topN = new TopN(childPOP, PrelUtil.getOrdering(this.collation, getChild().getRowType()), false, this.limit); + + creator.addPhysicalOperator(topN); + + return topN; + } + + @Override + public RelOptCost computeSelfCost(RelOptPlanner planner) { + //We use multiplier 0.05 for TopN operator, and 0.1 for Sort, to make TopN a preferred choice. 
+ return super.computeSelfCost(planner).multiplyBy(0.05); + } + + + @Override + public RelWriter explainTerms(RelWriter pw) { + return super.explainTerms(pw) + .item("limit", limit); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecd64102/exec/java-exec/src/test/java/org/apache/drill/TestAltSortQueries.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestAltSortQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestAltSortQueries.java new file mode 100644 index 0000000..0521fca --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/TestAltSortQueries.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill; + +import org.apache.drill.common.util.TestTools; +import org.apache.drill.exec.client.QuerySubmitter; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestRule; + +public class TestAltSortQueries { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestAltSortQueries.class); + + @Rule public TestRule TIMEOUT = TestTools.getTimeoutRule(10000000); + + @Test + public void testOrderBy() throws Exception{ + test("select R_REGIONKEY " + + "from dfs.`[WORKING_PATH]/../../sample-data/region.parquet` " + + "order by R_REGIONKEY"); + } + + @Test + public void testOrderBySingleFile() throws Exception{ + test("select R_REGIONKEY " + + "from dfs.`[WORKING_PATH]/../../sample-data/regionsSF/` " + + "order by R_REGIONKEY"); + } + + @Test + public void testSelectWithLimit() throws Exception{ + test("select employee_id, first_name, last_name from cp.`employee.json` order by employee_id limit 5 "); + } + + @Test + public void testSelectWithLimitOffset() throws Exception{ + test("select employee_id, first_name, last_name from cp.`employee.json` order by employee_id limit 5 offset 10 "); + } + + @Test + public void testJoinWithLimit() throws Exception{ + test("SELECT\n" + + " nations.N_NAME,\n" + + " regions.R_NAME\n" + + "FROM\n" + + " dfs.`[WORKING_PATH]/../../sample-data/nation.parquet` nations\n" + + "JOIN\n" + + " dfs.`[WORKING_PATH]/../../sample-data/region.parquet` regions\n" + + " on nations.N_REGIONKEY = regions.R_REGIONKEY" + + " order by regions.R_NAME, nations.N_NAME " + + " limit 5"); + } + + + private void test(String sql) throws Exception{ + boolean good = false; + sql = sql.replace("[WORKING_PATH]", TestTools.getWorkingPath()); + + try{ + QuerySubmitter s = new QuerySubmitter(); + s.submitQuery(null, sql, "sql", null, true, 1, "tsv"); + good = true; + }finally{ + if(!good) Thread.sleep(2000); + } + } + +} 
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecd64102/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java index 35c4707..4f179a0 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java @@ -31,7 +31,7 @@ public class TestExampleQueries { @Test public void testSelectWithLimit() throws Exception{ - test("select * from cp.`employee.json` limit 5"); + test("select employee_id, first_name, last_name from cp.`employee.json` order by employee_id limit 5 offset 10"); } @Test http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecd64102/sqlparser/src/test/java/org/apache/drill/jdbc/test/TestJdbcDistQuery.java ---------------------------------------------------------------------- diff --git a/sqlparser/src/test/java/org/apache/drill/jdbc/test/TestJdbcDistQuery.java b/sqlparser/src/test/java/org/apache/drill/jdbc/test/TestJdbcDistQuery.java index 9977285..caa388a 100644 --- a/sqlparser/src/test/java/org/apache/drill/jdbc/test/TestJdbcDistQuery.java +++ b/sqlparser/src/test/java/org/apache/drill/jdbc/test/TestJdbcDistQuery.java @@ -139,6 +139,7 @@ public class TestJdbcDistQuery { } @Test + //NPE at ExternalSortBatch.java : 151 public void testSortSingleFile() throws Exception{ testQuery(String.format("select R_REGIONKEY " + "from dfs.`%s/../sample-data/regionsSF/` " @@ -146,6 +147,7 @@ public class TestJdbcDistQuery { } @Test + //NPE at ExternalSortBatch.java : 151 public void testSortMultiFile() throws Exception{ testQuery(String.format("select R_REGIONKEY " + "from dfs.`%s/../sample-data/regionsMF/` "
