Repository: incubator-drill
Updated Branches:
  refs/heads/master c1c0eba5b -> 967fef5ad
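Editor's note: before the commit message and diff that follow, here is a minimal sketch of the reader-side contract this change introduces. The class name and method bodies below are hypothetical and are not part of the patch; only AbstractRecordReader, GroupScan.ALL_COLUMNS and SchemaPath come from the diff itself. The intent is to illustrate the pattern the commit applies across the HBase, Hive, text and Parquet readers: hand the projected columns to AbstractRecordReader.setColumns() and branch on isStarQuery() instead of testing for a null column list.

// Illustrative sketch only -- "ExampleRecordReader" is not part of this patch.
import java.util.Collection;
import java.util.List;

import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.store.AbstractRecordReader;

public abstract class ExampleRecordReader extends AbstractRecordReader {

  protected ExampleRecordReader(List<SchemaPath> projectedColumns) {
    // Callers are now expected to pass GroupScan.ALL_COLUMNS (a list holding the
    // single `*` path) rather than null when no projection was pushed down.
    setColumns(projectedColumns == null || projectedColumns.isEmpty()
        ? GroupScan.ALL_COLUMNS : projectedColumns);
  }

  @Override
  protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> projected) {
    if (isStarQuery()) {
      // `*` was selected: the reader materializes every column the source exposes.
      return projected;
    }
    // Otherwise restrict the scan to the explicitly projected columns; concrete
    // readers (e.g. HBaseRecordReader) rewrite this collection as needed.
    return projected;
  }
}

On the planner side, the DrillScanRel change below substitutes a fixed STAR_COLUMN_COST (10000) for the field count whenever `*` appears in the row type, which is what makes an explicit column projection cheaper than a star scan.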
DRILL-1309: Implement ProjectPastFilterPushdown and update the DrillScanRel cost model so that a star query is more expensive than an exclusive column projection. Various fixes affecting record readers to handle the `*` column, as well as fixes to some test cases.

Exclude parquet files from rat check.

Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/f1486947
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/f1486947
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/f1486947

Branch: refs/heads/master
Commit: f148694738a84832d75aca4ef69bff47c68b463f
Parents: c1c0eba
Author: Hanifi Gunes <[email protected]>
Authored: Thu Aug 28 10:21:07 2014 -0700
Committer: Jacques Nadeau <[email protected]>
Committed: Fri Aug 29 00:00:41 2014 -0700

----------------------------------------------------------------------
 .../drill/exec/store/hbase/HBaseGroupScan.java  | 15 ++--
 .../exec/store/hbase/HBaseRecordReader.java     | 68 +++++++-------
 .../exec/store/hbase/HBaseScanBatchCreator.java | 10 ++-
 .../drill/exec/store/hive/HiveRecordReader.java | 12 +--
 .../exec/expr/fn/impl/conv/JsonConvertFrom.java | 12 ++-
 .../drill/exec/physical/base/GroupScan.java     | 3 +
 .../logical/DrillPushProjectPastFilterRule.java | 94 ++++++++++++++++++++
 .../exec/planner/logical/DrillRuleSets.java     | 3 +-
 .../exec/planner/logical/DrillScanRel.java      | 57 +++++++-----
 .../exec/planner/logical/RelOptHelper.java      | 4 +
 .../planner/sql/handlers/ExplainHandler.java    | 2 +-
 .../DrillParserWithCompoundIdConverter.java     | 9 +-
 .../drill/exec/store/AbstractRecordReader.java  | 62 +++++++++++++
 .../drill/exec/store/AbstractStoragePlugin.java | 2 +-
 .../drill/exec/store/dfs/FileSystemPlugin.java  | 5 --
 .../exec/store/dfs/easy/EasyFormatPlugin.java   | 2 +-
 .../exec/store/dfs/easy/EasyGroupScan.java      | 69 ++++++--------
 .../exec/store/easy/text/TextFormatPlugin.java  | 2 +-
 .../exec/store/parquet/ParquetRowGroupScan.java | 18 ++--
 .../store/parquet/ParquetScanBatchCreator.java  | 7 +-
 .../columnreaders/ParquetRecordReader.java      | 68 ++++++------
 .../exec/store/parquet2/DrillParquetReader.java | 15 ++--
 .../exec/store/schedule/BlockMapBuilder.java    | 28 +++---
 .../exec/store/text/DrillTextRecordReader.java  | 28 +++---
 .../exec/vector/complex/fn/JsonReader.java      | 24 +++--
 .../vector/complex/fn/JsonReaderWithState.java  | 3 +-
 .../resources/bootstrap-storage-plugins.json    | 5 ++
 .../org/apache/drill/TestProjectPushDown.java   | 48 ++++++++++
 .../vector/complex/writer/TestJsonReader.java   | 7 +-
 .../test/resources/project/pushdown/empty.csv   | 0
 .../test/resources/project/pushdown/empty.json  | 0
 .../resources/project/pushdown/empty.parquet    | 0
 pom.xml                                         | 1 +
 33 files changed, 470 insertions(+), 213 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
index 8e9ae18..8301de1 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
@@ -43,6 +43,8 @@ import
org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.ScanStats; import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.store.AbstractRecordReader; +import org.apache.drill.exec.store.RecordReader; import org.apache.drill.exec.store.StoragePluginRegistry; import org.apache.drill.exec.store.hbase.HBaseSubScan.HBaseSubScanSpec; import org.apache.hadoop.conf.Configuration; @@ -107,7 +109,7 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst this.storagePlugin = storagePlugin; this.storagePluginConfig = storagePlugin.getConfig(); this.hbaseScanSpec = scanSpec; - this.columns = columns; + this.columns = columns == null || columns.size() == 0? ALL_COLUMNS : columns; init(); } @@ -162,12 +164,11 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst } private void verifyColumns() { - if (columns != null) { - for (SchemaPath column : columns) { - if (!(column.equals(ROW_KEY_PATH) || hTableDesc.hasFamily(HBaseUtils.getBytes(column.getRootSegment().getPath())))) { - DrillRuntimeException.format("The column family '%s' does not exist in HBase table: %s .", - column.getRootSegment().getPath(), hTableDesc.getNameAsString()); - } + if (AbstractRecordReader.isStarQuery(columns)) return; + for (SchemaPath column : columns) { + if (!(column.equals(ROW_KEY_PATH) || hTableDesc.hasFamily(HBaseUtils.getBytes(column.getRootSegment().getPath())))) { + DrillRuntimeException.format("The column family '%s' does not exist in HBase table: %s .", + column.getRootSegment().getPath(), hTableDesc.getNameAsString()); } } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java index 954cc5a..51a3151 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java @@ -18,13 +18,14 @@ package org.apache.drill.exec.store.hbase; import java.io.IOException; +import java.util.Collection; import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; +import com.google.common.base.Preconditions; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.PathSegment; @@ -36,7 +37,7 @@ import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.impl.OutputMutator; import org.apache.drill.exec.record.MaterializedField; -import org.apache.drill.exec.store.RecordReader; +import org.apache.drill.exec.store.AbstractRecordReader; import org.apache.drill.exec.vector.NullableVarBinaryVector; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.VarBinaryVector; @@ -53,12 +54,11 @@ import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; import 
com.google.common.base.Stopwatch; import com.google.common.collect.Sets; -public class HBaseRecordReader implements RecordReader, DrillHBaseConstants { +public class HBaseRecordReader extends AbstractRecordReader implements DrillHBaseConstants { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseRecordReader.class); private static final int TARGET_RECORD_COUNT = 4000; - private LinkedHashSet<SchemaPath> columns; private OutputMutator outputMutator; private Map<String, MapVector> familyVectorMap; @@ -67,7 +67,7 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants { private HTable hTable; private ResultScanner resultScanner; - private String hbaseTable; + private String hbaseTableName; private Scan hbaseScan; private Configuration hbaseConf; private Result leftOver; @@ -78,23 +78,29 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants { public HBaseRecordReader(Configuration conf, HBaseSubScan.HBaseSubScanSpec subScanSpec, List<SchemaPath> projectedColumns, FragmentContext context) throws OutOfMemoryException { hbaseConf = conf; - hbaseTable = subScanSpec.getTableName(); + hbaseTableName = Preconditions.checkNotNull(subScanSpec, "HBase reader needs a sub-scan spec").getTableName(); hbaseScan = new Scan(subScanSpec.getStartRow(), subScanSpec.getStopRow()); - fragmentContext=context; - boolean rowKeyOnly = true; - this.columns = Sets.newLinkedHashSet(); - if (projectedColumns != null && projectedColumns.size() != 0) { - Iterator<SchemaPath> columnIterator = projectedColumns.iterator(); - while(columnIterator.hasNext()) { - SchemaPath column = columnIterator.next(); + hbaseScan + .setFilter(subScanSpec.getScanFilter()) + .setCaching(TARGET_RECORD_COUNT); + + setColumns(projectedColumns); + } + + @Override + protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> columns) { + Set<SchemaPath> transformed = Sets.newLinkedHashSet(); + if (!isStarQuery()) { + boolean rowKeyOnly = true; + for (SchemaPath column : columns) { if (column.getRootSegment().getPath().equalsIgnoreCase(ROW_KEY)) { - this.columns.add(ROW_KEY_PATH); + transformed.add(ROW_KEY_PATH); continue; } rowKeyOnly = false; NameSegment root = column.getRootSegment(); byte[] family = root.getPath().getBytes(); - this.columns.add(SchemaPath.getSimplePath(root.getPath())); + transformed.add(SchemaPath.getSimplePath(root.getPath())); PathSegment child = root.getChild(); if (child != null && child.isNamed()) { byte[] qualifier = child.getNameSegment().getPath().getBytes(); @@ -103,23 +109,21 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants { hbaseScan.addFamily(family); } } - } else { - rowKeyOnly = false; - this.columns.add(ROW_KEY_PATH); - } - - hbaseScan.setFilter(subScanSpec.getScanFilter()); - if (rowKeyOnly) { /* if only the row key was requested, add a FirstKeyOnlyFilter to the scan * to fetch only one KV from each row. If a filter is already part of this * scan, add the FirstKeyOnlyFilter as the LAST filter of a MUST_PASS_ALL * FilterList. 
*/ - hbaseScan.setFilter( - HBaseUtils.andFilterAtIndex(hbaseScan.getFilter(), HBaseUtils.LAST_FILTER, new FirstKeyOnlyFilter()) - ); + if (rowKeyOnly) { + hbaseScan.setFilter( + HBaseUtils.andFilterAtIndex(hbaseScan.getFilter(), HBaseUtils.LAST_FILTER, new FirstKeyOnlyFilter())); + } + } else { + transformed.add(ROW_KEY_PATH); } - hbaseScan.setCaching(TARGET_RECORD_COUNT); + + + return transformed; } public OperatorContext getOperatorContext() { @@ -138,7 +142,7 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants { try { // Add Vectors to output in the order specified when creating reader - for (SchemaPath column : columns) { + for (SchemaPath column : getColumns()) { if (column.equals(ROW_KEY_PATH)) { MaterializedField field = MaterializedField.create(column, ROW_KEY_TYPE); rowKeyVector = outputMutator.addField(field, VarBinaryVector.class); @@ -147,9 +151,9 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants { } } logger.debug("Opening scanner for HBase table '{}', Zookeeper quorum '{}', port '{}', znode '{}'.", - hbaseTable, hbaseConf.get(HConstants.ZOOKEEPER_QUORUM), + hbaseTableName, hbaseConf.get(HConstants.ZOOKEEPER_QUORUM), hbaseConf.get(HBASE_ZOOKEEPER_PORT), hbaseConf.get(HConstants.ZOOKEEPER_ZNODE_PARENT)); - hTable = new HTable(hbaseConf, hbaseTable); + hTable = new HTable(hbaseConf, hbaseTableName); resultScanner = hTable.getScanner(hbaseScan); } catch (SchemaChangeException | IOException e) { throw new ExecutionSetupException(e); @@ -230,7 +234,7 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants { if (allocateOnCreate) { v.allocateNew(); } - columns.add(column); + getColumns().add(column); familyVectorMap.put(familyName, v); } return v; @@ -258,7 +262,7 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants { hTable.close(); } } catch (IOException e) { - logger.warn("Failure while closing HBase table: " + hbaseTable, e); + logger.warn("Failure while closing HBase table: " + hbaseTableName, e); } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java index 661e1b4..9256157 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java @@ -20,7 +20,9 @@ package org.apache.drill.exec.store.hbase; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.physical.impl.ScanBatch; import org.apache.drill.exec.record.RecordBatch; @@ -36,11 +38,13 @@ public class HBaseScanBatchCreator implements BatchCreator<HBaseSubScan>{ public RecordBatch getBatch(FragmentContext context, HBaseSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException { Preconditions.checkArgument(children.isEmpty()); List<RecordReader> readers = Lists.newArrayList(); + List<SchemaPath> 
columns = null; for(HBaseSubScan.HBaseSubScanSpec scanSpec : subScan.getRegionScanSpecList()){ try { - readers.add( - new HBaseRecordReader(subScan.getStorageConfig().getHBaseConf(), scanSpec, subScan.getColumns(), context) - ); + if ((columns = subScan.getColumns())==null) { + columns = GroupScan.ALL_COLUMNS; + } + readers.add(new HBaseRecordReader(subScan.getStorageConfig().getHBaseConf(), scanSpec, columns, context)); } catch (Exception e1) { throw new ExecutionSetupException(e1); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java index 36e55d9..1f3b3cd 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java @@ -37,6 +37,7 @@ import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.impl.OutputMutator; import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.store.AbstractRecordReader; import org.apache.drill.exec.store.RecordReader; import org.apache.drill.exec.vector.BigIntVector; import org.apache.drill.exec.vector.BitVector; @@ -78,13 +79,12 @@ import org.joda.time.DateTimeZone; import com.google.common.collect.Lists; -public class HiveRecordReader implements RecordReader { +public class HiveRecordReader extends AbstractRecordReader { protected Table table; protected Partition partition; protected InputSplit inputSplit; protected FragmentContext context; - protected List<SchemaPath> projectedColumns; protected List<String> selectedColumnNames; protected List<TypeInfo> selectedColumnTypes = Lists.newArrayList(); protected List<ObjectInspector> selectedColumnObjInspectors = Lists.newArrayList(); @@ -115,10 +115,10 @@ public class HiveRecordReader implements RecordReader { this.partition = partition; this.inputSplit = inputSplit; this.context = context; - this.projectedColumns = projectedColumns; this.empty = (inputSplit == null && partition == null); this.hiveConfigOverride = hiveConfigOverride; this.fragmentContext=context; + setColumns(projectedColumns); init(); } @@ -171,14 +171,14 @@ public class HiveRecordReader implements RecordReader { } sInspector = (StructObjectInspector) oi; StructTypeInfo sTypeInfo = (StructTypeInfo) TypeInfoUtils.getTypeInfoFromObjectInspector(sInspector); - if (projectedColumns == null) { + if (isStarQuery()) { selectedColumnNames = sTypeInfo.getAllStructFieldNames(); tableColumns = selectedColumnNames; } else { tableColumns = sTypeInfo.getAllStructFieldNames(); List<Integer> columnIds = Lists.newArrayList(); selectedColumnNames = Lists.newArrayList(); - for (SchemaPath field : projectedColumns) { + for (SchemaPath field : getColumns()) { String columnName = field.getRootSegment().getPath(); if (!tableColumns.contains(columnName)) { if (partitionNames.contains(columnName)) { @@ -204,7 +204,7 @@ public class HiveRecordReader implements RecordReader { selectedColumnFieldConverters.add(HiveFieldConverter.create(typeInfo)); } - if (projectedColumns == null) { + if (isStarQuery()) { selectedPartitionNames = 
partitionNames; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java index 72bec3b..62d526b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java @@ -67,12 +67,14 @@ public class JsonConvertFrom { String input = new String(buf, com.google.common.base.Charsets.UTF_8); try { - org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader(buffer, null, false); + org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader(buffer, false); jsonReader.write(new java.io.StringReader(input), writer); } catch (Exception e) { - System.out.println(" msg = " + e.getMessage() + " trace : " + e.getStackTrace()); +// System.out.println("Error while converting from JSON. "); +// e.printStackTrace(); + throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e); } } } @@ -94,12 +96,14 @@ public class JsonConvertFrom { String input = new String(buf, com.google.common.base.Charsets.UTF_8); try { - org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader(buffer, null, false); + org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader(buffer, false); jsonReader.write(new java.io.StringReader(input), writer); } catch (Exception e) { - System.out.println(" msg = " + e.getMessage() + " trace : " + e.getStackTrace()); +// System.out.println("Error while converting from JSON. "); +// e.printStackTrace(); + throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. 
", e); } } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java index fa20a90..9c27c0c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java @@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.base; import java.util.List; +import com.google.common.collect.Lists; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.exceptions.PhysicalOperatorSetupException; import org.apache.drill.common.expression.SchemaPath; @@ -32,6 +33,8 @@ import com.fasterxml.jackson.annotation.JsonIgnore; */ public interface GroupScan extends Scan, HasAffinity{ + public static final List<SchemaPath> ALL_COLUMNS = Lists.<SchemaPath>newArrayList(SchemaPath.getSimplePath("*")); + public abstract void applyAssignments(List<DrillbitEndpoint> endpoints) throws PhysicalOperatorSetupException; public abstract SubScan getSpecificScan(int minorFragmentId) throws ExecutionSetupException; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjectPastFilterRule.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjectPastFilterRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjectPastFilterRule.java new file mode 100644 index 0000000..dcec68a --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjectPastFilterRule.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.planner.logical; + +import org.eigenbase.rel.FilterRel; +import org.eigenbase.rel.ProjectRel; +import org.eigenbase.rel.RelNode; +import org.eigenbase.rel.rules.PushProjector; +import org.eigenbase.relopt.RelOptRule; +import org.eigenbase.relopt.RelOptRuleCall; +import org.eigenbase.rex.RexCall; +import org.eigenbase.rex.RexNode; +import org.eigenbase.rex.RexOver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DrillPushProjectPastFilterRule extends RelOptRule { + + private final static Logger logger = LoggerFactory.getLogger(DrillPushProjectPastFilterRule.class); + + public final static RelOptRule INSTANCE = new DrillPushProjectPastFilterRule(new PushProjector.ExprCondition() { + @Override + public boolean test(RexNode expr) { + if (expr instanceof RexCall) { + RexCall call = (RexCall)expr; + return "ITEM".equals(call.getOperator().getName()); + } + return false; + } + }); + + /** + * Expressions that should be preserved in the projection + */ + private final PushProjector.ExprCondition preserveExprCondition; + + private DrillPushProjectPastFilterRule(PushProjector.ExprCondition preserveExprCondition) { + super(RelOptHelper.any(ProjectRel.class, FilterRel.class)); + this.preserveExprCondition = preserveExprCondition; + } + + @Override + public void onMatch(RelOptRuleCall call) { + ProjectRel origProj; + FilterRel filterRel; + + if (call.rels.length == 2) { + origProj = call.rel(0); + filterRel = call.rel(1); + } else { + origProj = null; + filterRel = call.rel(0); + } + RelNode rel = filterRel.getChild(); + RexNode origFilter = filterRel.getCondition(); + + if ((origProj != null) && RexOver.containsOver(origProj.getProjects(), null)) { + // Cannot push project through filter if project contains a windowed + // aggregate -- it will affect row counts. Abort this rule + // invocation; pushdown will be considered after the windowed + // aggregate has been implemented. It's OK if the filter contains a + // windowed aggregate. 
+ return; + } + + PushProjector pushProjector = createPushProjector(origProj, origFilter, rel, preserveExprCondition); + RelNode topProject = pushProjector.convertProject(null); + + if (topProject != null) { + call.transformTo(topProject); + } + } + + protected PushProjector createPushProjector(ProjectRel origProj, RexNode origFilter, RelNode rel, + PushProjector.ExprCondition preserveExprCondition) { + return new PushProjector(origProj, origFilter,rel, preserveExprCondition); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java index 63de69c..cf92121 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java @@ -89,7 +89,8 @@ public class DrillRuleSets { RemoveDistinctAggregateRule.INSTANCE, // ReduceAggregatesRule.INSTANCE, // PushProjectPastJoinRule.INSTANCE, - PushProjectPastFilterRule.INSTANCE, +// PushProjectPastFilterRule.INSTANCE, + DrillPushProjectPastFilterRule.INSTANCE, // SwapJoinRule.INSTANCE, // // PushJoinThroughJoinRule.RIGHT, // // PushJoinThroughJoinRule.LEFT, // http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java index dcbfb3d..d6bbcd3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java @@ -21,12 +21,16 @@ import java.io.IOException; import java.util.Collections; import java.util.List; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; import org.apache.drill.common.JSONOptions; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.logical.data.LogicalOperator; import org.apache.drill.common.logical.data.Scan; +import org.apache.drill.exec.physical.base.AbstractGroupScan; import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.ScanStats; @@ -46,6 +50,8 @@ import org.eigenbase.reltype.RelDataType; * GroupScan of a Drill table. */ public class DrillScanRel extends DrillScanRelBase implements DrillRel { + private final static int STAR_COLUMN_COST = 10000; + final private RelDataType rowType; private GroupScan groupScan; @@ -54,7 +60,7 @@ public class DrillScanRel extends DrillScanRelBase implements DrillRel { RelOptTable table) { // By default, scan does not support project pushdown. // Decision whether push projects into scan will be made solely in DrillPushProjIntoScanRule. 
- this(cluster, traits, table, table.getRowType(), null); + this(cluster, traits, table, table.getRowType(), AbstractGroupScan.ALL_COLUMNS); } /** Creates a DrillScan. */ @@ -62,26 +68,21 @@ public class DrillScanRel extends DrillScanRelBase implements DrillRel { RelOptTable table, RelDataType rowType, List<SchemaPath> columns) { super(DRILL_LOGICAL, cluster, traits, table); this.rowType = rowType; - + columns = columns == null || columns.size() == 0 ? GroupScan.ALL_COLUMNS : columns; try { - if (columns == null || columns.isEmpty()) { - this.groupScan = (GroupScan) getCopy(this.drillTable.getGroupScan()) ; - } else { - this.groupScan = this.drillTable.getGroupScan().clone(columns); - } + this.groupScan = drillTable.getGroupScan().clone(columns); } catch (IOException e) { throw new DrillRuntimeException("Failure creating scan.", e); } - - } - - private static GroupScan getCopy(GroupScan scan){ - try { - return (GroupScan) scan.getNewWithChildren((List<PhysicalOperator>) (Object) Collections.emptyList()); - } catch (ExecutionSetupException e) { - throw new DrillRuntimeException("Unexpected failure while coping node.", e); - } } +// +// private static GroupScan getCopy(GroupScan scan){ +// try { +// return (GroupScan) scan.getNewWithChildren((List<PhysicalOperator>) (Object) Collections.emptyList()); +// } catch (ExecutionSetupException e) { +// throw new DrillRuntimeException("Unexpected failure while coping node.", e); +// } +// } @Override @@ -118,15 +119,29 @@ public class DrillScanRel extends DrillScanRelBase implements DrillRel { /// by both logical and physical rels. @Override public RelOptCost computeSelfCost(RelOptPlanner planner) { - ScanStats stats = this.groupScan.getScanStats(); - int columnCount = this.getRowType().getFieldCount(); + ScanStats stats = groupScan.getScanStats(); + int columnCount = getRowType().getFieldCount(); + double ioCost = 0; + boolean isStarQuery = Iterables.tryFind(getRowType().getFieldNames(), new Predicate<String>() { + @Override + public boolean apply(String input) { + return Preconditions.checkNotNull(input).equals("*"); + } + }).isPresent(); - if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) { - return planner.getCostFactory().makeCost(stats.getRecordCount() * columnCount, stats.getCpuCost(), stats.getDiskCost()); + if (isStarQuery) { + columnCount = STAR_COLUMN_COST; } // double rowCount = RelMetadataQuery.getRowCount(this); double rowCount = stats.getRecordCount(); + if (rowCount < 1) { + rowCount = 1; + } + + if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) { + return planner.getCostFactory().makeCost(rowCount * columnCount, stats.getCpuCost(), stats.getDiskCost()); + } double cpuCost = rowCount * columnCount; // for now, assume cpu cost is proportional to row count. // Even though scan is reading from disk, in the currently generated plans all plans will @@ -134,7 +149,7 @@ public class DrillScanRel extends DrillScanRelBase implements DrillRel { // In the future we might consider alternative scans that go against projections or // different compression schemes etc that affect the amount of data read. Such alternatives // would affect both cpu and io cost. 
- double ioCost = 0; + DrillCostFactory costFactory = (DrillCostFactory)planner.getCostFactory(); return costFactory.makeCost(rowCount, cpuCost, ioCost, 0); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RelOptHelper.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RelOptHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RelOptHelper.java index 92272f8..2e253ab 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RelOptHelper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RelOptHelper.java @@ -32,6 +32,10 @@ public class RelOptHelper { public static RelOptRuleOperand any(Class<? extends RelNode> first){ return RelOptRule.operand(first, RelOptRule.any()); } + + public static RelOptRuleOperand any(Class<? extends RelNode> first, Class<? extends RelNode> second) { + return RelOptRule.operand(first, RelOptRule.operand(second, RelOptRule.any())); + } public static RelOptRuleOperand some(Class<? extends RelNode> rel, RelOptRuleOperand first, RelOptRuleOperand... rest){ return RelOptRule.operand(rel, RelOptRule.some(first, rest)); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java index 63db153..25fa0cb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java @@ -55,8 +55,8 @@ public class ExplainHandler extends DefaultSqlHandler{ SqlNode sqlNode = rewrite(node); SqlNode validated = validateNode(sqlNode); RelNode rel = convertToRel(validated); - DrillRel drel = convertToDrel(rel); log("Optiq Logical", rel); + DrillRel drel = convertToDrel(rel); log("Drill Logical", drel); if(mode == ResultMode.LOGICAL){ http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/impl/DrillParserWithCompoundIdConverter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/impl/DrillParserWithCompoundIdConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/impl/DrillParserWithCompoundIdConverter.java index 4886741..382456a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/impl/DrillParserWithCompoundIdConverter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/impl/DrillParserWithCompoundIdConverter.java @@ -21,6 +21,7 @@ import org.apache.drill.exec.planner.sql.parser.CompoundIdentifierConverter; import org.eigenbase.sql.SqlNode; import org.eigenbase.sql.parser.SqlAbstractParserImpl; import org.eigenbase.sql.parser.SqlParserImplFactory; +import org.eigenbase.sql.util.SqlVisitor; import java.io.Reader; @@ -39,15 +40,19 @@ public class DrillParserWithCompoundIdConverter extends DrillParserImpl { super(stream); } + protected SqlVisitor<SqlNode> 
createConverter() { + return new CompoundIdentifierConverter(); + } + @Override public SqlNode parseSqlExpressionEof() throws Exception { SqlNode originalSqlNode = super.parseSqlExpressionEof(); - return originalSqlNode.accept(new CompoundIdentifierConverter()); + return originalSqlNode.accept(createConverter()); } @Override public SqlNode parseSqlStmtEof() throws Exception { SqlNode originalSqlNode = super.parseSqlStmtEof(); - return originalSqlNode.accept(new CompoundIdentifierConverter()); + return originalSqlNode.accept(createConverter()); } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java new file mode 100644 index 0000000..4cc06c8 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store; + +import java.util.Collection; + +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; +import org.apache.drill.common.expression.SchemaPath; + +public abstract class AbstractRecordReader implements RecordReader { + private static final String COL_NULL_ERROR = "Columns cannot be null. 
Use star column to select all fields."; + private static final String COL_EMPTY_ERROR = "Readers needs at least a column to read."; + public static final SchemaPath STAR_COLUMN = SchemaPath.getSimplePath("*"); + + private Collection<SchemaPath> columns = null; + private boolean isStarQuery = false; + + protected final void setColumns(Collection<SchemaPath> projected) { + assert Preconditions.checkNotNull(projected, COL_NULL_ERROR).size() > 0 : COL_EMPTY_ERROR; + isStarQuery = isStarQuery(projected); + columns = transformColumns(projected); + } + + protected Collection<SchemaPath> getColumns() { + return columns; + } + + protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> projected) { + return projected; + } + + protected boolean isStarQuery() { + return isStarQuery; + } + + public static boolean isStarQuery(Collection<SchemaPath> projected) { + return Iterables.tryFind(Preconditions.checkNotNull(projected, COL_NULL_ERROR), new Predicate<SchemaPath>() { + @Override + public boolean apply(SchemaPath path) { + return Preconditions.checkNotNull(path).equals(STAR_COLUMN); + } + }).isPresent(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java index b328245..9cdfe24 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java @@ -50,7 +50,7 @@ public abstract class AbstractStoragePlugin implements StoragePlugin{ @Override public AbstractGroupScan getPhysicalScan(JSONOptions selection) throws IOException { - return getPhysicalScan(selection, null); + return getPhysicalScan(selection, AbstractGroupScan.ALL_COLUMNS); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java index 2f0e854..ec9a04e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java @@ -109,11 +109,6 @@ public class FileSystemPlugin extends AbstractStoragePlugin{ } @Override - public AbstractGroupScan getPhysicalScan(JSONOptions selection) throws IOException { - return this.getPhysicalScan(selection, null); - } - - @Override public AbstractGroupScan getPhysicalScan(JSONOptions selection, List<SchemaPath> columns) throws IOException { FormatSelection formatSelection = selection.getWith(context.getConfig(), FormatSelection.class); FormatPlugin plugin; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java index bf8e301..d1923a5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java @@ -186,7 +186,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements @Override public AbstractGroupScan getGroupScan(FileSelection selection) throws IOException { - return new EasyGroupScan(selection, this, null, selection.selectionRoot); + return new EasyGroupScan(selection, this, selection.selectionRoot); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java index 3de99c5..2bdf1a6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java @@ -18,7 +18,6 @@ package org.apache.drill.exec.store.dfs.easy; import java.io.IOException; -import java.util.Collections; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; @@ -72,21 +71,15 @@ public class EasyGroupScan extends AbstractGroupScan{ @JsonProperty("columns") List<SchemaPath> columns, @JsonProperty("selectionRoot") String selectionRoot ) throws IOException, ExecutionSetupException { + this(new FileSelection(files, true), + (EasyFormatPlugin<?>)engineRegistry.getFormatPlugin(storageConfig, formatConfig), + columns, + selectionRoot); + } - this.formatPlugin = (EasyFormatPlugin<?>) engineRegistry.getFormatPlugin(storageConfig, formatConfig); - Preconditions.checkNotNull(formatPlugin, "Unable to load format plugin for provided format config."); - this.selection = new FileSelection(files, true); - try{ - BlockMapBuilder b = new BlockMapBuilder(formatPlugin.getFileSystem().getUnderlying(), formatPlugin.getContext().getBits()); - this.chunks = b.generateFileWork(selection.getFileStatusList(formatPlugin.getFileSystem()), formatPlugin.isBlockSplittable()); - this.endpointAffinities = AffinityCreator.getAffinityMap(chunks); - }catch(IOException e){ - logger.warn("Failure determining endpoint affinity.", e); - this.endpointAffinities = Collections.emptyList(); - } - maxWidth = chunks.size(); - this.columns = columns; - this.selectionRoot = selectionRoot; + public EasyGroupScan(FileSelection selection, EasyFormatPlugin<?> formatPlugin, String selectionRoot) + throws IOException { + this(selection, formatPlugin, ALL_COLUMNS, selectionRoot); } public EasyGroupScan( @@ -95,30 +88,26 @@ public class EasyGroupScan extends AbstractGroupScan{ List<SchemaPath> columns, String selectionRoot ) throws IOException{ - this.selection = selection; - this.formatPlugin = formatPlugin; - this.columns = columns; - try{ - BlockMapBuilder b = new BlockMapBuilder(formatPlugin.getFileSystem().getUnderlying(), formatPlugin.getContext().getBits()); - this.chunks = b.generateFileWork(selection.getFileStatusList(formatPlugin.getFileSystem()), formatPlugin.isBlockSplittable()); - this.endpointAffinities = AffinityCreator.getAffinityMap(chunks); - }catch(IOException e){ - logger.warn("Failure determining endpoint 
affinity.", e); - this.endpointAffinities = Collections.emptyList(); - } - maxWidth = chunks.size(); + this.selection = Preconditions.checkNotNull(selection); + this.formatPlugin = Preconditions.checkNotNull(formatPlugin, "Unable to load format plugin for provided format config."); + this.columns = columns == null || columns.size() == 0? ALL_COLUMNS : columns; this.selectionRoot = selectionRoot; + BlockMapBuilder b = new BlockMapBuilder(formatPlugin.getFileSystem().getUnderlying(), formatPlugin.getContext().getBits()); + this.chunks = b.generateFileWork(selection.getFileStatusList(formatPlugin.getFileSystem()), formatPlugin.isBlockSplittable()); + this.maxWidth = chunks.size(); + this.endpointAffinities = AffinityCreator.getAffinityMap(chunks); } private EasyGroupScan(EasyGroupScan that) { - this.chunks = that.chunks; - this.columns = that.columns; - this.endpointAffinities = that.endpointAffinities; - this.formatPlugin = that.formatPlugin; - this.mappings = that.mappings; - this.maxWidth = that.maxWidth; - this.selection = that.selection; - this.selectionRoot = that.selectionRoot; + Preconditions.checkNotNull(that, "Unable to clone: source is null."); + selection = that.selection; + formatPlugin = that.formatPlugin; + columns = that.columns; + selectionRoot = that.selectionRoot; + chunks = that.chunks; + endpointAffinities = that.endpointAffinities; + maxWidth = that.maxWidth; + mappings = that.mappings; } public String getSelectionRoot() { @@ -168,16 +157,16 @@ public class EasyGroupScan extends AbstractGroupScan{ @Override public List<EndpointAffinity> getOperatorAffinity() { assert chunks != null && chunks.size() > 0; - if (this.endpointAffinities == null) { + if (endpointAffinities == null) { logger.debug("chunks: {}", chunks.size()); - this.endpointAffinities = AffinityCreator.getAffinityMap(chunks); + endpointAffinities = AffinityCreator.getAffinityMap(chunks); } - return this.endpointAffinities; + return endpointAffinities; } @Override public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) { - this.mappings = AssignmentCreator.getMappings(incomingEndpoints, chunks); + mappings = AssignmentCreator.getMappings(incomingEndpoints, chunks); } @Override @@ -232,7 +221,7 @@ public class EasyGroupScan extends AbstractGroupScan{ @JsonIgnore public boolean canPushdownProjects(List<SchemaPath> columns) { - return this.formatPlugin.supportsPushDown(); + return formatPlugin.supportsPushDown(); } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java index 81a23b0..6e1aa0a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java @@ -72,7 +72,7 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm @Override public AbstractGroupScan getGroupScan(FileSelection selection, List<SchemaPath> columns) throws IOException { - return new EasyGroupScan(selection, this, null, selection.selectionRoot); //TODO : textformat supports project? + return new EasyGroupScan(selection, this, columns, selection.selectionRoot); //TODO : textformat supports project? 
} @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java index d1a086c..fd40f41 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java @@ -26,6 +26,7 @@ import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.logical.FormatPluginConfig; import org.apache.drill.common.logical.StoragePluginConfig; import org.apache.drill.exec.physical.base.AbstractBase; +import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.PhysicalVisitor; import org.apache.drill.exec.physical.base.SubScan; @@ -60,16 +61,9 @@ public class ParquetRowGroupScan extends AbstractBase implements SubScan { @JsonProperty("columns") List<SchemaPath> columns, // @JsonProperty("selectionRoot") String selectionRoot // ) throws ExecutionSetupException { - - if(formatConfig == null) formatConfig = new ParquetFormatConfig(); - Preconditions.checkNotNull(storageConfig); - Preconditions.checkNotNull(formatConfig); - this.formatPlugin = (ParquetFormatPlugin) registry.getFormatPlugin(storageConfig, formatConfig); - Preconditions.checkNotNull(formatPlugin); - this.rowGroupReadEntries = rowGroupReadEntries; - this.formatConfig = formatPlugin.getConfig(); - this.columns = columns; - this.selectionRoot = selectionRoot; + this((ParquetFormatPlugin) registry.getFormatPlugin(Preconditions.checkNotNull(storageConfig), + formatConfig == null ? new ParquetFormatConfig() : formatConfig), + rowGroupReadEntries, columns, selectionRoot); } public ParquetRowGroupScan( // @@ -77,10 +71,10 @@ public class ParquetRowGroupScan extends AbstractBase implements SubScan { List<RowGroupReadEntry> rowGroupReadEntries, // List<SchemaPath> columns, String selectionRoot) { - this.formatPlugin = formatPlugin; + this.formatPlugin = Preconditions.checkNotNull(formatPlugin); this.formatConfig = formatPlugin.getConfig(); this.rowGroupReadEntries = rowGroupReadEntries; - this.columns = columns; + this.columns = columns == null || columns.size() == 0 ? 
GroupScan.ALL_COLUMNS : columns; this.selectionRoot = selectionRoot; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java index a453e66..f9b6d91 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java @@ -32,6 +32,7 @@ import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.physical.impl.ScanBatch; import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.store.AbstractRecordReader; import org.apache.drill.exec.store.RecordReader; import com.google.common.base.Preconditions; @@ -65,11 +66,9 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan List<String[]> partitionColumns = Lists.newArrayList(); List<Integer> selectedPartitionColumns = Lists.newArrayList(); - boolean selectAllColumns = false; + boolean selectAllColumns = AbstractRecordReader.isStarQuery(columns); - if (columns == null || columns.size() == 0) { - selectAllColumns = true; - } else { + if (!selectAllColumns) { List<SchemaPath> newColums = Lists.newArrayList(); Pattern pattern = Pattern.compile(String.format("%s[0-9]+", partitionDesignator)); for (SchemaPath column : columns) { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java index 34e7aea..6c2d44c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java @@ -20,8 +20,13 @@ package org.apache.drill.exec.store.parquet.columnreaders; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; +import java.util.Iterator; import java.util.List; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.SchemaPath; @@ -33,8 +38,10 @@ import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.OperatorContext; +import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.physical.impl.OutputMutator; import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.store.AbstractRecordReader; import org.apache.drill.exec.store.RecordReader; import 
org.apache.drill.exec.vector.NullableBitVector; import org.apache.drill.exec.vector.ValueVector; @@ -53,7 +60,7 @@ import parquet.hadoop.metadata.ColumnChunkMetaData; import parquet.hadoop.metadata.ParquetMetadata; import parquet.schema.PrimitiveType; -public class ParquetRecordReader implements RecordReader { +public class ParquetRecordReader extends AbstractRecordReader { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRecordReader.class); // this value has been inflated to read in multiple value vectors at once, and then break them up into smaller vectors @@ -75,8 +82,8 @@ public class ParquetRecordReader implements RecordReader { private int bitWidthAllFixedFields; private boolean allFieldsFixedLength; private int recordsPerBatch; - private long totalRecords; - private long rowGroupOffset; +// private long totalRecords; +// private long rowGroupOffset; private List<ColumnReader> columnStatuses; private FileSystem fileSystem; @@ -84,8 +91,6 @@ public class ParquetRecordReader implements RecordReader { Path hadoopPath; private VarLenBinaryReader varLengthReader; private ParquetMetadata footer; - private List<SchemaPath> columns; - private FragmentContext fragmentContext; private OperatorContext operatorContext; // This is a parallel list to the columns list above, it is used to determine the subset of the project // pushdown columns that do not appear in this file @@ -117,17 +122,13 @@ public class ParquetRecordReader implements RecordReader { String path, int rowGroupIndex, FileSystem fs, CodecFactoryExposer codecFactoryExposer, ParquetMetadata footer, List<SchemaPath> columns) throws ExecutionSetupException { - hadoopPath = new Path(path); - fileSystem = fs; + this.hadoopPath = new Path(path); + this.fileSystem = fs; this.codecFactoryExposer = codecFactoryExposer; this.rowGroupIndex = rowGroupIndex; this.batchSize = batchSize; this.footer = footer; - this.columns = columns; - if (this.columns != null) { - columnsFound = new boolean[this.columns.size()]; - nullFilledVectors = new ArrayList(); - } + setColumns(columns); } public CodecFactoryExposer getCodecFactoryExposer() { @@ -184,25 +185,29 @@ public class ParquetRecordReader implements RecordReader { // TODO - not sure if this is how we want to represent this // for now it makes the existing tests pass, simply selecting // all available data if no columns are provided - if (this.columns != null){ - int i = 0; - for (SchemaPath expr : this.columns){ - if ( field.matches(expr)){ - columnsFound[i] = true; - return true; - } - i++; + if (isStarQuery()) { + return true; + } + + int i = 0; + for (SchemaPath expr : getColumns()){ + if ( field.matches(expr)){ + columnsFound[i] = true; + return true; } - return false; + i++; } - return true; + return false; } @Override public void setup(OutputMutator output) throws ExecutionSetupException { - + if (!isStarQuery()) { + columnsFound = new boolean[getColumns().size()]; + nullFilledVectors = new ArrayList(); + } columnStatuses = new ArrayList<>(); - totalRecords = footer.getBlocks().get(rowGroupIndex).getRowCount(); +// totalRecords = footer.getBlocks().get(rowGroupIndex).getRowCount(); List<ColumnDescriptor> columns = footer.getFileMetaData().getSchema().getColumns(); allFieldsFixedLength = true; ColumnDescriptor column; @@ -211,7 +216,7 @@ public class ParquetRecordReader implements RecordReader { mockRecordsRead = 0; MaterializedField field; - ParquetMetadataConverter metaConverter = new ParquetMetadataConverter(); +// ParquetMetadataConverter metaConverter = 
new ParquetMetadataConverter(); FileMetaData fileMetaData; // TODO - figure out how to deal with this better once we add nested reading, note also look where this map is used below @@ -249,7 +254,7 @@ public class ParquetRecordReader implements RecordReader { allFieldsFixedLength = false; } } - rowGroupOffset = footer.getBlocks().get(rowGroupIndex).getColumns().get(0).getFirstDataPageOffset(); +// rowGroupOffset = footer.getBlocks().get(rowGroupIndex).getColumns().get(0).getFirstDataPageOffset(); if (columnsToScan != 0 && allFieldsFixedLength) { recordsPerBatch = (int) Math.min(Math.min(batchSize / bitWidthAllFixedFields, @@ -294,10 +299,15 @@ public class ParquetRecordReader implements RecordReader { } varLengthReader = new VarLenBinaryReader(this, varLengthColumns); - if (this.columns != null) { + if (!isStarQuery()) { + List<SchemaPath> projectedColumns = Lists.newArrayList(getColumns()); + SchemaPath col; for (int i = 0; i < columnsFound.length; i++) { - if ( ! columnsFound[i]) { - nullFilledVectors.add((NullableBitVector)output.addField(MaterializedField.create(this.columns.get(i), Types.optional(TypeProtos.MinorType.BIT)), + col = projectedColumns.get(i); + assert col!=null; + if ( ! columnsFound[i] && !col.equals(STAR_COLUMN)) { + nullFilledVectors.add((NullableBitVector)output.addField(MaterializedField.create(col, + Types.optional(TypeProtos.MinorType.BIT)), (Class<? extends ValueVector>) TypeHelper.getValueVectorClass(TypeProtos.MinorType.BIT, DataMode.OPTIONAL))); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java index 16f520c..7a864f0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java @@ -18,12 +18,15 @@ package org.apache.drill.exec.store.parquet2; import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.PathSegment; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.impl.OutputMutator; +import org.apache.drill.exec.store.AbstractRecordReader; import org.apache.drill.exec.store.RecordReader; import org.apache.drill.exec.store.parquet.RowGroupReadEntry; import org.apache.drill.exec.vector.BaseValueVector; @@ -48,11 +51,12 @@ import parquet.schema.Type; import java.io.IOException; import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; -public class DrillParquetReader implements RecordReader { +public class DrillParquetReader extends AbstractRecordReader { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillParquetReader.class); @@ -60,7 +64,6 @@ public class DrillParquetReader implements RecordReader { private MessageType schema; private Configuration conf; private RowGroupReadEntry entry; - private List<SchemaPath> columns; private VectorContainerWriter 
writer; private ColumnChunkIncReadStore pageReadStore; private parquet.io.RecordReader<Void> recordReader; @@ -73,11 +76,11 @@ public class DrillParquetReader implements RecordReader { public DrillParquetReader(ParquetMetadata footer, RowGroupReadEntry entry, List<SchemaPath> columns, Configuration conf) { this.footer = footer; this.conf = conf; - this.columns = columns; this.entry = entry; + setColumns(columns); } - public static MessageType getProjection(MessageType schema, List<SchemaPath> columns) { + public static MessageType getProjection(MessageType schema, Collection<SchemaPath> columns) { MessageType projection = null; for (SchemaPath path : columns) { List<String> segments = Lists.newArrayList(); @@ -117,10 +120,10 @@ public class DrillParquetReader implements RecordReader { schema = footer.getFileMetaData().getSchema(); MessageType projection = null; - if (columns == null || columns.size() == 0) { + if (isStarQuery()) { projection = schema; } else { - projection = getProjection(schema, columns); + projection = getProjection(schema, getColumns()); if (projection == null) { projection = schema; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java index f27e8e6..3c5d9a4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java @@ -64,17 +64,25 @@ public class BlockMapBuilder { return codecFactory.getCodec(fileStatus.getPath()) != null; } - public List<CompleteFileWork> generateFileWork(List<FileStatus> files, boolean blockify) throws IOException{ + public List<CompleteFileWork> generateFileWork(List<FileStatus> files, boolean blockify) throws IOException { List<CompleteFileWork> work = Lists.newArrayList(); - for(FileStatus f : files){ - ImmutableRangeMap<Long,BlockLocation> rangeMap = getBlockMap(f); - if(!blockify || compressed(f)){ - work.add(new CompleteFileWork(this.getEndpointByteMap(new FileStatusWork(f)), 0, f.getLen(), f.getPath().toString())); - continue; + boolean error = false; + for(FileStatus f : files) { + error = false; + if (blockify && !compressed(f)) { + try { + ImmutableRangeMap<Long, BlockLocation> rangeMap = getBlockMap(f); + for (Entry<Range<Long>, BlockLocation> l : rangeMap.asMapOfRanges().entrySet()) { + work.add(new CompleteFileWork(getEndpointByteMap(new FileStatusWork(f)), l.getValue().getOffset(), l.getValue().getLength(), f.getPath().toString())); + } + } catch (IOException e) { + logger.warn("failure while generating file work.", e); + error = true; + } } - - for(Entry<Range<Long>, BlockLocation> l : rangeMap.asMapOfRanges().entrySet()){ - work.add(new CompleteFileWork(this.getEndpointByteMap(new FileStatusWork(f)), l.getValue().getOffset(), l.getValue().getLength(), f.getPath().toString())); + + if (!blockify || error || compressed(f)) { + work.add(new CompleteFileWork(getEndpointByteMap(new FileStatusWork(f)), 0, f.getLen(), f.getPath().toString())); } } return work; @@ -135,7 +143,7 @@ public class BlockMapBuilder { private ImmutableRangeMap<Long,BlockLocation> getBlockMap(Path path) throws IOException{ ImmutableRangeMap<Long,BlockLocation> blockMap = 
blockMapMap.get(path); - if(blockMap == null){ + if(blockMap == null) { blockMap = buildBlockMap(path); } return blockMap; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java index 68921a2..2031aee 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java @@ -21,6 +21,8 @@ import java.io.IOException; import java.util.Collections; import java.util.List; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.FieldReference; @@ -34,6 +36,7 @@ import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.impl.OutputMutator; import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.store.AbstractRecordReader; import org.apache.drill.exec.store.RecordReader; import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.RepeatedVarCharVector; @@ -48,7 +51,7 @@ import org.apache.hadoop.mapred.TextInputFormat; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; -public class DrillTextRecordReader implements RecordReader { +public class DrillTextRecordReader extends AbstractRecordReader { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillTextRecordReader.class); static final String COL_NAME = "columns"; @@ -66,36 +69,31 @@ public class DrillTextRecordReader implements RecordReader { private Text value; private int numCols = 0; private boolean redoRecord = false; - private boolean first = true; public DrillTextRecordReader(FileSplit split, FragmentContext context, char delimiter, List<SchemaPath> columns) { this.fragmentContext = context; this.delimiter = (byte) delimiter; - boolean getEntireRow = false; + setColumns(columns); - if(columns != null) { + if (!isStarQuery()) { + String pathStr; for (SchemaPath path : columns) { assert path.getRootSegment().isNamed(); - Preconditions.checkArgument(path.getRootSegment().getPath().equals(COL_NAME), "Selected column must have name 'columns'"); - // FIXME: need re-work for text column push-down. 
+ pathStr = path.getRootSegment().getPath(); + Preconditions.checkArgument(pathStr.equals(COL_NAME) || (pathStr.equals("*") && path.getRootSegment().getChild() == null), + "Selected column(s) must have name 'columns' or must be plain '*'"); + if (path.getRootSegment().getChild() != null) { - Preconditions.checkArgument(path.getRootSegment().getChild().isArray(),"Selected column must be an array index"); + Preconditions.checkArgument(path.getRootSegment().getChild().isArray(), "Selected column must be an array index"); int index = path.getRootSegment().getChild().getArraySegment().getIndex(); columnIds.add(index); - } else { - getEntireRow = true; } } Collections.sort(columnIds); + numCols = columnIds.size(); } targetRecordCount = context.getConfig().getInt(ExecConstants.TEXT_LINE_READER_BATCH_SIZE); - /* If one of the columns requested is the entire row ('columns') then ignore the rest of the columns - * we are going copy all the values in the repeated varchar vector - */ - if (!getEntireRow) { - numCols = columnIds.size(); - } TextInputFormat inputFormat = new TextInputFormat(); JobConf job = new JobConf(); job.setInt("io.file.buffer.size", context.getConfig().getInt(ExecConstants.TEXT_LINE_READER_BUFFER_SIZE)); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java index 79c94c8..fa26b54 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java @@ -18,6 +18,10 @@ package org.apache.drill.exec.vector.complex.fn; import io.netty.buffer.DrillBuf; +import com.google.common.base.Preconditions; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufInputStream; +import io.netty.buffer.UnpooledByteBufAllocator; import java.io.IOException; import java.io.Reader; @@ -31,7 +35,9 @@ import org.apache.drill.exec.expr.holders.BigIntHolder; import org.apache.drill.exec.expr.holders.BitHolder; import org.apache.drill.exec.expr.holders.Float8Holder; import org.apache.drill.exec.expr.holders.VarCharHolder; +import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.store.AbstractRecordReader; import org.apache.drill.exec.vector.complex.writer.BaseWriter; import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter; import org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter; @@ -66,22 +72,22 @@ public class JsonReader { private boolean allTextMode; public JsonReader() throws IOException { - this(null, null, false); + this(null, false); + } + + public JsonReader(DrillBuf managedBuf, boolean allTextMode) throws IOException { + this(managedBuf, GroupScan.ALL_COLUMNS, allTextMode); } public JsonReader(DrillBuf managedBuf, List<SchemaPath> columns, boolean allTextMode) throws JsonParseException, IOException { factory.configure(Feature.ALLOW_UNQUOTED_FIELD_NAMES, true); factory.configure(Feature.ALLOW_COMMENTS, true); - this.workBuf = managedBuf; + assert Preconditions.checkNotNull(columns).size() > 0 : "json record reader requires at least a column"; this.columns = columns; - // TODO - remove this check once the 
optimizer is updated to push down * instead of a null list - if (this.columns == null) { - this.columns = new ArrayList(); - this.columns.add(new SchemaPath(new PathSegment.NameSegment("*"))); + this.starRequested = containsStar(); + this.workBuf = managedBuf; this.allTextMode = allTextMode; - } this.columnsFound = new boolean[this.columns.size()]; - this.starRequested = containsStar(); } private boolean containsStar() { @@ -109,7 +115,7 @@ public class JsonReader { public List<SchemaPath> getNullColumns() { ArrayList<SchemaPath> nullColumns = new ArrayList<SchemaPath>(); for (int i = 0; i < columnsFound.length; i++ ) { - if ( ! columnsFound[i] && ! columns.get(i).getRootSegment().getPath().equals("*") ) { + if ( ! columnsFound[i] && !columns.get(i).equals(AbstractRecordReader.STAR_COLUMN)) { nullColumns.add(columns.get(i)); } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderWithState.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderWithState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderWithState.java index c2dcc95..0636db6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderWithState.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderWithState.java @@ -25,6 +25,7 @@ import java.io.Reader; import java.util.List; import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter; import com.fasterxml.jackson.core.JsonParseException; @@ -46,7 +47,7 @@ public class JsonReaderWithState { } public JsonReaderWithState(JsonRecordSplitter splitter) throws IOException{ - this(splitter, null, null, false); + this(splitter, null, GroupScan.ALL_COLUMNS, false); } public List<SchemaPath> getNullColumns() { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json b/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json index eadb1bc..31df303 100644 --- a/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json +++ b/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json @@ -43,6 +43,11 @@ type: "file", connection: "classpath:///", formats: { + "csv" : { + type: "text", + extensions: [ "csv" ], + delimiter: "," + }, "json" : { type: "json" }, http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/exec/java-exec/src/test/java/org/apache/drill/TestProjectPushDown.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestProjectPushDown.java b/exec/java-exec/src/test/java/org/apache/drill/TestProjectPushDown.java index ec8e92e..8520b9b 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestProjectPushDown.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestProjectPushDown.java @@ -18,6 +18,8 @@ package org.apache.drill; +import java.util.List; + import org.junit.Ignore; import org.junit.Test; @@ -105,6 +107,32 @@ public class TestProjectPushDown extends PlanTestBase { 
testPhysicalPlanFromFile("queries/tpch/03.sql", expectedColNames1, expectedColNames2, expectedColNames3); } + + private static final String pushDownSql = "select %s from cp.`%s` t"; + private static final String pushDownSqlWithFilter = pushDownSql + " where %s"; + private final String[] inputTypes = new String[] { + "project/pushdown/empty.json", + "project/pushdown/empty.csv", + "tpch/lineitem.parquet" + }; + + @Test + public void testProjectPushDown() throws Exception { + final String projection = "t.trans_id, t.user_info.cust_id, t.marketing_info.keywords[0]"; + final String expected = "\"columns\" : [ \"`trans_id`\", \"`user_info`.`cust_id`\", \"`marketing_info`.`keywords`[0]\" ],"; + final String filter = "t.another_field = 10 and t.columns[0] = 100 and t.columns[1] = t.other.columns[2]"; + final String expectedWithFilter = "\"columns\" : [ \"`another_field`\", \"`trans_id`\", \"`user_info`.`cust_id`\", \"`marketing_info`.`keywords`[0]\", \"`columns`[0]\", \"`columns`[1]\", \"`other`.`columns`[2]\" ],"; + + for (String inputType:inputTypes) { + testPushDown(new PushDownTestInstance(pushDownSql, expected, projection, inputType)); + testPushDown(new PushDownTestInstance(pushDownSqlWithFilter, expectedWithFilter, projection, inputType, filter)); + } + } + + protected void testPushDown(PushDownTestInstance test) throws Exception { + testPhysicalPlan(test.getSql(), test.getExpected()); + } + private void testPhysicalPlanFromFile(String fileName, String... expectedSubstrs) throws Exception { String query = getFile(fileName); @@ -116,4 +144,24 @@ public class TestProjectPushDown extends PlanTestBase { } } + protected static class PushDownTestInstance { + private final String sqlPattern; + private final String expected; + private final Object[] params; + + public PushDownTestInstance(String sqlPattern, String expected, Object... 
params) { + this.sqlPattern = sqlPattern; + this.expected = expected; + this.params = params; + } + + public String getExpected() { + return expected; + } + + public String getSql() { + return String.format(sqlPattern, params); + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java index bfc0d20..10b1775 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java @@ -34,6 +34,7 @@ import org.apache.drill.common.util.FileUtils; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.memory.TopLevelAllocator; +import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.proto.UserBitShared; import org.apache.drill.exec.record.RecordBatchLoader; import org.apache.drill.exec.record.VectorWrapper; @@ -64,7 +65,7 @@ public class TestJsonReader extends BaseTestQuery { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestJsonReader.class); private static BufferAllocator allocator; - private static final boolean VERBOSE_DEBUG = false; + private static final boolean VERBOSE_DEBUG = true; @BeforeClass public static void setupAllocator(){ @@ -168,6 +169,7 @@ public class TestJsonReader extends BaseTestQuery { String[] queries = {Files.toString(FileUtils.getResourceAsFile("/store/json/project_pushdown_json_physical_plan.json"), Charsets.UTF_8)}; long[] rowCounts = {3}; String filename = "/store/json/schema_change_int_to_string.json"; + test("alter system set `store.json.all_text_mode` = false"); runTestsOnFile(filename, UserBitShared.QueryType.PHYSICAL, queries, rowCounts); List<QueryResultBatch> results = testPhysicalWithResults(queries[0]); @@ -271,7 +273,8 @@ public class TestJsonReader extends BaseTestQuery { writer.allocate(); DrillBuf buffer = allocator.buffer(255); - JsonReaderWithState jsonReader = new JsonReaderWithState(new ReaderJSONRecordSplitter(compound), buffer, null, false); + JsonReaderWithState jsonReader = new JsonReaderWithState(new ReaderJSONRecordSplitter(compound), buffer, + GroupScan.ALL_COLUMNS, false); int i =0; List<Integer> batchSizes = Lists.newArrayList(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/exec/java-exec/src/test/resources/project/pushdown/empty.csv ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/project/pushdown/empty.csv b/exec/java-exec/src/test/resources/project/pushdown/empty.csv new file mode 100644 index 0000000..e69de29 http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/exec/java-exec/src/test/resources/project/pushdown/empty.json ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/project/pushdown/empty.json b/exec/java-exec/src/test/resources/project/pushdown/empty.json new file mode 100644 index 0000000..e69de29 
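The TestProjectPushDown changes above add a small harness (PushDownTestInstance plus testPushDown) that formats a SQL pattern with String.format and asserts that the expected "columns" list shows up in the generated physical plan. The sketch below is a hypothetical follow-on test, not part of this commit, showing how that harness is meant to be used; the projected column, the expected plan fragment, and the class name are illustrative only.

package org.apache.drill;

import org.junit.Test;

// Hypothetical usage of the PushDownTestInstance harness introduced above.
// The column name and expected plan fragment are illustrative values, not
// taken from this commit.
public class TestSingleColumnPushDown extends TestProjectPushDown {

  @Test
  public void singleColumnIntoEmptyJson() throws Exception {
    // Same SQL pattern as pushDownSql above; the params fill in the
    // projection list and the classpath table name.
    final String sqlPattern = "select %s from cp.`%s` t";
    // testPushDown() asserts this fragment appears in the serialized physical
    // plan, i.e. that the scan carries only the projected column.
    final String expected = "\"columns\" : [ \"`trans_id`\" ],";
    testPushDown(new PushDownTestInstance(sqlPattern, expected,
        "t.trans_id", "project/pushdown/empty.json"));
  }
}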
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/exec/java-exec/src/test/resources/project/pushdown/empty.parquet ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/project/pushdown/empty.parquet b/exec/java-exec/src/test/resources/project/pushdown/empty.parquet new file mode 100644 index 0000000..e69de29 http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index deb95fe..a3fa64e 100644 --- a/pom.xml +++ b/pom.xml @@ -167,6 +167,7 @@ <exclude>**/*.md</exclude> <exclude>sandbox/**</exclude> <exclude>**/*.json</exclude> + <exclude>**/*.parquet</exclude> <exclude>**/*.sql</exclude> <exclude>**/git.properties</exclude> <exclude>**/*.csv</exclude>
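Several readers touched by this commit (ParquetRecordReader, DrillParquetReader, DrillTextRecordReader) now extend the new AbstractRecordReader and call its setColumns()/getColumns()/isStarQuery() helpers instead of null-checking the projected column list, with callers such as JsonReader passing GroupScan.ALL_COLUMNS rather than null. The sketch below is inferred from those call sites and is not the AbstractRecordReader source added by this commit; it simplifies Drill's SchemaPath (and its STAR_COLUMN constant) to plain strings.

import java.util.Collection;
import java.util.List;
import java.util.Objects;

// A minimal sketch, assuming the behavior implied by the call sites above;
// the real AbstractRecordReader works with SchemaPath, not String.
public abstract class ColumnProjectionSketch {

  // Stand-in for AbstractRecordReader.STAR_COLUMN, the `*` projection.
  public static final String STAR_COLUMN = "*";

  private Collection<String> columns;

  // Called from the reader constructors. Callers are now expected to pass an
  // explicit projection (e.g. GroupScan.ALL_COLUMNS, a one-element `*` list)
  // instead of null.
  protected final void setColumns(List<String> projected) {
    this.columns = Objects.requireNonNull(projected, "projected columns must not be null");
  }

  protected final Collection<String> getColumns() {
    return columns;
  }

  // Instance form used by the readers in setup() and field-selection checks.
  protected final boolean isStarQuery() {
    return isStarQuery(columns);
  }

  // Static form used by callers such as ParquetScanBatchCreator, which only
  // has the projected column list and no reader instance yet.
  public static boolean isStarQuery(Collection<String> projected) {
    for (String column : projected) {
      if (STAR_COLUMN.equals(column)) {
        return true;
      }
    }
    return false;
  }
}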
