Add the digest of the group scan to the scan rel.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/69e5d686 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/69e5d686 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/69e5d686 Branch: refs/heads/master Commit: 69e5d68640f45f60c1b47e187731c84eb9d90775 Parents: 65b36e8 Author: Jacques Nadeau <[email protected]> Authored: Wed Jun 4 09:31:55 2014 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Wed Jun 4 11:18:30 2014 -0700 ---------------------------------------------------------------------- .../drill/exec/store/hbase/HBaseGroupScan.java | 3 +- .../exec/planner/logical/DrillScanRel.java | 45 +++++++++++----- .../drill/exec/planner/physical/ScanPrel.java | 18 +++++-- .../planner/physical/visitor/RelUniqifier.java | 54 ++++++++++++++++++++ .../planner/sql/handlers/DefaultSqlHandler.java | 4 ++ .../exec/store/dfs/easy/EasyGroupScan.java | 2 +- .../exec/store/direct/DirectGroupScan.java | 2 +- .../exec/store/ischema/InfoSchemaGroupScan.java | 2 +- .../exec/store/parquet/ParquetGroupScan.java | 3 +- .../drill/exec/store/sys/SystemTableScan.java | 2 +- .../java/org/apache/drill/BaseTestQuery.java | 2 +- .../java/org/apache/drill/TestBugFixes.java | 51 ++++++++++++++++++ 12 files changed, 161 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69e5d686/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java index f3ff64c..c7187ba 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java +++ 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java @@ -360,8 +360,7 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst @JsonIgnore public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) { Preconditions.checkArgument(children.isEmpty()); - //TODO return copy of self - return this; + return new HBaseGroupScan(this); } @JsonIgnore http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69e5d686/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java index ae11564..586b0ec 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java @@ -18,19 +18,24 @@ package org.apache.drill.exec.planner.logical; import java.io.IOException; +import java.util.Collections; import java.util.List; import org.apache.drill.common.JSONOptions; +import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.logical.data.LogicalOperator; import org.apache.drill.common.logical.data.Scan; import org.apache.drill.exec.physical.OperatorCost; import org.apache.drill.exec.physical.base.GroupScan; +import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.Size; import org.apache.drill.exec.planner.common.DrillScanRelBase; import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory; import org.apache.drill.exec.planner.physical.PrelUtil; import 
org.apache.drill.exec.planner.torel.ConversionContext; +import org.eigenbase.rel.RelWriter; import org.eigenbase.relopt.RelOptCluster; import org.eigenbase.relopt.RelOptCost; import org.eigenbase.relopt.RelOptPlanner; @@ -61,17 +66,25 @@ public class DrillScanRel extends DrillScanRelBase implements DrillRel { try { if (columns == null || columns.isEmpty()) { - this.groupScan = this.drillTable.getGroupScan(); + this.groupScan = (GroupScan) getCopy(this.drillTable.getGroupScan()) ; } else { this.groupScan = this.drillTable.getGroupScan().clone(columns); } } catch (IOException e) { - this.groupScan = null; - e.printStackTrace(); + throw new DrillRuntimeException("Failure creating scan.", e); } } + private static GroupScan getCopy(GroupScan scan){ + try { + return (GroupScan) scan.getNewWithChildren((List<PhysicalOperator>) (Object) Collections.emptyList()); + } catch (ExecutionSetupException e) { + throw new DrillRuntimeException("Unexpected failure while coping node.", e); + } + } + + @Override public LogicalOperator implement(DrillImplementor implementor) { Scan.Builder builder = Scan.builder(); @@ -91,6 +104,10 @@ public class DrillScanRel extends DrillScanRelBase implements DrillRel { return this.rowType; } + @Override + public RelWriter explainTerms(RelWriter pw) { + return super.explainTerms(pw).item("groupscan", groupScan.getDigest()); + } @Override public double getRows() { @@ -103,27 +120,27 @@ public class DrillScanRel extends DrillScanRelBase implements DrillRel { @Override public RelOptCost computeSelfCost(RelOptPlanner planner) { Size scanSize = this.groupScan.getSize(); - int columnCount = this.getRowType().getFieldCount(); - + int columnCount = this.getRowType().getFieldCount(); + if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) { OperatorCost scanCost = this.groupScan.getCost(); return planner.getCostFactory().makeCost(scanSize.getRecordCount() * columnCount, scanCost.getCpu(), scanCost.getDisk()); } - + // double rowCount = 
RelMetadataQuery.getRowCount(this); double rowCount = scanSize.getRecordCount(); - - double cpuCost = rowCount * columnCount; // for now, assume cpu cost is proportional to row count. + + double cpuCost = rowCount * columnCount; // for now, assume cpu cost is proportional to row count. // Even though scan is reading from disk, in the currently generated plans all plans will - // need to read the same amount of data, so keeping the disk io cost 0 is ok for now. - // In the future we might consider alternative scans that go against projections or + // need to read the same amount of data, so keeping the disk io cost 0 is ok for now. + // In the future we might consider alternative scans that go against projections or // different compression schemes etc that affect the amount of data read. Such alternatives - // would affect both cpu and io cost. + // would affect both cpu and io cost. double ioCost = 0; DrillCostFactory costFactory = (DrillCostFactory)planner.getCostFactory(); - return costFactory.makeCost(rowCount, cpuCost, ioCost, 0); - } - + return costFactory.makeCost(rowCount, cpuCost, ioCost, 0); + } + public GroupScan getGroupScan() { return groupScan; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69e5d686/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java index 445ecd5..972e47a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java @@ -22,6 +22,8 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; +import org.apache.drill.common.exceptions.DrillRuntimeException; +import 
org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.physical.OperatorCost; import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.physical.base.PhysicalOperator; @@ -55,22 +57,30 @@ public class ScanPrel extends AbstractRelNode implements DrillScanPrel { public ScanPrel(RelOptCluster cluster, RelTraitSet traits, GroupScan groupScan, RelDataType rowType) { super(cluster, traits); - this.groupScan = groupScan; + this.groupScan = getCopy(groupScan); this.rowType = rowType; } @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { - return new ScanPrel(this.getCluster(), traitSet, this.groupScan, + return new ScanPrel(this.getCluster(), traitSet, groupScan, this.rowType); } @Override protected Object clone() throws CloneNotSupportedException { - return new ScanPrel(this.getCluster(), this.getTraitSet(), this.groupScan, + return new ScanPrel(this.getCluster(), this.getTraitSet(), getCopy(groupScan), this.rowType); } + private static GroupScan getCopy(GroupScan scan){ + try { + return (GroupScan) scan.getNewWithChildren((List<PhysicalOperator>) (Object) Collections.emptyList()); + } catch (ExecutionSetupException e) { + throw new DrillRuntimeException("Unexpected failure while coping node.", e); + } + } + @Override public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException { @@ -85,7 +95,7 @@ public class ScanPrel extends AbstractRelNode implements DrillScanPrel { public static ScanPrel create(RelNode old, RelTraitSet traitSets, GroupScan scan, RelDataType rowType) { - return new ScanPrel(old.getCluster(), traitSets, scan, rowType); + return new ScanPrel(old.getCluster(), traitSets, getCopy(scan), rowType); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69e5d686/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RelUniqifier.java ---------------------------------------------------------------------- diff 
--git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RelUniqifier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RelUniqifier.java new file mode 100644 index 0000000..7b84edc --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RelUniqifier.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.planner.physical.visitor; + +import java.util.List; +import java.util.Set; + +import org.apache.drill.exec.planner.physical.Prel; +import org.eigenbase.rel.RelNode; + +import com.google.hive12.hive12.common.collect.Sets; +import com.google.hive12.hive12.hive12.common.collect.Lists; + +public class RelUniqifier extends BasePrelVisitor<Prel, Set<Prel>, RuntimeException>{ + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RelUniqifier.class); + + private static final RelUniqifier INSTANCE = new RelUniqifier(); + + public static Prel uniqifyGraph(Prel p){ + Set<Prel> data = Sets.newIdentityHashSet(); + return p.accept(INSTANCE, data); + } + @Override + public Prel visitPrel(Prel prel, Set<Prel> data) throws RuntimeException { + List<RelNode> children = Lists.newArrayList(); + boolean childrenChanged = false; + for(Prel child : prel){ + Prel newChild = visitPrel(child, data); + if(newChild != child) childrenChanged = true; + children.add(newChild); + } + + if(data.contains(prel) || childrenChanged){ + return (Prel) prel.copy(prel.getTraitSet(), children); + }else{ + return prel; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69e5d686/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java index 29ed1ec..883b039 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java @@ -43,6 +43,7 @@ import org.apache.drill.exec.planner.physical.Prel; import org.apache.drill.exec.planner.physical.explain.PrelSequencer; import 
org.apache.drill.exec.planner.physical.visitor.FinalColumnReorderer; import org.apache.drill.exec.planner.physical.visitor.JoinPrelRenameVisitor; +import org.apache.drill.exec.planner.physical.visitor.RelUniqifier; import org.apache.drill.exec.planner.physical.visitor.SelectionVectorPrelVisitor; import org.apache.drill.exec.planner.sql.DrillSqlWorker; import org.apache.drill.exec.util.Pointer; @@ -148,6 +149,9 @@ public class DefaultSqlHandler extends AbstractSqlHandler { // a trivial project to reorder columns prior to output. phyRelNode = FinalColumnReorderer.addFinalColumnOrdering(phyRelNode); + // Make sure that the no rels are repeats. This could happen in the case of querying the same table twice as Optiq may canonicalize these. + phyRelNode = RelUniqifier.uniqifyGraph(phyRelNode); + // the last thing we do is add any required selection vector removers given the supported encodings of each // operator. This will ultimately move to a new trait but we're managing here for now to avoid introducing new // issues in planning before the next release http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69e5d686/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java index d0cd8cc..2b63601 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java @@ -161,7 +161,7 @@ public class EasyGroupScan extends AbstractGroupScan{ @Override public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException { assert children == null || children.isEmpty(); - return this; + return new EasyGroupScan(this); } 
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69e5d686/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java index eed4f03..138a024 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java @@ -72,7 +72,7 @@ public class DirectGroupScan extends AbstractGroupScan{ @Override public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException { assert children == null || children.isEmpty(); - return this; + return new DirectSubScan(reader); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69e5d686/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java index 5014386..7337cea 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java @@ -92,7 +92,7 @@ public class InfoSchemaGroupScan extends AbstractGroupScan{ @Override public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException { - return this; + return new InfoSchemaGroupScan (this); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69e5d686/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java 
---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java index f5c1ce7..e69f61c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java @@ -385,8 +385,7 @@ public class ParquetGroupScan extends AbstractGroupScan { @JsonIgnore public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) { Preconditions.checkArgument(children.isEmpty()); - // TODO return copy of self - return this; + return new ParquetGroupScan(this); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69e5d686/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java index b0133f3..09aabb0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java @@ -74,7 +74,7 @@ public class SystemTableScan extends AbstractGroupScan implements SubScan{ @Override public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException { - return this; + return new SystemTableScan(table, plugin); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69e5d686/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java 
b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java index a47796c..5458adc 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java +++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java @@ -130,7 +130,7 @@ public class BaseTestQuery extends ExecTest{ private int testRunAndPrint(QueryType type, String query) throws Exception{ query = query.replace("[WORKING_PATH]", TestTools.getWorkingPath()); - PrintingResultsListener resultListener = new PrintingResultsListener(Format.TSV, VectorUtil.DEFAULT_COLUMN_WIDTH); + PrintingResultsListener resultListener = new PrintingResultsListener(client.getConfig(), Format.TSV, VectorUtil.DEFAULT_COLUMN_WIDTH); client.runQuery(type, query, resultListener); return resultListener.await(); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/69e5d686/exec/java-exec/src/test/java/org/apache/drill/TestBugFixes.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestBugFixes.java b/exec/java-exec/src/test/java/org/apache/drill/TestBugFixes.java new file mode 100644 index 0000000..2aa0618 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/TestBugFixes.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill; + +import org.junit.Ignore; +import org.junit.Test; + +public class TestBugFixes extends BaseTestQuery { + + @Test + public void leak1() throws Exception { + String select = "select count(*) \n" + + " from cp.`tpch/part.parquet` p1, cp.`tpch/part.parquet` p2 \n" + + " where p1.p_name = p2.p_name \n" + + " and p1.p_mfgr = p2.p_mfgr"; + test(select); + } + + @Ignore + @Test + public void failingSmoke() throws Exception { + String select = "select count(*) \n" + + " from (select l.l_orderkey as x, c.c_custkey as y \n" + + " from cp.`tpch/lineitem.parquet` l \n" + + " left outer join cp.`tpch/customer.parquet` c \n" + + " on l.l_orderkey = c.c_custkey) as foo\n" + + " where x < 10000"; + test(select); + } + + + @Test + public void DRILL883() throws Exception { + test("select n1.n_regionkey from cp.`tpch/nation.parquet` n1, (select n_nationkey from cp.`tpch/nation.parquet`) as n2 where n1.n_nationkey = n2.n_nationkey"); + } +}
