Awesome job on this, Sudheesh. Thanks for all the hard work. Thanks also to Sean for all his work on the previous patch. ---------- Forwarded message ---------- From: <sudhe...@apache.org> Date: Mar 22, 2016 4:33 PM Subject: drill git commit: DRILL-3623: For limit 0 queries, optionally use a shorter execution path when result column types are known To: <comm...@drill.apache.org> Cc:
Repository: drill Updated Branches: refs/heads/master 600ba9ee1 -> 5dbaafbe6 DRILL-3623: For limit 0 queries, optionally use a shorter execution path when result column types are known + "planner.enable_limit0_optimization" option is disabled by default + Print plan in PlanTestBase if TEST_QUERY_PRINTING_SILENT is set + Fix DrillTestWrapper to verify expected and actual schema + Correct the schema of results in TestInbuiltHiveUDFs#testXpath_Double This closes #405 Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/5dbaafbe Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/5dbaafbe Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/5dbaafbe Branch: refs/heads/master Commit: 5dbaafbe6651b0a284fef69d5c952d82ce506e20 Parents: 600ba9e Author: Sudheesh Katkam <skat...@maprtech.com> Authored: Tue Mar 22 15:21:51 2016 -0700 Committer: Sudheesh Katkam <skat...@maprtech.com> Committed: Tue Mar 22 16:19:01 2016 -0700 ---------------------------------------------------------------------- .../drill/exec/fn/hive/TestInbuiltHiveUDFs.java | 2 +- .../org/apache/drill/exec/ExecConstants.java | 3 + .../drill/exec/physical/base/ScanStats.java | 6 +- .../apache/drill/exec/planner/PlannerPhase.java | 2 + .../planner/logical/DrillDirectScanRel.java | 70 ++ .../exec/planner/physical/DirectScanPrule.java | 49 ++ .../planner/sql/handlers/DefaultSqlHandler.java | 12 + .../planner/sql/handlers/FindLimit0Visitor.java | 124 +++- .../server/options/SystemOptionManager.java | 1 + .../exec/store/direct/DirectGroupScan.java | 27 +- .../java/org/apache/drill/DrillTestWrapper.java | 25 +- .../java/org/apache/drill/PlanTestBase.java | 9 +- .../impl/limit/TestEarlyLimit0Optimization.java | 663 +++++++++++++++++++ 13 files changed, 963 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/drill/blob/5dbaafbe/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/fn/hive/TestInbuiltHiveUDFs.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/fn/hive/TestInbuiltHiveUDFs.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/fn/hive/TestInbuiltHiveUDFs.java index a287c89..a126aaa 100644 --- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/fn/hive/TestInbuiltHiveUDFs.java +++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/fn/hive/TestInbuiltHiveUDFs.java @@ -58,7 +58,7 @@ public class TestInbuiltHiveUDFs extends HiveTestBase { final TypeProtos.MajorType majorType = TypeProtos.MajorType.newBuilder() .setMinorType(TypeProtos.MinorType.FLOAT8) - .setMode(TypeProtos.DataMode.REQUIRED) + .setMode(TypeProtos.DataMode.OPTIONAL) .build(); final List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema = Lists.newArrayList(); http://git-wip-us.apache.org/repos/asf/drill/blob/5dbaafbe/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index b8f25ad..963934d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -202,6 +202,9 @@ public interface ExecConstants { String AFFINITY_FACTOR_KEY = "planner.affinity_factor"; OptionValidator AFFINITY_FACTOR = new DoubleValidator(AFFINITY_FACTOR_KEY, 1.2d); + String EARLY_LIMIT0_OPT_KEY = "planner.enable_limit0_optimization"; + BooleanValidator EARLY_LIMIT0_OPT = new BooleanValidator(EARLY_LIMIT0_OPT_KEY, false); + String ENABLE_MEMORY_ESTIMATION_KEY = 
"planner.memory.enable_memory_estimation"; OptionValidator ENABLE_MEMORY_ESTIMATION = new BooleanValidator(ENABLE_MEMORY_ESTIMATION_KEY, false); http://git-wip-us.apache.org/repos/asf/drill/blob/5dbaafbe/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/ScanStats.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/ScanStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/ScanStats.java index ba36931..1886c14 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/ScanStats.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/ScanStats.java @@ -17,13 +17,13 @@ */ package org.apache.drill.exec.physical.base; - public class ScanStats { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanStats.class); - +// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanStats.class); public static final ScanStats TRIVIAL_TABLE = new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, 20, 1, 1); + public static final ScanStats ZERO_RECORD_TABLE = new ScanStats(GroupScanProperty.EXACT_ROW_COUNT, 0, 1, 1); + private final long recordCount; private final float cpuCost; private final float diskCost; http://git-wip-us.apache.org/repos/asf/drill/blob/5dbaafbe/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java index 7ab7faf..57f2984 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java @@ -65,6 +65,7 @@ import org.apache.drill.exec.planner.logical.DrillWindowRule; import 
org.apache.drill.exec.planner.logical.partition.ParquetPruneScanRule; import org.apache.drill.exec.planner.logical.partition.PruneScanRule; import org.apache.drill.exec.planner.physical.ConvertCountToDirectScan; +import org.apache.drill.exec.planner.physical.DirectScanPrule; import org.apache.drill.exec.planner.physical.FilterPrule; import org.apache.drill.exec.planner.physical.HashAggPrule; import org.apache.drill.exec.planner.physical.HashJoinPrule; @@ -391,6 +392,7 @@ public enum PlannerPhase { ruleList.add(LimitUnionExchangeTransposeRule.INSTANCE); ruleList.add(UnionAllPrule.INSTANCE); ruleList.add(ValuesPrule.INSTANCE); + ruleList.add(DirectScanPrule.INSTANCE); if (ps.isHashAggEnabled()) { ruleList.add(HashAggPrule.INSTANCE); http://git-wip-us.apache.org/repos/asf/drill/blob/5dbaafbe/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillDirectScanRel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillDirectScanRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillDirectScanRel.java new file mode 100644 index 0000000..013016a --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillDirectScanRel.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.logical; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.AbstractRelNode; +import org.apache.calcite.rel.RelWriter; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.drill.common.logical.data.LogicalOperator; +import org.apache.drill.exec.planner.physical.PlannerSettings; +import org.apache.drill.exec.planner.physical.PrelUtil; +import org.apache.drill.exec.store.direct.DirectGroupScan; + +/** + * Logical RelNode representing a {@link DirectGroupScan}. This is not backed by a {@link DrillTable}, + * unlike {@link DrillScanRel}. 
+ */ +public class DrillDirectScanRel extends AbstractRelNode implements DrillRel { + + private final DirectGroupScan groupScan; + private final RelDataType rowType; + + public DrillDirectScanRel(RelOptCluster cluster, RelTraitSet traitSet, DirectGroupScan directGroupScan, + RelDataType rowType) { + super(cluster, traitSet); + this.groupScan = directGroupScan; + this.rowType = rowType; + } + + @Override + public LogicalOperator implement(DrillImplementor implementor) { + return null; + } + + @Override + public RelDataType deriveRowType() { + return this.rowType; + } + + @Override + public RelWriter explainTerms(RelWriter pw) { + return super.explainTerms(pw).item("directscan", groupScan.getDigest()); + } + + @Override + public double getRows() { + final PlannerSettings settings = PrelUtil.getPlannerSettings(getCluster()); + return groupScan.getScanStats(settings).getRecordCount(); + } + + public DirectGroupScan getGroupScan() { + return groupScan; + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/5dbaafbe/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DirectScanPrule.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DirectScanPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DirectScanPrule.java new file mode 100644 index 0000000..5c2fd29 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DirectScanPrule.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.physical; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.drill.exec.planner.logical.DrillDirectScanRel; +import org.apache.drill.exec.planner.logical.RelOptHelper; + +public class DirectScanPrule extends Prule { + + public static final RelOptRule INSTANCE = new DirectScanPrule(); + + public DirectScanPrule() { + super(RelOptHelper.any(DrillDirectScanRel.class), "Prel.DirectScanPrule"); + } + + @Override + public void onMatch(RelOptRuleCall call) { + final DrillDirectScanRel scan = call.rel(0); + final RelTraitSet traits = scan.getTraitSet().plus(Prel.DRILL_PHYSICAL); + + final ScanPrel newScan = new ScanPrel(scan.getCluster(), traits, scan.getGroupScan(), scan.getRowType()) { + // direct scan (no execution) => no accidental column shuffling => no reordering + @Override + public boolean needsFinalColumnReordering() { + return false; + } + }; + + call.transformTo(newScan); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/5dbaafbe/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java index 4ca9fe4..341bae2 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java @@ -206,6 +206,18 @@ public class DefaultSqlHandler extends AbstractSqlHandler { * @throws RelConversionException */ protected DrillRel convertToDrel(final RelNode relNode) throws SqlUnsupportedException, RelConversionException { + if (context.getOptions().getOption(ExecConstants.EARLY_LIMIT0_OPT) && + context.getPlannerSettings().isTypeInferenceEnabled() && + FindLimit0Visitor.containsLimit0(relNode)) { + // disable distributed mode + context.getPlannerSettings().forceSingleMode(); + // if the schema is known, return the schema directly + final DrillRel shorterPlan; + if ((shorterPlan = FindLimit0Visitor.getDirectScanRelIfFullySchemaed(relNode)) != null) { + return shorterPlan; + } + } + try { final RelNode convertedRelNode; http://git-wip-us.apache.org/repos/asf/drill/blob/5dbaafbe/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java index d2c5fa6..fa1fe07 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java @@ -17,6 +17,10 @@ */ package org.apache.drill.exec.planner.sql.handlers; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.RelShuttleImpl; import org.apache.calcite.rel.logical.LogicalAggregate; @@ 
-25,10 +29,27 @@ import org.apache.calcite.rel.logical.LogicalJoin; import org.apache.calcite.rel.logical.LogicalMinus; import org.apache.calcite.rel.logical.LogicalSort; import org.apache.calcite.rel.logical.LogicalUnion; +import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexNode; import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.expr.TypeHelper; +import org.apache.drill.exec.ops.OperatorContext; +import org.apache.drill.exec.physical.base.ScanStats; +import org.apache.drill.exec.physical.impl.OutputMutator; +import org.apache.drill.exec.planner.logical.DrillDirectScanRel; import org.apache.drill.exec.planner.logical.DrillLimitRel; +import org.apache.drill.exec.planner.logical.DrillRel; +import org.apache.drill.exec.planner.sql.TypeInferenceUtils; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.store.AbstractRecordReader; +import org.apache.drill.exec.store.direct.DirectGroupScan; + +import java.util.List; /** * Visitor that will identify whether the root portion of the RelNode tree contains a limit 0 pattern. In this case, we @@ -36,16 +57,68 @@ import org.apache.drill.exec.planner.logical.DrillLimitRel; * executing a schema-only query. */ public class FindLimit0Visitor extends RelShuttleImpl { - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FindLimit0Visitor.class); +// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FindLimit0Visitor.class); + + // Some types are excluded in this set: + // + DECIMAL type is not fully supported in general. + // + VARBINARY is not fully tested. + // + MAP, ARRAY are currently not exposed to the planner. 
+ // + TINYINT, SMALLINT are defined in the Drill type system but have been turned off for now. + // + SYMBOL, MULTISET, DISTINCT, STRUCTURED, ROW, OTHER, CURSOR, COLUMN_LIST are Calcite types + // currently not supported by Drill, nor defined in the Drill type list. + // + ANY is the late binding type. + private static final ImmutableSet<SqlTypeName> TYPES = + ImmutableSet.<SqlTypeName>builder() + .add(SqlTypeName.INTEGER, SqlTypeName.BIGINT, SqlTypeName.FLOAT, SqlTypeName.DOUBLE, + SqlTypeName.VARCHAR, SqlTypeName.BOOLEAN, SqlTypeName.DATE, SqlTypeName.TIME, + SqlTypeName.TIMESTAMP, SqlTypeName.INTERVAL_YEAR_MONTH, SqlTypeName.INTERVAL_DAY_TIME, + SqlTypeName.CHAR) + .build(); + + /** + * If all field types of the given node are {@link #TYPES recognized types} and honored by execution, then this + * method returns the tree: DrillDirectScanRel(field types). Otherwise, the method returns null. + * + * @param rel calcite logical rel tree + * @return drill logical rel tree + */ + public static DrillRel getDirectScanRelIfFullySchemaed(RelNode rel) { + final List<RelDataTypeField> fieldList = rel.getRowType().getFieldList(); + final List<SqlTypeName> columnTypes = Lists.newArrayList(); + final List<TypeProtos.DataMode> dataModes = Lists.newArrayList(); + + for (final RelDataTypeField field : fieldList) { + final SqlTypeName sqlTypeName = field.getType().getSqlTypeName(); + if (!TYPES.contains(sqlTypeName)) { + return null; + } else { + columnTypes.add(sqlTypeName); + dataModes.add(field.getType().isNullable() ? 
+ TypeProtos.DataMode.OPTIONAL : TypeProtos.DataMode.REQUIRED); + } + } - private boolean contains = false; + final RelTraitSet traits = rel.getTraitSet().plus(DrillRel.DRILL_LOGICAL); + final RelDataTypeReader reader = new RelDataTypeReader(rel.getRowType().getFieldNames(), columnTypes, + dataModes); + return new DrillDirectScanRel(rel.getCluster(), traits, + new DirectGroupScan(reader, ScanStats.ZERO_RECORD_TABLE), rel.getRowType()); + } + /** + * Check if the root portion of the tree contains LIMIT(0). + * + * @param rel rel node tree + * @return true if the root portion of the tree contains LIMIT(0) + */ public static boolean containsLimit0(RelNode rel) { FindLimit0Visitor visitor = new FindLimit0Visitor(); rel.accept(visitor); return visitor.isContains(); } + private boolean contains = false; + private FindLimit0Visitor() { } @@ -53,7 +126,7 @@ public class FindLimit0Visitor extends RelShuttleImpl { return contains; } - private boolean isLimit0(RexNode fetch) { + private static boolean isLimit0(RexNode fetch) { if (fetch != null && fetch.isA(SqlKind.LITERAL)) { RexLiteral l = (RexLiteral) fetch; switch (l.getTypeName()) { @@ -116,4 +189,49 @@ public class FindLimit0Visitor extends RelShuttleImpl { public RelNode visit(LogicalUnion union) { return union; } + + /** + * Reader for column names and types. 
+ */ + public static class RelDataTypeReader extends AbstractRecordReader { + + public final List<String> columnNames; + public final List<SqlTypeName> columnTypes; + public final List<TypeProtos.DataMode> dataModes; + + public RelDataTypeReader(List<String> columnNames, List<SqlTypeName> columnTypes, + List<TypeProtos.DataMode> dataModes) { + Preconditions.checkArgument(columnNames.size() == columnTypes.size() && + columnTypes.size() == dataModes.size()); + this.columnNames = columnNames; + this.columnTypes = columnTypes; + this.dataModes = dataModes; + } + + @Override + public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException { + for (int i = 0; i < columnNames.size(); i++) { + final TypeProtos.MajorType type = TypeProtos.MajorType.newBuilder() + .setMode(dataModes.get(i)) + .setMinorType(TypeInferenceUtils.getDrillTypeFromCalciteType(columnTypes.get(i))) + .build(); + final MaterializedField field = MaterializedField.create(columnNames.get(i), type); + final Class vvClass = TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()); + try { + output.addField(field, vvClass); + } catch (SchemaChangeException e) { + throw new ExecutionSetupException(e); + } + } + } + + @Override + public int next() { + return 0; + } + + @Override + public void close() throws Exception { + } + } } http://git-wip-us.apache.org/repos/asf/drill/blob/5dbaafbe/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java index cbc5c09..a596d3a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java @@ 
-116,6 +116,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea ExecConstants.SMALL_QUEUE_SIZE, ExecConstants.MIN_HASH_TABLE_SIZE, ExecConstants.MAX_HASH_TABLE_SIZE, + ExecConstants.EARLY_LIMIT0_OPT, ExecConstants.ENABLE_MEMORY_ESTIMATION, ExecConstants.MAX_QUERY_MEMORY_PER_NODE, ExecConstants.NON_BLOCKING_OPERATORS_MEMORY, http://git-wip-us.apache.org/repos/asf/drill/blob/5dbaafbe/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java index e08fe71..a4b2fad 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java @@ -17,12 +17,9 @@ */ package org.apache.drill.exec.store.direct; -import java.util.Collections; -import java.util.List; - +import com.fasterxml.jackson.annotation.JsonTypeName; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.SchemaPath; -import org.apache.drill.exec.physical.EndpointAffinity; import org.apache.drill.exec.physical.PhysicalOperatorSetupException; import org.apache.drill.exec.physical.base.AbstractGroupScan; import org.apache.drill.exec.physical.base.GroupScan; @@ -32,14 +29,23 @@ import org.apache.drill.exec.physical.base.SubScan; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.store.RecordReader; -public class DirectGroupScan extends AbstractGroupScan{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DirectGroupScan.class); +import java.util.List; + +@JsonTypeName("direct-scan") +public class DirectGroupScan extends AbstractGroupScan { +// private static 
final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DirectGroupScan.class); private final RecordReader reader; + private final ScanStats stats; public DirectGroupScan(RecordReader reader) { - super((String)null); + this(reader, ScanStats.TRIVIAL_TABLE); + } + + public DirectGroupScan(RecordReader reader, ScanStats stats) { + super((String) null); this.reader = reader; + this.stats = stats; } @Override @@ -58,14 +64,15 @@ public class DirectGroupScan extends AbstractGroupScan{ return 1; } - public ScanStats getScanStats(){ - return ScanStats.TRIVIAL_TABLE; + @Override + public ScanStats getScanStats() { + return stats; } @Override public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException { assert children == null || children.isEmpty(); - return new DirectGroupScan(reader); + return new DirectGroupScan(reader, stats); } @Override http://git-wip-us.apache.org/repos/asf/drill/blob/5dbaafbe/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java b/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java index 67017ce..f853414 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java +++ b/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java @@ -31,8 +31,10 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; +import org.apache.commons.lang3.tuple.Pair; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.common.types.Types; import org.apache.drill.exec.HyperVectorValueIterator; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.memory.BufferAllocator; @@ -294,9 +296,7 @@ public class DrillTestWrapper { protected void compareSchemaOnly() throws Exception { 
RecordBatchLoader loader = new RecordBatchLoader(getAllocator()); - List<QueryDataBatch> actual = Collections.EMPTY_LIST; - - + List<QueryDataBatch> actual; QueryDataBatch batch = null; try { BaseTestQuery.test(testOptionSettingQueries); @@ -305,21 +305,24 @@ public class DrillTestWrapper { loader.load(batch.getHeader().getDef(), batch.getData()); final BatchSchema schema = loader.getSchema(); - if(schema.getFieldCount() != testBuilder.getExpectedSchema().size()) { - throw new Exception("The column numbers for actual schema and expected schema do not match"); + final List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema = testBuilder.getExpectedSchema(); + if(schema.getFieldCount() != expectedSchema.size()) { + throw new Exception("Expected and actual numbers of columns do not match."); } for(int i = 0; i < schema.getFieldCount(); ++i) { final String actualSchemaPath = schema.getColumn(i).getPath(); final TypeProtos.MajorType actualMajorType = schema.getColumn(i).getType(); - final String expectedSchemaPath = schema.getColumn(i).getPath(); - final TypeProtos.MajorType expectedlMajorType = schema.getColumn(i).getType(); + final String expectedSchemaPath = expectedSchema.get(i).getLeft().getAsUnescapedPath(); + final TypeProtos.MajorType expectedMajorType = expectedSchema.get(i).getValue(); - if(!actualSchemaPath.equalsIgnoreCase(expectedSchemaPath) - || !actualMajorType.equals(expectedlMajorType)) { - throw new Exception("The type of the " + i + "-th column is '" + actualSchemaPath + "' mismatched, expected: '" - + expectedlMajorType + "'"); + if(!actualSchemaPath.equals(expectedSchemaPath) + || !actualMajorType.equals(expectedMajorType)) { + throw new Exception(String.format("Schema path or type mismatch for column #%d:\n" + + "Expected schema path: %s\nActual schema path: %s\nExpected type: %s\nActual type: %s", + i, expectedSchemaPath, actualSchemaPath, Types.toString(expectedMajorType), + Types.toString(actualMajorType))); } } 
http://git-wip-us.apache.org/repos/asf/drill/blob/5dbaafbe/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java index 3922a38..bb5ff88 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java +++ b/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java @@ -291,6 +291,7 @@ public class PlanTestBase extends BaseTestQuery { final List<QueryDataBatch> results = testSqlWithResults(sql); final RecordBatchLoader loader = new RecordBatchLoader(getDrillbitContext().getAllocator()); final StringBuilder builder = new StringBuilder(); + final boolean silent = config != null && config.getBoolean(QueryTestUtil.TEST_QUERY_PRINTING_SILENT); for (final QueryDataBatch b : results) { if (!b.hasData()) { @@ -308,12 +309,16 @@ public class PlanTestBase extends BaseTestQuery { throw new Exception("Looks like you did not provide an explain plan query, please add EXPLAIN PLAN FOR to the beginning of your query."); } - System.out.println(vw.getValueVector().getField().getPath()); + if (!silent) { + System.out.println(vw.getValueVector().getField().getPath()); + } final ValueVector vv = vw.getValueVector(); for (int i = 0; i < vv.getAccessor().getValueCount(); i++) { final Object o = vv.getAccessor().getObject(i); builder.append(o); - System.out.println(vv.getAccessor().getObject(i)); + if (!silent) { + System.out.println(o); + } } loader.clear(); b.release(); http://git-wip-us.apache.org/repos/asf/drill/blob/5dbaafbe/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestEarlyLimit0Optimization.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestEarlyLimit0Optimization.java 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestEarlyLimit0Optimization.java new file mode 100644 index 0000000..70b0cb3 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestEarlyLimit0Optimization.java @@ -0,0 +1,663 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/
package org.apache.drill.exec.physical.impl.limit;

import com.google.common.collect.Lists;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.drill.BaseTestQuery;
import org.apache.drill.PlanTestBase;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.ExecConstants;
import org.joda.time.DateTime;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import java.sql.Date;
import java.util.List;

/**
 * Tests for the early LIMIT 0 optimization controlled by {@link ExecConstants#EARLY_LIMIT0_OPT_KEY}.
 *
 * <p>Most tests first run a query normally to check its result values. They then run the same
 * query wrapped in {@code SELECT * FROM (...) LIMIT 0} to check the result schema. Finally they
 * check that the LIMIT 0 plan contains a Project directly over a Scan whose description mentions
 * {@code RelDataTypeReader}.
 */
public class TestEarlyLimit0Optimization extends BaseTestQuery {

  /** View over the first row (by employee_id) of cp.`employee.json`, with every column CAST to an explicit type. */
  private static final String VIEW_NAME = "limitZeroEmployeeView";

  /** Wraps {@code query} in an outer {@code SELECT *} with {@code LIMIT 0}. */
  private static String wrapLimit0(final String query) {
    return "SELECT * FROM (" + query + ") LZT LIMIT 0";
  }

  @BeforeClass
  public static void createView() throws Exception {
    // The view is created in (and later dropped from) the dfs_test.tmp workspace.
    test("USE dfs_test.tmp");
    test(String.format("CREATE OR REPLACE VIEW %s AS SELECT " +
        "CAST(employee_id AS INT) AS employee_id, " +
        "CAST(full_name AS VARCHAR(25)) AS full_name, " +
        "CAST(position_id AS INTEGER) AS position_id, " +
        "CAST(department_id AS BIGINT) AS department_id, " +
        "CAST(birth_date AS DATE) AS birth_date, " +
        "CAST(hire_date AS TIMESTAMP) AS hire_date, " +
        "CAST(salary AS DOUBLE) AS salary, " +
        "CAST(salary AS FLOAT) AS fsalary, " +
        "CAST((CASE WHEN marital_status = 'S' THEN true ELSE false END) AS BOOLEAN) AS single, " +
        "CAST(education_level AS VARCHAR(60)) AS education_level, " +
        "CAST(gender AS CHAR) AS gender " +
        "FROM cp.`employee.json` " +
        "ORDER BY employee_id " +
        "LIMIT 1;", VIEW_NAME));
    // { "employee_id":1,"full_name":"Sheri Nowmer","first_name":"Sheri","last_name":"Nowmer","position_id":1,
    // "position_title":"President","store_id":0,"department_id":1,"birth_date":"1961-08-26",
    // "hire_date":"1994-12-01 00:00:00.0","end_date":null,"salary":80000.0000,"supervisor_id":0,
    // "education_level":"Graduate Degree","marital_status":"S","gender":"F","management_role":"Senior Management" }
  }

  @AfterClass
  public static void tearDownView() throws Exception {
    test("DROP VIEW " + VIEW_NAME + ";");
  }

  @Before
  public void setOption() throws Exception {
    test("SET `%s` = true;", ExecConstants.EARLY_LIMIT0_OPT_KEY);
  }

  @After
  public void resetOption() throws Exception {
    test("RESET `%s`;", ExecConstants.EARLY_LIMIT0_OPT_KEY);
  }

  /**
   * Runs {@code query} wrapped in LIMIT 0 and verifies its result schema. Then verifies that the
   * LIMIT 0 plan took the optimized shape.
   *
   * @param query          the unwrapped query
   * @param expectedSchema expected column names and types of the LIMIT 0 result
   */
  private void checkLimit0SchemaAndPlan(final String query,
                                        final List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema)
      throws Exception {
    testBuilder()
        .sqlQuery(wrapLimit0(query))
        .schemaBaseLine(expectedSchema)
        .go();

    checkThatQueryPlanIsOptimized(query);
  }

  /**
   * Asserts that the plan for {@code query} wrapped in LIMIT 0 contains a Project line immediately
   * followed by a Scan line that mentions {@code RelDataTypeReader}.
   */
  private static void checkThatQueryPlanIsOptimized(final String query) throws Exception {
    PlanTestBase.testPlanMatchingPatterns(
        wrapLimit0(query),
        new String[]{
            ".*Project.*\n" +
                ".*Scan.*RelDataTypeReader.*"
        },
        new String[]{});
  }

  // -------------------- SIMPLE QUERIES --------------------

  @Test
  public void infoSchema() throws Exception {
    testBuilder()
        .sqlQuery(String.format("DESCRIBE %s", VIEW_NAME))
        .unOrdered()
        .baselineColumns("COLUMN_NAME", "DATA_TYPE", "IS_NULLABLE")
        .baselineValues("employee_id", "INTEGER", "YES")
        .baselineValues("full_name", "CHARACTER VARYING", "YES")
        .baselineValues("position_id", "INTEGER", "YES")
        .baselineValues("department_id", "BIGINT", "YES")
        .baselineValues("birth_date", "DATE", "YES")
        .baselineValues("hire_date", "TIMESTAMP", "YES")
        .baselineValues("salary", "DOUBLE", "YES")
        .baselineValues("fsalary", "FLOAT", "YES")
        .baselineValues("single", "BOOLEAN", "NO")
        .baselineValues("education_level", "CHARACTER VARYING", "YES")
        .baselineValues("gender", "CHARACTER", "YES")
        .go();
  }

  @Test
  public void simpleSelect() throws Exception {
    testBuilder()
        .sqlQuery(String.format("SELECT * FROM %s", VIEW_NAME))
        .ordered()
        .baselineColumns("employee_id", "full_name", "position_id", "department_id", "birth_date", "hire_date",
            "salary", "fsalary", "single", "education_level", "gender")
        .baselineValues(1, "Sheri Nowmer", 1, 1L, new DateTime(Date.valueOf("1961-08-26").getTime()),
            new DateTime(Date.valueOf("1994-12-01").getTime()), 80000.0D, 80000.0F, true, "Graduate Degree", "F")
        .go();
  }

  @Test
  public void simpleSelectLimit0() throws Exception {
    @SuppressWarnings("unchecked")
    final List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema = Lists.newArrayList(
        Pair.of(SchemaPath.getSimplePath("employee_id"), Types.optional(TypeProtos.MinorType.INT)),
        Pair.of(SchemaPath.getSimplePath("full_name"), Types.optional(TypeProtos.MinorType.VARCHAR)),
        Pair.of(SchemaPath.getSimplePath("position_id"), Types.optional(TypeProtos.MinorType.INT)),
        Pair.of(SchemaPath.getSimplePath("department_id"), Types.optional(TypeProtos.MinorType.BIGINT)),
        Pair.of(SchemaPath.getSimplePath("birth_date"), Types.optional(TypeProtos.MinorType.DATE)),
        Pair.of(SchemaPath.getSimplePath("hire_date"), Types.optional(TypeProtos.MinorType.TIMESTAMP)),
        Pair.of(SchemaPath.getSimplePath("salary"), Types.optional(TypeProtos.MinorType.FLOAT8)),
        Pair.of(SchemaPath.getSimplePath("fsalary"), Types.optional(TypeProtos.MinorType.FLOAT4)),
        Pair.of(SchemaPath.getSimplePath("single"), Types.required(TypeProtos.MinorType.BIT)),
        Pair.of(SchemaPath.getSimplePath("education_level"), Types.optional(TypeProtos.MinorType.VARCHAR)),
        Pair.of(SchemaPath.getSimplePath("gender"), Types.optional(TypeProtos.MinorType.VARCHAR)));

    checkLimit0SchemaAndPlan(String.format("SELECT * FROM %s", VIEW_NAME), expectedSchema);
  }

  // -------------------- AGGREGATE FUNC. QUERIES --------------------

  /** Builds a query applying {@code functionName} to each numeric column of the view. */
  private static String getAggQuery(final String functionName) {
    return "SELECT " +
        functionName + "(employee_id) AS e, " +
        functionName + "(position_id) AS p, " +
        functionName + "(department_id) AS d, " +
        functionName + "(salary) AS s, " +
        functionName + "(fsalary) AS f " +
        "FROM " + VIEW_NAME;
  }

  @Test
  public void sums() throws Exception {
    final String query = getAggQuery("SUM");

    @SuppressWarnings("unchecked")
    final List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema = Lists.newArrayList(
        Pair.of(SchemaPath.getSimplePath("e"), Types.optional(TypeProtos.MinorType.BIGINT)),
        Pair.of(SchemaPath.getSimplePath("p"), Types.optional(TypeProtos.MinorType.BIGINT)),
        Pair.of(SchemaPath.getSimplePath("d"), Types.optional(TypeProtos.MinorType.BIGINT)),
        Pair.of(SchemaPath.getSimplePath("s"), Types.optional(TypeProtos.MinorType.FLOAT8)),
        Pair.of(SchemaPath.getSimplePath("f"), Types.optional(TypeProtos.MinorType.FLOAT8)));

    testBuilder()
        .sqlQuery(query)
        .ordered()
        .baselineColumns("e", "p", "d", "s", "f")
        .baselineValues(1L, 1L, 1L, 80000D, 80000D)
        .go();

    checkLimit0SchemaAndPlan(query, expectedSchema);
  }

  @Test
  public void counts() throws Exception {
    final String query = getAggQuery("COUNT");

    @SuppressWarnings("unchecked")
    final List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema = Lists.newArrayList(
        Pair.of(SchemaPath.getSimplePath("e"), Types.required(TypeProtos.MinorType.BIGINT)),
        Pair.of(SchemaPath.getSimplePath("p"), Types.required(TypeProtos.MinorType.BIGINT)),
        Pair.of(SchemaPath.getSimplePath("d"), Types.required(TypeProtos.MinorType.BIGINT)),
        Pair.of(SchemaPath.getSimplePath("s"), Types.required(TypeProtos.MinorType.BIGINT)),
        Pair.of(SchemaPath.getSimplePath("f"), Types.required(TypeProtos.MinorType.BIGINT)));

    testBuilder()
        .sqlQuery(query)
        .baselineColumns("e", "p", "d", "s", "f")
        .ordered()
        .baselineValues(1L, 1L, 1L, 1L, 1L)
        .go();

    checkLimit0SchemaAndPlan(query, expectedSchema);
  }

  /** Shared body of {@link #mins()} and {@link #maxs()}; both keep the input column types. */
  private void minAndMaxTest(final String functionName) throws Exception {
    final String query = getAggQuery(functionName);

    @SuppressWarnings("unchecked")
    final List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema = Lists.newArrayList(
        Pair.of(SchemaPath.getSimplePath("e"), Types.optional(TypeProtos.MinorType.INT)),
        Pair.of(SchemaPath.getSimplePath("p"), Types.optional(TypeProtos.MinorType.INT)),
        Pair.of(SchemaPath.getSimplePath("d"), Types.optional(TypeProtos.MinorType.BIGINT)),
        Pair.of(SchemaPath.getSimplePath("s"), Types.optional(TypeProtos.MinorType.FLOAT8)),
        Pair.of(SchemaPath.getSimplePath("f"), Types.optional(TypeProtos.MinorType.FLOAT4)));

    testBuilder()
        .sqlQuery(query)
        .baselineColumns("e", "p", "d", "s", "f")
        .ordered()
        .baselineValues(1, 1, 1L, 80_000D, 80_000F)
        .go();

    checkLimit0SchemaAndPlan(query, expectedSchema);
  }

  @Test
  public void mins() throws Exception {
    minAndMaxTest("MIN");
  }

  @Test
  public void maxs() throws Exception {
    minAndMaxTest("MAX");
  }

  @Test
  public void avgs() throws Exception {
    final String query = getAggQuery("AVG");

    @SuppressWarnings("unchecked")
    final List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema = Lists.newArrayList(
        Pair.of(SchemaPath.getSimplePath("e"), Types.optional(TypeProtos.MinorType.FLOAT8)),
        Pair.of(SchemaPath.getSimplePath("p"), Types.optional(TypeProtos.MinorType.FLOAT8)),
        Pair.of(SchemaPath.getSimplePath("d"), Types.optional(TypeProtos.MinorType.FLOAT8)),
        Pair.of(SchemaPath.getSimplePath("s"), Types.optional(TypeProtos.MinorType.FLOAT8)),
        Pair.of(SchemaPath.getSimplePath("f"), Types.optional(TypeProtos.MinorType.FLOAT8)));

    testBuilder()
        .sqlQuery(query)
        .ordered()
        .baselineColumns("e", "p", "d", "s", "f")
        .baselineValues(1D, 1D, 1D, 80_000D, 80_000D)
        .go();

    checkLimit0SchemaAndPlan(query, expectedSchema);
  }

  @Test
  public void measures() throws Exception {
    final String query = "SELECT " +
        "STDDEV_SAMP(employee_id) AS s, " +
        "STDDEV_POP(position_id) AS p, " +
        "AVG(position_id) AS a, " +
        "COUNT(position_id) AS c " +
        "FROM " + VIEW_NAME;

    @SuppressWarnings("unchecked")
    final List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema = Lists.newArrayList(
        Pair.of(SchemaPath.getSimplePath("s"), Types.optional(TypeProtos.MinorType.FLOAT8)),
        Pair.of(SchemaPath.getSimplePath("p"), Types.optional(TypeProtos.MinorType.FLOAT8)),
        Pair.of(SchemaPath.getSimplePath("a"), Types.optional(TypeProtos.MinorType.FLOAT8)),
        Pair.of(SchemaPath.getSimplePath("c"), Types.required(TypeProtos.MinorType.BIGINT)));

    // STDDEV_SAMP over the view's single row yields NULL.
    testBuilder()
        .sqlQuery(query)
        .ordered()
        .baselineColumns("s", "p", "a", "c")
        .baselineValues(null, 0.0D, 1.0D, 1L)
        .go();

    checkLimit0SchemaAndPlan(query, expectedSchema);
  }

  @Test
  public void nullableCount() throws Exception {
    final String query = "SELECT " +
        "COUNT(CASE WHEN position_id = 1 THEN NULL ELSE position_id END) AS c FROM " + VIEW_NAME;

    @SuppressWarnings("unchecked")
    final List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema = Lists.newArrayList(
        Pair.of(SchemaPath.getSimplePath("c"), Types.required(TypeProtos.MinorType.BIGINT)));

    testBuilder()
        .sqlQuery(query)
        .ordered()
        .baselineColumns("c")
        .baselineValues(0L)
        .go();

    checkLimit0SchemaAndPlan(query, expectedSchema);
  }

  @Test
  public void nullableSumAndCount() throws Exception {
    final String query = "SELECT " +
        "COUNT(position_id) AS c, " +
        "SUM(CAST((CASE WHEN position_id = 1 THEN NULL ELSE position_id END) AS INT)) AS p " +
        "FROM " + VIEW_NAME;

    @SuppressWarnings("unchecked")
    final List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema = Lists.newArrayList(
        Pair.of(SchemaPath.getSimplePath("c"), Types.required(TypeProtos.MinorType.BIGINT)),
        Pair.of(SchemaPath.getSimplePath("p"), Types.optional(TypeProtos.MinorType.BIGINT)));

    testBuilder()
        .sqlQuery(query)
        .ordered()
        .baselineColumns("c", "p")
        .baselineValues(1L, null)
        .go();

    checkLimit0SchemaAndPlan(query, expectedSchema);
  }

  @Test
  public void castSum() throws Exception {
    final String query = "SELECT CAST(SUM(position_id) AS INT) AS s FROM cp.`employee.json`";

    @SuppressWarnings("unchecked")
    final List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema = Lists.newArrayList(
        Pair.of(SchemaPath.getSimplePath("s"), Types.optional(TypeProtos.MinorType.INT)));

    testBuilder()
        .sqlQuery(query)
        .ordered()
        .baselineColumns("s")
        .baselineValues(18422)
        .go();

    checkLimit0SchemaAndPlan(query, expectedSchema);
  }

  @Test
  public void sumCast() throws Exception {
    final String query = "SELECT SUM(CAST(position_id AS INT)) AS s FROM cp.`employee.json`";

    @SuppressWarnings("unchecked")
    final List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema = Lists.newArrayList(
        Pair.of(SchemaPath.getSimplePath("s"), Types.optional(TypeProtos.MinorType.BIGINT)));

    testBuilder()
        .sqlQuery(query)
        .ordered()
        .baselineColumns("s")
        .baselineValues(18422L)
        .go();

    checkLimit0SchemaAndPlan(query, expectedSchema);
  }

  @Test
  public void sumsAndCounts1() throws Exception {
    final String query = "SELECT " +
        "COUNT(*) as cs, " +
        "COUNT(1) as c1, " +
        "COUNT(employee_id) as cc, " +
        "SUM(1) as s1, " +
        "department_id " +
        " FROM " + VIEW_NAME + " GROUP BY department_id";

    @SuppressWarnings("unchecked")
    final List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema = Lists.newArrayList(
        Pair.of(SchemaPath.getSimplePath("cs"), Types.required(TypeProtos.MinorType.BIGINT)),
        Pair.of(SchemaPath.getSimplePath("c1"), Types.required(TypeProtos.MinorType.BIGINT)),
        Pair.of(SchemaPath.getSimplePath("cc"), Types.required(TypeProtos.MinorType.BIGINT)),
        Pair.of(SchemaPath.getSimplePath("s1"), Types.required(TypeProtos.MinorType.BIGINT)),
        Pair.of(SchemaPath.getSimplePath("department_id"), Types.optional(TypeProtos.MinorType.BIGINT)));

    testBuilder()
        .sqlQuery(query)
        .ordered()
        .baselineColumns("cs", "c1", "cc", "s1", "department_id")
        .baselineValues(1L, 1L, 1L, 1L, 1L)
        .go();

    checkLimit0SchemaAndPlan(query, expectedSchema);
  }

  @Test
  public void sumsAndCounts2() throws Exception {
    final String query = "SELECT " +
        "SUM(1) as s1, " +
        "COUNT(1) as c1, " +
        "COUNT(*) as cs, " +
        "COUNT(CAST(n_regionkey AS INT)) as cc " +
        "FROM cp.`tpch/nation.parquet` " +
        "GROUP BY CAST(n_regionkey AS INT)";

    @SuppressWarnings("unchecked")
    final List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema = Lists.newArrayList(
        Pair.of(SchemaPath.getSimplePath("s1"), Types.required(TypeProtos.MinorType.BIGINT)),
        Pair.of(SchemaPath.getSimplePath("c1"), Types.required(TypeProtos.MinorType.BIGINT)),
        Pair.of(SchemaPath.getSimplePath("cs"), Types.required(TypeProtos.MinorType.BIGINT)),
        Pair.of(SchemaPath.getSimplePath("cc"), Types.required(TypeProtos.MinorType.BIGINT)));

    testBuilder()
        .sqlQuery(query)
        .ordered()
        .baselineColumns("s1", "c1", "cs", "cc")
        .baselineValues(5L, 5L, 5L, 5L)
        .baselineValues(5L, 5L, 5L, 5L)
        .baselineValues(5L, 5L, 5L, 5L)
        .baselineValues(5L, 5L, 5L, 5L)
        .baselineValues(5L, 5L, 5L, 5L)
        .go();

    checkLimit0SchemaAndPlan(query, expectedSchema);
  }

  @Test
  public void rank() throws Exception {
    final String query = "SELECT RANK() OVER(PARTITION BY employee_id ORDER BY employee_id) AS r FROM " + VIEW_NAME;

    @SuppressWarnings("unchecked")
    final List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema = Lists.newArrayList(
        Pair.of(SchemaPath.getSimplePath("r"), Types.required(TypeProtos.MinorType.BIGINT)));

    testBuilder()
        .sqlQuery(query)
        .ordered()
        .baselineColumns("r")
        .baselineValues(1L)
        .go();

    checkLimit0SchemaAndPlan(query, expectedSchema);
  }

  // -------------------- SCALAR FUNC. QUERIES --------------------

  @Test
  public void cast() throws Exception {
    final String query = "SELECT CAST(fsalary AS DOUBLE) AS d, " +
        "CAST(employee_id AS BIGINT) AS e FROM " + VIEW_NAME;

    @SuppressWarnings("unchecked")
    final List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema = Lists.newArrayList(
        Pair.of(SchemaPath.getSimplePath("d"), Types.optional(TypeProtos.MinorType.FLOAT8)),
        Pair.of(SchemaPath.getSimplePath("e"), Types.optional(TypeProtos.MinorType.BIGINT)));

    testBuilder()
        .sqlQuery(query)
        .baselineColumns("d", "e")
        .ordered()
        .baselineValues(80_000D, 1L)
        .go();

    checkLimit0SchemaAndPlan(query, expectedSchema);
  }

  /** Shared body of {@link #concat()} and {@link #concatOp()}; {@code query} must alias its column as {@code c}. */
  private void concatTest(final String query) throws Exception {
    @SuppressWarnings("unchecked")
    final List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema = Lists.newArrayList(
        Pair.of(SchemaPath.getSimplePath("c"), Types.optional(TypeProtos.MinorType.VARCHAR)));

    testBuilder()
        .sqlQuery(query)
        .baselineColumns("c")
        .ordered()
        .baselineValues("Sheri NowmerGraduate Degree")
        .go();

    checkLimit0SchemaAndPlan(query, expectedSchema);
  }

  @Test
  public void concat() throws Exception {
    concatTest("SELECT CONCAT(full_name, education_level) AS c FROM " + VIEW_NAME);
  }

  @Test
  public void concatOp() throws Exception {
    concatTest("SELECT full_name || education_level AS c FROM " + VIEW_NAME);
  }

  @Test
  public void extract() throws Exception {
    final String query = "SELECT EXTRACT(YEAR FROM hire_date) AS e FROM " + VIEW_NAME;

    @SuppressWarnings("unchecked")
    final List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema = Lists.newArrayList(
        Pair.of(SchemaPath.getSimplePath("e"), Types.optional(TypeProtos.MinorType.BIGINT)));

    testBuilder()
        .sqlQuery(query)
        .baselineColumns("e")
        .ordered()
        .baselineValues(1994L)
        .go();

    checkLimit0SchemaAndPlan(query, expectedSchema);
  }

  @Test
  public void binary() throws Exception {
    // Each operand below is a plain column reference; the IN alias is followed by a plain ", "
    // so that the next projected expression is not accidentally prefixed with a unary '+'.
    final String query = "SELECT " +
        "single AND true AS b, " +
        "full_name || education_level AS c, " +
        "position_id / position_id AS d, " +
        "position_id = position_id AS e, " +
        "position_id > position_id AS g, " +
        "position_id >= position_id AS ge, " +
        "position_id IN (0, 1) AS i, " +
        "position_id < position_id AS l, " +
        "position_id <= position_id AS le, " +
        "position_id - position_id AS m, " +
        "position_id * position_id AS mu, " +
        "position_id <> position_id AS n, " +
        "single OR false AS o, " +
        "position_id + position_id AS p FROM " + VIEW_NAME;

    @SuppressWarnings("unchecked")
    final List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema = Lists.newArrayList(
        Pair.of(SchemaPath.getSimplePath("b"), Types.required(TypeProtos.MinorType.BIT)),
        Pair.of(SchemaPath.getSimplePath("c"), Types.optional(TypeProtos.MinorType.VARCHAR)),
        Pair.of(SchemaPath.getSimplePath("d"), Types.optional(TypeProtos.MinorType.INT)),
        Pair.of(SchemaPath.getSimplePath("e"), Types.optional(TypeProtos.MinorType.BIT)),
        Pair.of(SchemaPath.getSimplePath("g"), Types.optional(TypeProtos.MinorType.BIT)),
        Pair.of(SchemaPath.getSimplePath("ge"), Types.optional(TypeProtos.MinorType.BIT)),
        Pair.of(SchemaPath.getSimplePath("i"), Types.optional(TypeProtos.MinorType.BIT)),
        Pair.of(SchemaPath.getSimplePath("l"), Types.optional(TypeProtos.MinorType.BIT)),
        Pair.of(SchemaPath.getSimplePath("le"), Types.optional(TypeProtos.MinorType.BIT)),
        Pair.of(SchemaPath.getSimplePath("m"), Types.optional(TypeProtos.MinorType.INT)),
        Pair.of(SchemaPath.getSimplePath("mu"), Types.optional(TypeProtos.MinorType.INT)),
        Pair.of(SchemaPath.getSimplePath("n"), Types.optional(TypeProtos.MinorType.BIT)),
        Pair.of(SchemaPath.getSimplePath("o"), Types.required(TypeProtos.MinorType.BIT)),
        Pair.of(SchemaPath.getSimplePath("p"), Types.optional(TypeProtos.MinorType.INT)));

    testBuilder()
        .sqlQuery(query)
        .baselineColumns("b", "c", "d", "e", "g", "ge", "i", "l", "le", "m", "mu", "n", "o", "p")
        .ordered()
        .baselineValues(true, "Sheri NowmerGraduate Degree", 1, true, false, true, true, false, true,
            0, 1, false, true, 2)
        .go();

    checkLimit0SchemaAndPlan(query, expectedSchema);
  }

  /** Shared body of {@link #substring()} and {@link #substr()}; {@code query} must alias its column as {@code s}. */
  private void substringTest(final String query) throws Exception {
    @SuppressWarnings("unchecked")
    final List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema = Lists.newArrayList(
        Pair.of(SchemaPath.getSimplePath("s"), Types.optional(TypeProtos.MinorType.VARCHAR)));

    testBuilder()
        .sqlQuery(query)
        .baselineColumns("s")
        .ordered()
        .baselineValues("Sheri")
        .go();

    checkLimit0SchemaAndPlan(query, expectedSchema);
  }

  @Test
  public void substring() throws Exception {
    substringTest("SELECT SUBSTRING(full_name, 1, 5) AS s FROM " + VIEW_NAME);
  }

  @Test
  public void substr() throws Exception {
    substringTest("SELECT SUBSTR(full_name, 1, 5) AS s FROM " + VIEW_NAME);
  }
}