Repository: incubator-drill
Updated Branches:
  refs/heads/master c1c0eba5b -> 967fef5ad


DRILL-1309: Implement ProjectPastFilterPushdown and update the DrillScanRel cost 
model so that a star query is more expensive than an exclusive column 
projection. Various fixes affecting record readers to handle the `*` column, 
as well as fixes to some test cases.
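
As a rough illustration of the cost change (a sketch only, not part of this
commit: the 10000 figure is the STAR_COLUMN_COST constant added to DrillScanRel
below, while the row and column counts are hypothetical), the planner's cpu
cost term now charges `*` as if it read a very wide projection:

    // Illustrative only: cpu cost term for a star query vs. a three-column
    // projection over a hypothetical one-million-row scan.
    public class StarCostSketch {
      private static final int STAR_COLUMN_COST = 10000; // as in DrillScanRel

      public static void main(String[] args) {
        double rowCount = 1000000;                        // hypothetical scan size
        double starCost = rowCount * STAR_COLUMN_COST;    // '*' charged as 10000 columns
        double projectedCost = rowCount * 3;              // SELECT a, b, c
        System.out.println("star=" + starCost + " projected=" + projectedCost);
      }
    }

Under these assumed numbers a pushed-down three-column projection is orders of
magnitude cheaper than scanning `*`, which is what lets project pushdown win
during planning.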

exclude parquet files from rat check


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/f1486947
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/f1486947
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/f1486947

Branch: refs/heads/master
Commit: f148694738a84832d75aca4ef69bff47c68b463f
Parents: c1c0eba
Author: Hanifi Gunes <[email protected]>
Authored: Thu Aug 28 10:21:07 2014 -0700
Committer: Jacques Nadeau <[email protected]>
Committed: Fri Aug 29 00:00:41 2014 -0700

----------------------------------------------------------------------
 .../drill/exec/store/hbase/HBaseGroupScan.java  | 15 ++--
 .../exec/store/hbase/HBaseRecordReader.java     | 68 +++++++-------
 .../exec/store/hbase/HBaseScanBatchCreator.java | 10 ++-
 .../drill/exec/store/hive/HiveRecordReader.java | 12 +--
 .../exec/expr/fn/impl/conv/JsonConvertFrom.java | 12 ++-
 .../drill/exec/physical/base/GroupScan.java     |  3 +
 .../logical/DrillPushProjectPastFilterRule.java | 94 ++++++++++++++++++++
 .../exec/planner/logical/DrillRuleSets.java     |  3 +-
 .../exec/planner/logical/DrillScanRel.java      | 57 +++++++-----
 .../exec/planner/logical/RelOptHelper.java      |  4 +
 .../planner/sql/handlers/ExplainHandler.java    |  2 +-
 .../DrillParserWithCompoundIdConverter.java     |  9 +-
 .../drill/exec/store/AbstractRecordReader.java  | 62 +++++++++++++
 .../drill/exec/store/AbstractStoragePlugin.java |  2 +-
 .../drill/exec/store/dfs/FileSystemPlugin.java  |  5 --
 .../exec/store/dfs/easy/EasyFormatPlugin.java   |  2 +-
 .../exec/store/dfs/easy/EasyGroupScan.java      | 69 ++++++--------
 .../exec/store/easy/text/TextFormatPlugin.java  |  2 +-
 .../exec/store/parquet/ParquetRowGroupScan.java | 18 ++--
 .../store/parquet/ParquetScanBatchCreator.java  |  7 +-
 .../columnreaders/ParquetRecordReader.java      | 68 ++++++++------
 .../exec/store/parquet2/DrillParquetReader.java | 15 ++--
 .../exec/store/schedule/BlockMapBuilder.java    | 28 +++---
 .../exec/store/text/DrillTextRecordReader.java  | 28 +++---
 .../exec/vector/complex/fn/JsonReader.java      | 24 +++--
 .../vector/complex/fn/JsonReaderWithState.java  |  3 +-
 .../resources/bootstrap-storage-plugins.json    |  5 ++
 .../org/apache/drill/TestProjectPushDown.java   | 48 ++++++++++
 .../vector/complex/writer/TestJsonReader.java   |  7 +-
 .../test/resources/project/pushdown/empty.csv   |  0
 .../test/resources/project/pushdown/empty.json  |  0
 .../resources/project/pushdown/empty.parquet    |  0
 pom.xml                                         |  1 +
 33 files changed, 470 insertions(+), 213 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
index 8e9ae18..8301de1 100644
--- 
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
+++ 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
@@ -43,6 +43,8 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.ScanStats;
 import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.hbase.HBaseSubScan.HBaseSubScanSpec;
 import org.apache.hadoop.conf.Configuration;
@@ -107,7 +109,7 @@ public class HBaseGroupScan extends AbstractGroupScan 
implements DrillHBaseConst
     this.storagePlugin = storagePlugin;
     this.storagePluginConfig = storagePlugin.getConfig();
     this.hbaseScanSpec = scanSpec;
-    this.columns = columns;
+    this.columns = columns == null || columns.size() == 0? ALL_COLUMNS : 
columns;
     init();
   }
 
@@ -162,12 +164,11 @@ public class HBaseGroupScan extends AbstractGroupScan 
implements DrillHBaseConst
   }
 
   private void verifyColumns() {
-    if (columns != null) {
-      for (SchemaPath column : columns) {
-        if (!(column.equals(ROW_KEY_PATH) || 
hTableDesc.hasFamily(HBaseUtils.getBytes(column.getRootSegment().getPath())))) {
-          DrillRuntimeException.format("The column family '%s' does not exist 
in HBase table: %s .",
-              column.getRootSegment().getPath(), hTableDesc.getNameAsString());
-        }
+    if (AbstractRecordReader.isStarQuery(columns)) return;
+    for (SchemaPath column : columns) {
+      if (!(column.equals(ROW_KEY_PATH) || 
hTableDesc.hasFamily(HBaseUtils.getBytes(column.getRootSegment().getPath())))) {
+        DrillRuntimeException.format("The column family '%s' does not exist in 
HBase table: %s .",
+            column.getRootSegment().getPath(), hTableDesc.getNameAsString());
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
index 954cc5a..51a3151 100644
--- 
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
+++ 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
@@ -18,13 +18,14 @@
 package org.apache.drill.exec.store.hbase;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.base.Preconditions;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.PathSegment;
@@ -36,7 +37,7 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.vector.NullableVarBinaryVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.VarBinaryVector;
@@ -53,12 +54,11 @@ import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Sets;
 
-public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
+public class HBaseRecordReader extends AbstractRecordReader implements 
DrillHBaseConstants {
   private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(HBaseRecordReader.class);
 
   private static final int TARGET_RECORD_COUNT = 4000;
 
-  private LinkedHashSet<SchemaPath> columns;
   private OutputMutator outputMutator;
 
   private Map<String, MapVector> familyVectorMap;
@@ -67,7 +67,7 @@ public class HBaseRecordReader implements RecordReader, 
DrillHBaseConstants {
   private HTable hTable;
   private ResultScanner resultScanner;
 
-  private String hbaseTable;
+  private String hbaseTableName;
   private Scan hbaseScan;
   private Configuration hbaseConf;
   private Result leftOver;
@@ -78,23 +78,29 @@ public class HBaseRecordReader implements RecordReader, 
DrillHBaseConstants {
   public HBaseRecordReader(Configuration conf, HBaseSubScan.HBaseSubScanSpec 
subScanSpec,
       List<SchemaPath> projectedColumns, FragmentContext context) throws 
OutOfMemoryException {
     hbaseConf = conf;
-    hbaseTable = subScanSpec.getTableName();
+    hbaseTableName = Preconditions.checkNotNull(subScanSpec, "HBase reader 
needs a sub-scan spec").getTableName();
     hbaseScan = new Scan(subScanSpec.getStartRow(), subScanSpec.getStopRow());
-    fragmentContext=context;
-    boolean rowKeyOnly = true;
-    this.columns = Sets.newLinkedHashSet();
-    if (projectedColumns != null && projectedColumns.size() != 0) {
-      Iterator<SchemaPath> columnIterator = projectedColumns.iterator();
-      while(columnIterator.hasNext()) {
-        SchemaPath column = columnIterator.next();
+    hbaseScan
+        .setFilter(subScanSpec.getScanFilter())
+        .setCaching(TARGET_RECORD_COUNT);
+
+    setColumns(projectedColumns);
+  }
+
+  @Override
+  protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> 
columns) {
+    Set<SchemaPath> transformed = Sets.newLinkedHashSet();
+    if (!isStarQuery()) {
+      boolean rowKeyOnly = true;
+      for (SchemaPath column : columns) {
         if (column.getRootSegment().getPath().equalsIgnoreCase(ROW_KEY)) {
-          this.columns.add(ROW_KEY_PATH);
+          transformed.add(ROW_KEY_PATH);
           continue;
         }
         rowKeyOnly = false;
         NameSegment root = column.getRootSegment();
         byte[] family = root.getPath().getBytes();
-        this.columns.add(SchemaPath.getSimplePath(root.getPath()));
+        transformed.add(SchemaPath.getSimplePath(root.getPath()));
         PathSegment child = root.getChild();
         if (child != null && child.isNamed()) {
           byte[] qualifier = child.getNameSegment().getPath().getBytes();
@@ -103,23 +109,21 @@ public class HBaseRecordReader implements RecordReader, 
DrillHBaseConstants {
           hbaseScan.addFamily(family);
         }
       }
-    } else {
-      rowKeyOnly = false;
-      this.columns.add(ROW_KEY_PATH);
-    }
-
-    hbaseScan.setFilter(subScanSpec.getScanFilter());
-    if (rowKeyOnly) {
       /* if only the row key was requested, add a FirstKeyOnlyFilter to the 
scan
        * to fetch only one KV from each row. If a filter is already part of 
this
        * scan, add the FirstKeyOnlyFilter as the LAST filter of a MUST_PASS_ALL
        * FilterList.
        */
-      hbaseScan.setFilter(
-          HBaseUtils.andFilterAtIndex(hbaseScan.getFilter(), 
HBaseUtils.LAST_FILTER, new FirstKeyOnlyFilter())
-          );
+      if (rowKeyOnly) {
+        hbaseScan.setFilter(
+            HBaseUtils.andFilterAtIndex(hbaseScan.getFilter(), 
HBaseUtils.LAST_FILTER, new FirstKeyOnlyFilter()));
+      }
+    } else {
+      transformed.add(ROW_KEY_PATH);
     }
-    hbaseScan.setCaching(TARGET_RECORD_COUNT);
+
+
+    return transformed;
   }
 
   public OperatorContext getOperatorContext() {
@@ -138,7 +142,7 @@ public class HBaseRecordReader implements RecordReader, 
DrillHBaseConstants {
 
     try {
       // Add Vectors to output in the order specified when creating reader
-      for (SchemaPath column : columns) {
+      for (SchemaPath column : getColumns()) {
         if (column.equals(ROW_KEY_PATH)) {
           MaterializedField field = MaterializedField.create(column, 
ROW_KEY_TYPE);
           rowKeyVector = outputMutator.addField(field, VarBinaryVector.class);
@@ -147,9 +151,9 @@ public class HBaseRecordReader implements RecordReader, 
DrillHBaseConstants {
         }
       }
       logger.debug("Opening scanner for HBase table '{}', Zookeeper quorum 
'{}', port '{}', znode '{}'.",
-          hbaseTable, hbaseConf.get(HConstants.ZOOKEEPER_QUORUM),
+          hbaseTableName, hbaseConf.get(HConstants.ZOOKEEPER_QUORUM),
           hbaseConf.get(HBASE_ZOOKEEPER_PORT), 
hbaseConf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
-      hTable = new HTable(hbaseConf, hbaseTable);
+      hTable = new HTable(hbaseConf, hbaseTableName);
       resultScanner = hTable.getScanner(hbaseScan);
     } catch (SchemaChangeException | IOException e) {
       throw new ExecutionSetupException(e);
@@ -230,7 +234,7 @@ public class HBaseRecordReader implements RecordReader, 
DrillHBaseConstants {
         if (allocateOnCreate) {
           v.allocateNew();
         }
-        columns.add(column);
+        getColumns().add(column);
         familyVectorMap.put(familyName, v);
       }
       return v;
@@ -258,7 +262,7 @@ public class HBaseRecordReader implements RecordReader, 
DrillHBaseConstants {
         hTable.close();
       }
     } catch (IOException e) {
-      logger.warn("Failure while closing HBase table: " + hbaseTable, e);
+      logger.warn("Failure while closing HBase table: " + hbaseTableName, e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
index 661e1b4..9256157 100644
--- 
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
+++ 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
@@ -20,7 +20,9 @@ package org.apache.drill.exec.store.hbase;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.physical.impl.ScanBatch;
 import org.apache.drill.exec.record.RecordBatch;
@@ -36,11 +38,13 @@ public class HBaseScanBatchCreator implements 
BatchCreator<HBaseSubScan>{
   public RecordBatch getBatch(FragmentContext context, HBaseSubScan subScan, 
List<RecordBatch> children) throws ExecutionSetupException {
     Preconditions.checkArgument(children.isEmpty());
     List<RecordReader> readers = Lists.newArrayList();
+    List<SchemaPath> columns = null;
     for(HBaseSubScan.HBaseSubScanSpec scanSpec : 
subScan.getRegionScanSpecList()){
       try {
-        readers.add(
-            new HBaseRecordReader(subScan.getStorageConfig().getHBaseConf(), 
scanSpec, subScan.getColumns(), context)
-        );
+        if ((columns = subScan.getColumns())==null) {
+          columns = GroupScan.ALL_COLUMNS;
+        }
+        readers.add(new 
HBaseRecordReader(subScan.getStorageConfig().getHBaseConf(), scanSpec, columns, 
context));
       } catch (Exception e1) {
         throw new ExecutionSetupException(e1);
       }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
 
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
index 36e55d9..1f3b3cd 100644
--- 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
+++ 
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
@@ -37,6 +37,7 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.vector.BigIntVector;
 import org.apache.drill.exec.vector.BitVector;
@@ -78,13 +79,12 @@ import org.joda.time.DateTimeZone;
 
 import com.google.common.collect.Lists;
 
-public class HiveRecordReader implements RecordReader {
+public class HiveRecordReader extends AbstractRecordReader {
 
   protected Table table;
   protected Partition partition;
   protected InputSplit inputSplit;
   protected FragmentContext context;
-  protected List<SchemaPath> projectedColumns;
   protected List<String> selectedColumnNames;
   protected List<TypeInfo> selectedColumnTypes = Lists.newArrayList();
   protected List<ObjectInspector> selectedColumnObjInspectors = 
Lists.newArrayList();
@@ -115,10 +115,10 @@ public class HiveRecordReader implements RecordReader {
     this.partition = partition;
     this.inputSplit = inputSplit;
     this.context = context;
-    this.projectedColumns = projectedColumns;
     this.empty = (inputSplit == null && partition == null);
     this.hiveConfigOverride = hiveConfigOverride;
     this.fragmentContext=context;
+    setColumns(projectedColumns);
     init();
   }
 
@@ -171,14 +171,14 @@ public class HiveRecordReader implements RecordReader {
       }
       sInspector = (StructObjectInspector) oi;
       StructTypeInfo sTypeInfo = (StructTypeInfo) 
TypeInfoUtils.getTypeInfoFromObjectInspector(sInspector);
-      if (projectedColumns == null) {
+      if (isStarQuery()) {
         selectedColumnNames = sTypeInfo.getAllStructFieldNames();
         tableColumns = selectedColumnNames;
       } else {
         tableColumns = sTypeInfo.getAllStructFieldNames();
         List<Integer> columnIds = Lists.newArrayList();
         selectedColumnNames = Lists.newArrayList();
-        for (SchemaPath field : projectedColumns) {
+        for (SchemaPath field : getColumns()) {
           String columnName = field.getRootSegment().getPath();
           if (!tableColumns.contains(columnName)) {
             if (partitionNames.contains(columnName)) {
@@ -204,7 +204,7 @@ public class HiveRecordReader implements RecordReader {
         selectedColumnFieldConverters.add(HiveFieldConverter.create(typeInfo));
       }
 
-      if (projectedColumns == null) {
+      if (isStarQuery()) {
         selectedPartitionNames = partitionNames;
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java
index 72bec3b..62d526b 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java
@@ -67,12 +67,14 @@ public class JsonConvertFrom {
       String input = new String(buf, com.google.common.base.Charsets.UTF_8);
 
       try {
-        org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader = new 
org.apache.drill.exec.vector.complex.fn.JsonReader(buffer, null, false);
+        org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader = new 
org.apache.drill.exec.vector.complex.fn.JsonReader(buffer, false);
 
         jsonReader.write(new java.io.StringReader(input), writer);
 
       } catch (Exception e) {
-        System.out.println(" msg = " + e.getMessage() + " trace : " + 
e.getStackTrace());
+//        System.out.println("Error while converting from JSON. ");
+//        e.printStackTrace();
+        throw new 
org.apache.drill.common.exceptions.DrillRuntimeException("Error while 
converting from JSON. ", e);
       }
     }
   }
@@ -94,12 +96,14 @@ public class JsonConvertFrom {
       String input = new String(buf, com.google.common.base.Charsets.UTF_8);
 
       try {
-        org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader = new 
org.apache.drill.exec.vector.complex.fn.JsonReader(buffer, null, false);
+        org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader = new 
org.apache.drill.exec.vector.complex.fn.JsonReader(buffer, false);
 
         jsonReader.write(new java.io.StringReader(input), writer);
 
       } catch (Exception e) {
-        System.out.println(" msg = " + e.getMessage() + " trace : " + 
e.getStackTrace());
+//        System.out.println("Error while converting from JSON. ");
+//        e.printStackTrace();
+        throw new 
org.apache.drill.common.exceptions.DrillRuntimeException("Error while 
converting from JSON. ", e);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
index fa20a90..9c27c0c 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.base;
 
 import java.util.List;
 
+import com.google.common.collect.Lists;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.PhysicalOperatorSetupException;
 import org.apache.drill.common.expression.SchemaPath;
@@ -32,6 +33,8 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
  */
 public interface GroupScan extends Scan, HasAffinity{
 
+  public static final List<SchemaPath> ALL_COLUMNS = 
Lists.<SchemaPath>newArrayList(SchemaPath.getSimplePath("*"));
+
   public abstract void applyAssignments(List<DrillbitEndpoint> endpoints) 
throws PhysicalOperatorSetupException;
 
   public abstract SubScan getSpecificScan(int minorFragmentId) throws 
ExecutionSetupException;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjectPastFilterRule.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjectPastFilterRule.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjectPastFilterRule.java
new file mode 100644
index 0000000..dcec68a
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjectPastFilterRule.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.eigenbase.rel.FilterRel;
+import org.eigenbase.rel.ProjectRel;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.rel.rules.PushProjector;
+import org.eigenbase.relopt.RelOptRule;
+import org.eigenbase.relopt.RelOptRuleCall;
+import org.eigenbase.rex.RexCall;
+import org.eigenbase.rex.RexNode;
+import org.eigenbase.rex.RexOver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DrillPushProjectPastFilterRule extends RelOptRule {
+
+  private final static Logger logger = 
LoggerFactory.getLogger(DrillPushProjectPastFilterRule.class);
+
+  public final static RelOptRule INSTANCE = new 
DrillPushProjectPastFilterRule(new PushProjector.ExprCondition() {
+    @Override
+    public boolean test(RexNode expr) {
+      if (expr instanceof RexCall) {
+        RexCall call = (RexCall)expr;
+        return "ITEM".equals(call.getOperator().getName());
+      }
+      return false;
+    }
+  });
+
+  /**
+   * Expressions that should be preserved in the projection
+   */
+  private final PushProjector.ExprCondition preserveExprCondition;
+
+  private DrillPushProjectPastFilterRule(PushProjector.ExprCondition 
preserveExprCondition) {
+    super(RelOptHelper.any(ProjectRel.class, FilterRel.class));
+    this.preserveExprCondition = preserveExprCondition;
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    ProjectRel origProj;
+    FilterRel filterRel;
+
+    if (call.rels.length == 2) {
+      origProj = call.rel(0);
+      filterRel = call.rel(1);
+    } else {
+      origProj = null;
+      filterRel = call.rel(0);
+    }
+    RelNode rel = filterRel.getChild();
+    RexNode origFilter = filterRel.getCondition();
+
+    if ((origProj != null) && RexOver.containsOver(origProj.getProjects(), 
null)) {
+      // Cannot push project through filter if project contains a windowed
+      // aggregate -- it will affect row counts. Abort this rule
+      // invocation; pushdown will be considered after the windowed
+      // aggregate has been implemented. It's OK if the filter contains a
+      // windowed aggregate.
+      return;
+    }
+
+    PushProjector pushProjector = createPushProjector(origProj, origFilter, 
rel, preserveExprCondition);
+    RelNode topProject = pushProjector.convertProject(null);
+
+    if (topProject != null) {
+      call.transformTo(topProject);
+    }
+  }
+
+  protected PushProjector createPushProjector(ProjectRel origProj, RexNode 
origFilter, RelNode rel,
+                                              PushProjector.ExprCondition 
preserveExprCondition) {
+    return new PushProjector(origProj, origFilter,rel, preserveExprCondition);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
index 63de69c..cf92121 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
@@ -89,7 +89,8 @@ public class DrillRuleSets {
       RemoveDistinctAggregateRule.INSTANCE, //
       ReduceAggregatesRule.INSTANCE, //
       PushProjectPastJoinRule.INSTANCE,
-      PushProjectPastFilterRule.INSTANCE,
+//      PushProjectPastFilterRule.INSTANCE,
+      DrillPushProjectPastFilterRule.INSTANCE,
 //      SwapJoinRule.INSTANCE, //
 //      PushJoinThroughJoinRule.RIGHT, //
 //      PushJoinThroughJoinRule.LEFT, //

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
index dcbfb3d..d6bbcd3 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
@@ -21,12 +21,16 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
 import org.apache.drill.common.JSONOptions;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.data.LogicalOperator;
 import org.apache.drill.common.logical.data.Scan;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
 import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.ScanStats;
@@ -46,6 +50,8 @@ import org.eigenbase.reltype.RelDataType;
  * GroupScan of a Drill table.
  */
 public class DrillScanRel extends DrillScanRelBase implements DrillRel {
+  private final static int STAR_COLUMN_COST = 10000;
+
   final private RelDataType rowType;
   private GroupScan groupScan;
 
@@ -54,7 +60,7 @@ public class DrillScanRel extends DrillScanRelBase implements 
DrillRel {
       RelOptTable table) {
     // By default, scan does not support project pushdown.
     // Decision whether push projects into scan will be made solely in 
DrillPushProjIntoScanRule.
-    this(cluster, traits, table, table.getRowType(), null);
+    this(cluster, traits, table, table.getRowType(), 
AbstractGroupScan.ALL_COLUMNS);
   }
 
   /** Creates a DrillScan. */
@@ -62,26 +68,21 @@ public class DrillScanRel extends DrillScanRelBase 
implements DrillRel {
       RelOptTable table, RelDataType rowType, List<SchemaPath> columns) {
     super(DRILL_LOGICAL, cluster, traits, table);
     this.rowType = rowType;
-
+    columns = columns == null || columns.size() == 0 ? GroupScan.ALL_COLUMNS : 
columns;
     try {
-      if (columns == null || columns.isEmpty()) {
-        this.groupScan = (GroupScan) getCopy(this.drillTable.getGroupScan()) ;
-      } else {
-        this.groupScan = this.drillTable.getGroupScan().clone(columns);
-      }
+      this.groupScan = drillTable.getGroupScan().clone(columns);
     } catch (IOException e) {
       throw new DrillRuntimeException("Failure creating scan.", e);
     }
-
-  }
-
-  private static GroupScan getCopy(GroupScan scan){
-    try {
-      return (GroupScan) scan.getNewWithChildren((List<PhysicalOperator>) 
(Object) Collections.emptyList());
-    } catch (ExecutionSetupException e) {
-      throw new DrillRuntimeException("Unexpected failure while coping node.", 
e);
-    }
   }
+//
+//  private static GroupScan getCopy(GroupScan scan){
+//    try {
+//      return (GroupScan) scan.getNewWithChildren((List<PhysicalOperator>) 
(Object) Collections.emptyList());
+//    } catch (ExecutionSetupException e) {
+//      throw new DrillRuntimeException("Unexpected failure while coping 
node.", e);
+//    }
+//  }
 
 
   @Override
@@ -118,15 +119,29 @@ public class DrillScanRel extends DrillScanRelBase 
implements DrillRel {
   /// by both logical and physical rels.
   @Override
   public RelOptCost computeSelfCost(RelOptPlanner planner) {
-    ScanStats stats = this.groupScan.getScanStats();
-    int columnCount = this.getRowType().getFieldCount();
+    ScanStats stats = groupScan.getScanStats();
+    int columnCount = getRowType().getFieldCount();
+    double ioCost = 0;
+    boolean isStarQuery = Iterables.tryFind(getRowType().getFieldNames(), new 
Predicate<String>() {
+      @Override
+      public boolean apply(String input) {
+        return Preconditions.checkNotNull(input).equals("*");
+      }
+    }).isPresent();
 
-    if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) {
-      return planner.getCostFactory().makeCost(stats.getRecordCount() * 
columnCount, stats.getCpuCost(), stats.getDiskCost());
+    if (isStarQuery) {
+      columnCount = STAR_COLUMN_COST;
     }
 
     // double rowCount = RelMetadataQuery.getRowCount(this);
     double rowCount = stats.getRecordCount();
+    if (rowCount < 1) {
+      rowCount = 1;
+    }
+
+    if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) {
+      return planner.getCostFactory().makeCost(rowCount * columnCount, 
stats.getCpuCost(), stats.getDiskCost());
+    }
 
     double cpuCost = rowCount * columnCount; // for now, assume cpu cost is 
proportional to row count.
     // Even though scan is reading from disk, in the currently generated plans 
all plans will
@@ -134,7 +149,7 @@ public class DrillScanRel extends DrillScanRelBase 
implements DrillRel {
     // In the future we might consider alternative scans that go against 
projections or
     // different compression schemes etc that affect the amount of data read. 
Such alternatives
     // would affect both cpu and io cost.
-    double ioCost = 0;
+
     DrillCostFactory costFactory = (DrillCostFactory)planner.getCostFactory();
     return costFactory.makeCost(rowCount, cpuCost, ioCost, 0);
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RelOptHelper.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RelOptHelper.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RelOptHelper.java
index 92272f8..2e253ab 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RelOptHelper.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RelOptHelper.java
@@ -32,6 +32,10 @@ public class RelOptHelper {
   public static RelOptRuleOperand any(Class<? extends RelNode> first){
     return RelOptRule.operand(first, RelOptRule.any());
   }
+
+  public static RelOptRuleOperand any(Class<? extends RelNode> first, Class<? 
extends RelNode> second) {
+    return RelOptRule.operand(first, RelOptRule.operand(second, 
RelOptRule.any()));
+  }
   
   public static RelOptRuleOperand some(Class<? extends RelNode> rel, 
RelOptRuleOperand first, RelOptRuleOperand... rest){
     return RelOptRule.operand(rel, RelOptRule.some(first, rest));

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java
index 63db153..25fa0cb 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java
@@ -55,8 +55,8 @@ public class ExplainHandler extends DefaultSqlHandler{
     SqlNode sqlNode = rewrite(node);
     SqlNode validated = validateNode(sqlNode);
     RelNode rel = convertToRel(validated);
-    DrillRel drel = convertToDrel(rel);
     log("Optiq Logical", rel);
+    DrillRel drel = convertToDrel(rel);
     log("Drill Logical", drel);
 
     if(mode == ResultMode.LOGICAL){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/impl/DrillParserWithCompoundIdConverter.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/impl/DrillParserWithCompoundIdConverter.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/impl/DrillParserWithCompoundIdConverter.java
index 4886741..382456a 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/impl/DrillParserWithCompoundIdConverter.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/impl/DrillParserWithCompoundIdConverter.java
@@ -21,6 +21,7 @@ import 
org.apache.drill.exec.planner.sql.parser.CompoundIdentifierConverter;
 import org.eigenbase.sql.SqlNode;
 import org.eigenbase.sql.parser.SqlAbstractParserImpl;
 import org.eigenbase.sql.parser.SqlParserImplFactory;
+import org.eigenbase.sql.util.SqlVisitor;
 
 import java.io.Reader;
 
@@ -39,15 +40,19 @@ public class DrillParserWithCompoundIdConverter extends 
DrillParserImpl {
     super(stream);
   }
 
+  protected SqlVisitor<SqlNode> createConverter() {
+    return new CompoundIdentifierConverter();
+  }
+
   @Override
   public SqlNode parseSqlExpressionEof() throws Exception {
     SqlNode originalSqlNode = super.parseSqlExpressionEof();
-    return originalSqlNode.accept(new CompoundIdentifierConverter());
+    return originalSqlNode.accept(createConverter());
   }
 
   @Override
   public SqlNode parseSqlStmtEof() throws Exception {
     SqlNode originalSqlNode = super.parseSqlStmtEof();
-    return originalSqlNode.accept(new CompoundIdentifierConverter());
+    return originalSqlNode.accept(createConverter());
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
new file mode 100644
index 0000000..4cc06c8
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import java.util.Collection;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import org.apache.drill.common.expression.SchemaPath;
+
+public abstract class AbstractRecordReader implements RecordReader {
+  private static final String COL_NULL_ERROR = "Columns cannot be null. Use 
star column to select all fields.";
+  private static final String COL_EMPTY_ERROR = "Readers needs at least a 
column to read.";
+  public static final SchemaPath STAR_COLUMN = SchemaPath.getSimplePath("*");
+
+  private Collection<SchemaPath> columns = null;
+  private boolean isStarQuery = false;
+
+  protected final void setColumns(Collection<SchemaPath> projected) {
+    assert Preconditions.checkNotNull(projected, COL_NULL_ERROR).size() > 0 : 
COL_EMPTY_ERROR;
+    isStarQuery = isStarQuery(projected);
+    columns = transformColumns(projected);
+  }
+
+  protected Collection<SchemaPath> getColumns() {
+    return columns;
+  }
+
+  protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> 
projected) {
+    return projected;
+  }
+
+  protected boolean isStarQuery() {
+    return isStarQuery;
+  }
+
+  public static boolean isStarQuery(Collection<SchemaPath> projected) {
+    return Iterables.tryFind(Preconditions.checkNotNull(projected, 
COL_NULL_ERROR), new Predicate<SchemaPath>() {
+      @Override
+      public boolean apply(SchemaPath path) {
+        return Preconditions.checkNotNull(path).equals(STAR_COLUMN);
+      }
+    }).isPresent();
+  }
+
+}
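
A minimal usage sketch of the helpers this new base class exposes (not part of
the commit; the wrapper class and the column name are hypothetical):
GroupScan.ALL_COLUMNS is the default projection handed to readers, and the
static isStarQuery check is what the readers above use to branch on `*`.

    import java.util.Arrays;
    import java.util.List;

    import org.apache.drill.common.expression.SchemaPath;
    import org.apache.drill.exec.physical.base.GroupScan;
    import org.apache.drill.exec.store.AbstractRecordReader;

    public class StarColumnSketch {
      public static void main(String[] args) {
        // Default projection used when a scan gets no explicit column list.
        List<SchemaPath> all = GroupScan.ALL_COLUMNS;                     // the single path "*"
        System.out.println(AbstractRecordReader.isStarQuery(all));        // true

        // An explicit projection is not a star query.
        List<SchemaPath> projected = Arrays.asList(SchemaPath.getSimplePath("a"));
        System.out.println(AbstractRecordReader.isStarQuery(projected));  // false
      }
    }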

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
index b328245..9cdfe24 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
@@ -50,7 +50,7 @@ public abstract class AbstractStoragePlugin implements 
StoragePlugin{
 
   @Override
   public AbstractGroupScan getPhysicalScan(JSONOptions selection) throws 
IOException {
-    return getPhysicalScan(selection, null);
+    return getPhysicalScan(selection, AbstractGroupScan.ALL_COLUMNS);
   }
   
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
index 2f0e854..ec9a04e 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
@@ -109,11 +109,6 @@ public class FileSystemPlugin extends 
AbstractStoragePlugin{
   }
 
   @Override
-  public AbstractGroupScan getPhysicalScan(JSONOptions selection) throws 
IOException {
-    return this.getPhysicalScan(selection, null);
-  }
-
-  @Override
   public AbstractGroupScan getPhysicalScan(JSONOptions selection, 
List<SchemaPath> columns) throws IOException {
     FormatSelection formatSelection = selection.getWith(context.getConfig(), 
FormatSelection.class);
     FormatPlugin plugin;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index bf8e301..d1923a5 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -186,7 +186,7 @@ public abstract class EasyFormatPlugin<T extends 
FormatPluginConfig> implements
 
   @Override
   public AbstractGroupScan getGroupScan(FileSelection selection) throws 
IOException {
-    return new EasyGroupScan(selection, this, null, selection.selectionRoot);
+    return new EasyGroupScan(selection, this, selection.selectionRoot);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
index 3de99c5..2bdf1a6 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
@@ -18,7 +18,6 @@
 package org.apache.drill.exec.store.dfs.easy;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -72,21 +71,15 @@ public class EasyGroupScan extends AbstractGroupScan{
       @JsonProperty("columns") List<SchemaPath> columns,
       @JsonProperty("selectionRoot") String selectionRoot
       ) throws IOException, ExecutionSetupException {
+        this(new FileSelection(files, true),
+            (EasyFormatPlugin<?>)engineRegistry.getFormatPlugin(storageConfig, 
formatConfig),
+            columns,
+            selectionRoot);
+  }
 
-    this.formatPlugin = (EasyFormatPlugin<?>) 
engineRegistry.getFormatPlugin(storageConfig, formatConfig);
-    Preconditions.checkNotNull(formatPlugin, "Unable to load format plugin for 
provided format config.");
-    this.selection = new FileSelection(files, true);
-    try{
-      BlockMapBuilder b = new 
BlockMapBuilder(formatPlugin.getFileSystem().getUnderlying(), 
formatPlugin.getContext().getBits());
-      this.chunks = 
b.generateFileWork(selection.getFileStatusList(formatPlugin.getFileSystem()), 
formatPlugin.isBlockSplittable());
-      this.endpointAffinities = AffinityCreator.getAffinityMap(chunks);
-    }catch(IOException e){
-      logger.warn("Failure determining endpoint affinity.", e);
-      this.endpointAffinities = Collections.emptyList();
-    }
-    maxWidth = chunks.size();
-    this.columns = columns;
-    this.selectionRoot = selectionRoot;
+  public EasyGroupScan(FileSelection selection, EasyFormatPlugin<?> 
formatPlugin, String selectionRoot)
+      throws IOException {
+    this(selection, formatPlugin, ALL_COLUMNS, selectionRoot);
   }
 
   public EasyGroupScan(
@@ -95,30 +88,26 @@ public class EasyGroupScan extends AbstractGroupScan{
       List<SchemaPath> columns,
       String selectionRoot
       ) throws IOException{
-    this.selection = selection;
-    this.formatPlugin = formatPlugin;
-    this.columns = columns;
-    try{
-      BlockMapBuilder b = new 
BlockMapBuilder(formatPlugin.getFileSystem().getUnderlying(), 
formatPlugin.getContext().getBits());
-      this.chunks = 
b.generateFileWork(selection.getFileStatusList(formatPlugin.getFileSystem()), 
formatPlugin.isBlockSplittable());
-      this.endpointAffinities = AffinityCreator.getAffinityMap(chunks);
-    }catch(IOException e){
-      logger.warn("Failure determining endpoint affinity.", e);
-      this.endpointAffinities = Collections.emptyList();
-    }
-    maxWidth = chunks.size();
+    this.selection = Preconditions.checkNotNull(selection);
+    this.formatPlugin = Preconditions.checkNotNull(formatPlugin, "Unable to 
load format plugin for provided format config.");
+    this.columns = columns == null || columns.size() == 0? ALL_COLUMNS : 
columns;
     this.selectionRoot = selectionRoot;
+    BlockMapBuilder b = new 
BlockMapBuilder(formatPlugin.getFileSystem().getUnderlying(), 
formatPlugin.getContext().getBits());
+    this.chunks = 
b.generateFileWork(selection.getFileStatusList(formatPlugin.getFileSystem()), 
formatPlugin.isBlockSplittable());
+    this.maxWidth = chunks.size();
+    this.endpointAffinities = AffinityCreator.getAffinityMap(chunks);
   }
 
   private EasyGroupScan(EasyGroupScan that) {
-    this.chunks = that.chunks;
-    this.columns = that.columns;
-    this.endpointAffinities = that.endpointAffinities;
-    this.formatPlugin = that.formatPlugin;
-    this.mappings = that.mappings;
-    this.maxWidth = that.maxWidth;
-    this.selection = that.selection;
-    this.selectionRoot = that.selectionRoot;
+    Preconditions.checkNotNull(that, "Unable to clone: source is null.");
+    selection = that.selection;
+    formatPlugin = that.formatPlugin;
+    columns = that.columns;
+    selectionRoot = that.selectionRoot;
+    chunks = that.chunks;
+    endpointAffinities = that.endpointAffinities;
+    maxWidth = that.maxWidth;
+    mappings = that.mappings;
   }
 
   public String getSelectionRoot() {
@@ -168,16 +157,16 @@ public class EasyGroupScan extends AbstractGroupScan{
   @Override
   public List<EndpointAffinity> getOperatorAffinity() {
     assert chunks != null && chunks.size() > 0;
-    if (this.endpointAffinities == null) {
+    if (endpointAffinities == null) {
         logger.debug("chunks: {}", chunks.size());
-        this.endpointAffinities = AffinityCreator.getAffinityMap(chunks);
+        endpointAffinities = AffinityCreator.getAffinityMap(chunks);
     }
-    return this.endpointAffinities;
+    return endpointAffinities;
   }
 
   @Override
   public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) {
-    this.mappings = AssignmentCreator.getMappings(incomingEndpoints, chunks);
+    mappings = AssignmentCreator.getMappings(incomingEndpoints, chunks);
   }
 
   @Override
@@ -232,7 +221,7 @@ public class EasyGroupScan extends AbstractGroupScan{
 
   @JsonIgnore
   public boolean canPushdownProjects(List<SchemaPath> columns) {
-    return this.formatPlugin.supportsPushDown();
+    return formatPlugin.supportsPushDown();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
index 81a23b0..6e1aa0a 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
@@ -72,7 +72,7 @@ public class TextFormatPlugin extends 
EasyFormatPlugin<TextFormatPlugin.TextForm
 
   @Override
   public AbstractGroupScan getGroupScan(FileSelection selection, 
List<SchemaPath> columns) throws IOException {
-    return new EasyGroupScan(selection, this, null, selection.selectionRoot); 
//TODO : textformat supports project?
+    return new EasyGroupScan(selection, this, columns, 
selection.selectionRoot); //TODO : textformat supports project?
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
index d1a086c..fd40f41 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
@@ -26,6 +26,7 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.PhysicalVisitor;
 import org.apache.drill.exec.physical.base.SubScan;
@@ -60,16 +61,9 @@ public class ParquetRowGroupScan extends AbstractBase 
implements SubScan {
       @JsonProperty("columns") List<SchemaPath> columns, //
       @JsonProperty("selectionRoot") String selectionRoot //
   ) throws ExecutionSetupException {
-
-    if(formatConfig == null) formatConfig = new ParquetFormatConfig();
-    Preconditions.checkNotNull(storageConfig);
-    Preconditions.checkNotNull(formatConfig);
-    this.formatPlugin = (ParquetFormatPlugin) 
registry.getFormatPlugin(storageConfig, formatConfig);
-    Preconditions.checkNotNull(formatPlugin);
-    this.rowGroupReadEntries = rowGroupReadEntries;
-    this.formatConfig = formatPlugin.getConfig();
-    this.columns = columns;
-    this.selectionRoot = selectionRoot;
+    this((ParquetFormatPlugin) 
registry.getFormatPlugin(Preconditions.checkNotNull(storageConfig),
+            formatConfig == null ? new ParquetFormatConfig() : formatConfig),
+        rowGroupReadEntries, columns, selectionRoot);
   }
 
   public ParquetRowGroupScan( //
@@ -77,10 +71,10 @@ public class ParquetRowGroupScan extends AbstractBase 
implements SubScan {
       List<RowGroupReadEntry> rowGroupReadEntries, //
       List<SchemaPath> columns,
       String selectionRoot) {
-    this.formatPlugin = formatPlugin;
+    this.formatPlugin = Preconditions.checkNotNull(formatPlugin);
     this.formatConfig = formatPlugin.getConfig();
     this.rowGroupReadEntries = rowGroupReadEntries;
-    this.columns = columns;
+    this.columns = columns == null || columns.size() == 0 ? 
GroupScan.ALL_COLUMNS : columns;
     this.selectionRoot = selectionRoot;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index a453e66..f9b6d91 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -32,6 +32,7 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.physical.impl.ScanBatch;
 import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.RecordReader;
 
 import com.google.common.base.Preconditions;
@@ -65,11 +66,9 @@ public class ParquetScanBatchCreator implements 
BatchCreator<ParquetRowGroupScan
 
     List<String[]> partitionColumns = Lists.newArrayList();
     List<Integer> selectedPartitionColumns = Lists.newArrayList();
-    boolean selectAllColumns = false;
+    boolean selectAllColumns = AbstractRecordReader.isStarQuery(columns);
 
-    if (columns == null || columns.size() == 0) {
-      selectAllColumns = true;
-    } else {
+    if (!selectAllColumns) {
       List<SchemaPath> newColums = Lists.newArrayList();
       Pattern pattern = Pattern.compile(String.format("%s[0-9]+", 
partitionDesignator));
       for (SchemaPath column : columns) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index 34e7aea..6c2d44c 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -20,8 +20,13 @@ package org.apache.drill.exec.store.parquet.columnreaders;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
@@ -33,8 +38,10 @@ import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.vector.NullableBitVector;
 import org.apache.drill.exec.vector.ValueVector;
@@ -53,7 +60,7 @@ import parquet.hadoop.metadata.ColumnChunkMetaData;
 import parquet.hadoop.metadata.ParquetMetadata;
 import parquet.schema.PrimitiveType;
 
-public class ParquetRecordReader implements RecordReader {
+public class ParquetRecordReader extends AbstractRecordReader {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ParquetRecordReader.class);
 
   // this value has been inflated to read in multiple value vectors at once, 
and then break them up into smaller vectors
@@ -75,8 +82,8 @@ public class ParquetRecordReader implements RecordReader {
   private int bitWidthAllFixedFields;
   private boolean allFieldsFixedLength;
   private int recordsPerBatch;
-  private long totalRecords;
-  private long rowGroupOffset;
+//  private long totalRecords;
+//  private long rowGroupOffset;
 
   private List<ColumnReader> columnStatuses;
   private FileSystem fileSystem;
@@ -84,8 +91,6 @@ public class ParquetRecordReader implements RecordReader {
   Path hadoopPath;
   private VarLenBinaryReader varLengthReader;
   private ParquetMetadata footer;
-  private List<SchemaPath> columns;
-  private FragmentContext fragmentContext;
   private OperatorContext operatorContext;
   // This is a parallel list to the columns list above, it is used to 
determine the subset of the project
   // pushdown columns that do not appear in this file
@@ -117,17 +122,13 @@ public class ParquetRecordReader implements RecordReader {
                              String path, int rowGroupIndex, FileSystem fs,
                              CodecFactoryExposer codecFactoryExposer, 
ParquetMetadata footer,
                              List<SchemaPath> columns) throws 
ExecutionSetupException {
-    hadoopPath = new Path(path);
-    fileSystem = fs;
+    this.hadoopPath = new Path(path);
+    this.fileSystem = fs;
     this.codecFactoryExposer = codecFactoryExposer;
     this.rowGroupIndex = rowGroupIndex;
     this.batchSize = batchSize;
     this.footer = footer;
-    this.columns = columns;
-    if (this.columns != null) {
-      columnsFound = new boolean[this.columns.size()];
-      nullFilledVectors = new ArrayList();
-    }
+    setColumns(columns);
   }
 
   public CodecFactoryExposer getCodecFactoryExposer() {
@@ -184,25 +185,29 @@ public class ParquetRecordReader implements RecordReader {
     // TODO - not sure if this is how we want to represent this
     // for now it makes the existing tests pass, simply selecting
     // all available data if no columns are provided
-    if (this.columns != null){
-      int i = 0;
-      for (SchemaPath expr : this.columns){
-        if ( field.matches(expr)){
-          columnsFound[i] = true;
-          return true;
-        }
-        i++;
+    if (isStarQuery()) {
+      return true;
+    }
+
+    int i = 0;
+    for (SchemaPath expr : getColumns()){
+      if ( field.matches(expr)){
+        columnsFound[i] = true;
+        return true;
       }
-      return false;
+      i++;
     }
-    return true;
+    return false;
   }
 
   @Override
   public void setup(OutputMutator output) throws ExecutionSetupException {
-
+    if (!isStarQuery()) {
+      columnsFound = new boolean[getColumns().size()];
+      nullFilledVectors = new ArrayList();
+    }
     columnStatuses = new ArrayList<>();
-    totalRecords = footer.getBlocks().get(rowGroupIndex).getRowCount();
+//    totalRecords = footer.getBlocks().get(rowGroupIndex).getRowCount();
     List<ColumnDescriptor> columns = 
footer.getFileMetaData().getSchema().getColumns();
     allFieldsFixedLength = true;
     ColumnDescriptor column;
@@ -211,7 +216,7 @@ public class ParquetRecordReader implements RecordReader {
     mockRecordsRead = 0;
 
     MaterializedField field;
-    ParquetMetadataConverter metaConverter = new ParquetMetadataConverter();
+//    ParquetMetadataConverter metaConverter = new ParquetMetadataConverter();
     FileMetaData fileMetaData;
 
     // TODO - figure out how to deal with this better once we add nested 
reading, note also look where this map is used below
@@ -249,7 +254,7 @@ public class ParquetRecordReader implements RecordReader {
         allFieldsFixedLength = false;
       }
     }
-    rowGroupOffset = 
footer.getBlocks().get(rowGroupIndex).getColumns().get(0).getFirstDataPageOffset();
+//    rowGroupOffset = 
footer.getBlocks().get(rowGroupIndex).getColumns().get(0).getFirstDataPageOffset();
 
     if (columnsToScan != 0  && allFieldsFixedLength) {
       recordsPerBatch = (int) Math.min(Math.min(batchSize / 
bitWidthAllFixedFields,
@@ -294,10 +299,15 @@ public class ParquetRecordReader implements RecordReader {
       }
       varLengthReader = new VarLenBinaryReader(this, varLengthColumns);
 
-      if (this.columns != null) {
+      if (!isStarQuery()) {
+        List<SchemaPath> projectedColumns = Lists.newArrayList(getColumns());
+        SchemaPath col;
         for (int i = 0; i < columnsFound.length; i++) {
-          if ( ! columnsFound[i]) {
-            
nullFilledVectors.add((NullableBitVector)output.addField(MaterializedField.create(this.columns.get(i),
 Types.optional(TypeProtos.MinorType.BIT)),
+          col = projectedColumns.get(i);
+          assert col!=null;
+          if ( ! columnsFound[i] && !col.equals(STAR_COLUMN)) {
+            
nullFilledVectors.add((NullableBitVector)output.addField(MaterializedField.create(col,
+                    Types.optional(TypeProtos.MinorType.BIT)),
                 (Class<? extends ValueVector>) 
TypeHelper.getValueVectorClass(TypeProtos.MinorType.BIT, DataMode.OPTIONAL)));
 
           }
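
A hedged sketch of the base-class contract that ParquetRecordReader (and the readers changed below) now rely on: setColumns(...) from the constructor, then getColumns()/isStarQuery() during setup. Field names are illustrative, and the real AbstractRecordReader in this commit may normalize the column list differently.

    import java.util.Collection;
    import java.util.List;
    import org.apache.drill.common.expression.PathSegment;
    import org.apache.drill.common.expression.SchemaPath;

    public abstract class RecordReaderBaseSketch {
      // Same `*` path the old JsonReader built by hand (see the JsonReader hunk further below).
      public static final SchemaPath STAR_COLUMN =
          new SchemaPath(new PathSegment.NameSegment("*"));

      private Collection<SchemaPath> columns;
      private boolean starQuery;

      // Called once from a subclass constructor instead of each reader keeping
      // its own, possibly null, column list.
      protected final void setColumns(List<SchemaPath> projected) {
        this.starQuery = projected == null || projected.isEmpty()
            || projected.contains(STAR_COLUMN);
        this.columns = projected;
      }

      protected final Collection<SchemaPath> getColumns() {
        return columns;
      }

      // True when every available column should be read.
      protected final boolean isStarQuery() {
        return starQuery;
      }
    }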

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
index 16f520c..7a864f0 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
@@ -18,12 +18,15 @@
 package org.apache.drill.exec.store.parquet2;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.store.parquet.RowGroupReadEntry;
 import org.apache.drill.exec.vector.BaseValueVector;
@@ -48,11 +51,12 @@ import parquet.schema.Type;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class DrillParquetReader implements RecordReader {
+public class DrillParquetReader extends AbstractRecordReader {
 
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(DrillParquetReader.class);
 
@@ -60,7 +64,6 @@ public class DrillParquetReader implements RecordReader {
   private MessageType schema;
   private Configuration conf;
   private RowGroupReadEntry entry;
-  private List<SchemaPath> columns;
   private VectorContainerWriter writer;
   private ColumnChunkIncReadStore pageReadStore;
   private parquet.io.RecordReader<Void> recordReader;
@@ -73,11 +76,11 @@ public class DrillParquetReader implements RecordReader {
   public DrillParquetReader(ParquetMetadata footer, RowGroupReadEntry entry, 
List<SchemaPath> columns, Configuration conf) {
     this.footer = footer;
     this.conf = conf;
-    this.columns = columns;
     this.entry = entry;
+    setColumns(columns);
   }
 
-  public static MessageType getProjection(MessageType schema, List<SchemaPath> 
columns) {
+  public static MessageType getProjection(MessageType schema, 
Collection<SchemaPath> columns) {
     MessageType projection = null;
     for (SchemaPath path : columns) {
       List<String> segments = Lists.newArrayList();
@@ -117,10 +120,10 @@ public class DrillParquetReader implements RecordReader {
       schema = footer.getFileMetaData().getSchema();
       MessageType projection = null;
 
-      if (columns == null || columns.size() == 0) {
+      if (isStarQuery()) {
         projection = schema;
       } else {
-        projection = getProjection(schema, columns);
+        projection = getProjection(schema, getColumns());
         if (projection == null) {
           projection = schema;
         }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java
index f27e8e6..3c5d9a4 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java
@@ -64,17 +64,25 @@ public class BlockMapBuilder {
     return codecFactory.getCodec(fileStatus.getPath()) != null;
   }
 
-  public List<CompleteFileWork> generateFileWork(List<FileStatus> files, 
boolean blockify) throws IOException{
+  public List<CompleteFileWork> generateFileWork(List<FileStatus> files, 
boolean blockify) throws IOException {
     List<CompleteFileWork> work = Lists.newArrayList();
-    for(FileStatus f : files){
-      ImmutableRangeMap<Long,BlockLocation> rangeMap = getBlockMap(f);
-      if(!blockify || compressed(f)){
-        work.add(new CompleteFileWork(this.getEndpointByteMap(new 
FileStatusWork(f)), 0, f.getLen(), f.getPath().toString()));
-        continue;
+    boolean error = false;
+    for(FileStatus f : files) {
+      error = false;
+      if (blockify && !compressed(f)) {
+        try {
+          ImmutableRangeMap<Long, BlockLocation> rangeMap = getBlockMap(f);
+          for (Entry<Range<Long>, BlockLocation> l : 
rangeMap.asMapOfRanges().entrySet()) {
+            work.add(new CompleteFileWork(getEndpointByteMap(new 
FileStatusWork(f)), l.getValue().getOffset(), l.getValue().getLength(), 
f.getPath().toString()));
+          }
+        } catch (IOException e) {
+          logger.warn("failure while generating file work.", e);
+          error = true;
+        }
       }
-      
-      for(Entry<Range<Long>, BlockLocation> l : 
rangeMap.asMapOfRanges().entrySet()){
-        work.add(new CompleteFileWork(this.getEndpointByteMap(new 
FileStatusWork(f)), l.getValue().getOffset(), l.getValue().getLength(), 
f.getPath().toString()));
+
+      if (!blockify || error || compressed(f)) {
+        work.add(new CompleteFileWork(getEndpointByteMap(new 
FileStatusWork(f)), 0, f.getLen(), f.getPath().toString()));
       }
     }
     return work;
@@ -135,7 +143,7 @@ public class BlockMapBuilder {
   
   private ImmutableRangeMap<Long,BlockLocation> getBlockMap(Path path) throws 
IOException{
     ImmutableRangeMap<Long,BlockLocation> blockMap  = blockMapMap.get(path);
-    if(blockMap == null){
+    if(blockMap == null) {
       blockMap = buildBlockMap(path);
     }
     return blockMap;
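
The restructured generateFileWork above folds three outcomes into one loop guarded by an `error` flag; the sketch below restates that control flow with hypothetical helpers (splitIntoBlockWork, wholeFileWork) standing in for the inlined bodies, to make the intent explicit.

    // Hedged restatement, not the committed code.
    public List<CompleteFileWork> generateFileWorkSketch(List<FileStatus> files, boolean blockify) {
      List<CompleteFileWork> work = Lists.newArrayList();
      for (FileStatus f : files) {
        boolean blockSplitFailed = false;
        if (blockify && !compressed(f)) {
          try {
            work.addAll(splitIntoBlockWork(f));   // hypothetical: one unit per block in the range map
          } catch (IOException e) {
            logger.warn("failure while generating file work.", e);
            blockSplitFailed = true;              // degrade rather than fail the whole scan
          }
        }
        if (!blockify || compressed(f) || blockSplitFailed) {
          work.add(wholeFileWork(f));             // hypothetical: offset 0, length = f.getLen()
        }
      }
      return work;
    }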

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
index 68921a2..2031aee 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
@@ -21,6 +21,8 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.FieldReference;
@@ -34,6 +36,7 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.RepeatedVarCharVector;
@@ -48,7 +51,7 @@ import org.apache.hadoop.mapred.TextInputFormat;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
-public class DrillTextRecordReader implements RecordReader {
+public class DrillTextRecordReader extends AbstractRecordReader {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(DrillTextRecordReader.class);
 
   static final String COL_NAME = "columns";
@@ -66,36 +69,31 @@ public class DrillTextRecordReader implements RecordReader {
   private Text value;
   private int numCols = 0;
   private boolean redoRecord = false;
-  private boolean first = true;
 
   public DrillTextRecordReader(FileSplit split, FragmentContext context, char 
delimiter, List<SchemaPath> columns) {
     this.fragmentContext = context;
     this.delimiter = (byte) delimiter;
-    boolean getEntireRow = false;
+    setColumns(columns);
 
-    if(columns != null) {
+    if (!isStarQuery()) {
+      String pathStr;
       for (SchemaPath path : columns) {
         assert path.getRootSegment().isNamed();
-        
Preconditions.checkArgument(path.getRootSegment().getPath().equals(COL_NAME), 
"Selected column must have name 'columns'");
-        // FIXME: need re-work for text column push-down.
+        pathStr = path.getRootSegment().getPath();
+        Preconditions.checkArgument(pathStr.equals(COL_NAME) || 
(pathStr.equals("*") && path.getRootSegment().getChild() == null),
+            "Selected column(s) must have name 'columns' or must be plain 
'*'");
+
         if (path.getRootSegment().getChild() != null) {
-          
Preconditions.checkArgument(path.getRootSegment().getChild().isArray(),"Selected
 column must be an array index");
+          
Preconditions.checkArgument(path.getRootSegment().getChild().isArray(), 
"Selected column must be an array index");
           int index = 
path.getRootSegment().getChild().getArraySegment().getIndex();
           columnIds.add(index);
-        } else {
-          getEntireRow = true;
         }
       }
       Collections.sort(columnIds);
+      numCols = columnIds.size();
     }
     targetRecordCount = 
context.getConfig().getInt(ExecConstants.TEXT_LINE_READER_BATCH_SIZE);
 
-    /* If one of the columns requested is the entire row ('columns') then 
ignore the rest of the columns
-     * we are going copy all the values in the repeated varchar vector
-     */
-    if (!getEntireRow) {
-      numCols = columnIds.size();
-    }
     TextInputFormat inputFormat = new TextInputFormat();
     JobConf job = new JobConf();
     job.setInt("io.file.buffer.size", 
context.getConfig().getInt(ExecConstants.TEXT_LINE_READER_BUFFER_SIZE));
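
A compact sketch of the projection rule the constructor above now enforces; the committed code expresses it with inline Preconditions checks, and COL_NAME below is the existing "columns" constant of this class.

    // Accepted projections for the text reader after this change:
    //   columns      -> whole row as a repeated varchar
    //   columns[n]   -> a single delimited field at index n
    //   *            -> star query, handled like the whole row
    private static boolean isAcceptedTextColumn(SchemaPath path) {
      String root = path.getRootSegment().getPath();
      if (root.equals(COL_NAME)) {
        return path.getRootSegment().getChild() == null
            || path.getRootSegment().getChild().isArray();
      }
      return root.equals("*") && path.getRootSegment().getChild() == null;
    }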

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
index 79c94c8..fa26b54 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
@@ -18,6 +18,10 @@
 package org.apache.drill.exec.vector.complex.fn;
 
 import io.netty.buffer.DrillBuf;
+import com.google.common.base.Preconditions;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.buffer.UnpooledByteBufAllocator;
 
 import java.io.IOException;
 import java.io.Reader;
@@ -31,7 +35,9 @@ import org.apache.drill.exec.expr.holders.BigIntHolder;
 import org.apache.drill.exec.expr.holders.BitHolder;
 import org.apache.drill.exec.expr.holders.Float8Holder;
 import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter;
@@ -66,22 +72,22 @@ public class JsonReader {
   private boolean allTextMode;
 
   public JsonReader() throws IOException {
-    this(null, null, false);
+    this(null, false);
+  }
+
+  public JsonReader(DrillBuf managedBuf, boolean allTextMode) throws 
IOException {
+    this(managedBuf, GroupScan.ALL_COLUMNS, allTextMode);
   }
 
   public JsonReader(DrillBuf managedBuf, List<SchemaPath> columns, boolean 
allTextMode) throws JsonParseException, IOException {
     factory.configure(Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
     factory.configure(Feature.ALLOW_COMMENTS, true);
-    this.workBuf = managedBuf;
+    assert Preconditions.checkNotNull(columns).size() > 0 : "json record 
reader requires at least a column";
     this.columns = columns;
-    // TODO - remove this check once the optimizer is updated to push down * 
instead of a null list
-    if (this.columns == null) {
-      this.columns = new ArrayList();
-      this.columns.add(new SchemaPath(new PathSegment.NameSegment("*")));
+    this.starRequested = containsStar();
+    this.workBuf = managedBuf;
     this.allTextMode = allTextMode;
-    }
     this.columnsFound = new boolean[this.columns.size()];
-    this.starRequested = containsStar();
   }
 
   private boolean containsStar() {
@@ -109,7 +115,7 @@ public class JsonReader {
   public List<SchemaPath> getNullColumns() {
     ArrayList<SchemaPath> nullColumns = new ArrayList<SchemaPath>();
     for (int i = 0; i < columnsFound.length; i++ ) {
-      if ( ! columnsFound[i] && ! 
columns.get(i).getRootSegment().getPath().equals("*") ) {
+      if ( ! columnsFound[i] && 
!columns.get(i).equals(AbstractRecordReader.STAR_COLUMN)) {
         nullColumns.add(columns.get(i));
       }
     }
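
The two new JsonReader constructors above rely on GroupScan.ALL_COLUMNS as the non-null stand-in for a star projection. A plausible shape for that constant, reusing the `*` path the removed code built by hand, is sketched here; the committed declaration may differ.

    import java.util.List;
    import com.google.common.collect.ImmutableList;
    import org.apache.drill.common.expression.PathSegment;
    import org.apache.drill.common.expression.SchemaPath;

    public interface GroupScanSketch {
      // Default projection meaning "all columns": a single bare `*` SchemaPath.
      List<SchemaPath> ALL_COLUMNS =
          ImmutableList.of(new SchemaPath(new PathSegment.NameSegment("*")));
    }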

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderWithState.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderWithState.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderWithState.java
index c2dcc95..0636db6 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderWithState.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderWithState.java
@@ -25,6 +25,7 @@ import java.io.Reader;
 import java.util.List;
 
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
 
 import com.fasterxml.jackson.core.JsonParseException;
@@ -46,7 +47,7 @@ public class JsonReaderWithState {
   }
 
   public JsonReaderWithState(JsonRecordSplitter splitter) throws IOException{
-    this(splitter, null, null, false);
+    this(splitter, null, GroupScan.ALL_COLUMNS, false);
   }
 
   public List<SchemaPath> getNullColumns() {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json 
b/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json
index eadb1bc..31df303 100644
--- a/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json
+++ b/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json
@@ -43,6 +43,11 @@
       type: "file",
       connection: "classpath:///",
       formats: {
+        "csv" : {
+          type: "text",
+          extensions: [ "csv" ],
+          delimiter: ","
+        },
         "json" : {
           type: "json"
         },
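
With the classpath `csv` format registered above, text files on the classpath become directly queryable in tests; for example, from a BaseTestQuery subclass (test(...) being the existing helper also used in the TestJsonReader change below):

    // Assumes the empty csv resource added by this commit under project/pushdown/.
    test("select columns[0] from cp.`project/pushdown/empty.csv`");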

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/exec/java-exec/src/test/java/org/apache/drill/TestProjectPushDown.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/TestProjectPushDown.java 
b/exec/java-exec/src/test/java/org/apache/drill/TestProjectPushDown.java
index ec8e92e..8520b9b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestProjectPushDown.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestProjectPushDown.java
@@ -18,6 +18,8 @@
 
 package org.apache.drill;
 
+import java.util.List;
+
 import org.junit.Ignore;
 import org.junit.Test;
 
@@ -105,6 +107,32 @@ public class TestProjectPushDown extends PlanTestBase {
     testPhysicalPlanFromFile("queries/tpch/03.sql", expectedColNames1, 
expectedColNames2, expectedColNames3);
   }
 
+
+  private static final String pushDownSql = "select %s from cp.`%s` t";
+  private static final String pushDownSqlWithFilter = pushDownSql + " where 
%s";
+  private final String[] inputTypes = new String[] {
+      "project/pushdown/empty.json",
+      "project/pushdown/empty.csv",
+      "tpch/lineitem.parquet"
+  };
+
+  @Test
+  public void testProjectPushDown() throws Exception {
+    final String projection = "t.trans_id, t.user_info.cust_id, 
t.marketing_info.keywords[0]";
+    final String expected = "\"columns\" : [ \"`trans_id`\", 
\"`user_info`.`cust_id`\", \"`marketing_info`.`keywords`[0]\" ],";
+    final String filter = "t.another_field = 10 and t.columns[0] = 100 and 
t.columns[1] = t.other.columns[2]";
+    final String expectedWithFilter = "\"columns\" : [ \"`another_field`\", 
\"`trans_id`\", \"`user_info`.`cust_id`\", \"`marketing_info`.`keywords`[0]\", 
\"`columns`[0]\", \"`columns`[1]\", \"`other`.`columns`[2]\" ],";
+
+    for (String inputType:inputTypes) {
+      testPushDown(new PushDownTestInstance(pushDownSql, expected, projection, 
inputType));
+      testPushDown(new PushDownTestInstance(pushDownSqlWithFilter, 
expectedWithFilter, projection, inputType, filter));
+    }
+  }
+
+  protected void testPushDown(PushDownTestInstance test) throws Exception {
+    testPhysicalPlan(test.getSql(), test.getExpected());
+  }
+
   private void testPhysicalPlanFromFile(String fileName, String... 
expectedSubstrs)
       throws Exception {
     String query = getFile(fileName);
@@ -116,4 +144,24 @@ public class TestProjectPushDown extends PlanTestBase {
     }
   }
 
+  protected static class PushDownTestInstance {
+    private final String sqlPattern;
+    private final String expected;
+    private final Object[] params;
+
+    public PushDownTestInstance(String sqlPattern, String expected, Object... 
params) {
+      this.sqlPattern = sqlPattern;
+      this.expected = expected;
+      this.params = params;
+    }
+
+    public String getExpected() {
+      return expected;
+    }
+
+    public String getSql() {
+      return String.format(sqlPattern, params);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
index bfc0d20..10b1775 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
@@ -34,6 +34,7 @@ import org.apache.drill.common.util.FileUtils;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.TopLevelAllocator;
+import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.record.VectorWrapper;
@@ -64,7 +65,7 @@ public class TestJsonReader extends BaseTestQuery {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(TestJsonReader.class);
 
   private static BufferAllocator allocator;
-  private static final boolean VERBOSE_DEBUG = false;
+  private static final boolean VERBOSE_DEBUG = true;
 
   @BeforeClass
   public static void setupAllocator(){
@@ -168,6 +169,7 @@ public class TestJsonReader extends BaseTestQuery {
     String[] queries = 
{Files.toString(FileUtils.getResourceAsFile("/store/json/project_pushdown_json_physical_plan.json"),
 Charsets.UTF_8)};
     long[] rowCounts = {3};
     String filename = "/store/json/schema_change_int_to_string.json";
+    test("alter system set `store.json.all_text_mode` = false");
     runTestsOnFile(filename, UserBitShared.QueryType.PHYSICAL, queries, 
rowCounts);
 
     List<QueryResultBatch> results = testPhysicalWithResults(queries[0]);
@@ -271,7 +273,8 @@ public class TestJsonReader extends BaseTestQuery {
     writer.allocate();
 
     DrillBuf buffer = allocator.buffer(255);
-    JsonReaderWithState jsonReader = new JsonReaderWithState(new 
ReaderJSONRecordSplitter(compound), buffer, null, false);
+    JsonReaderWithState jsonReader = new JsonReaderWithState(new 
ReaderJSONRecordSplitter(compound), buffer,
+        GroupScan.ALL_COLUMNS, false);
     int i =0;
     List<Integer> batchSizes = Lists.newArrayList();
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/exec/java-exec/src/test/resources/project/pushdown/empty.csv
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/project/pushdown/empty.csv 
b/exec/java-exec/src/test/resources/project/pushdown/empty.csv
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/exec/java-exec/src/test/resources/project/pushdown/empty.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/project/pushdown/empty.json 
b/exec/java-exec/src/test/resources/project/pushdown/empty.json
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/exec/java-exec/src/test/resources/project/pushdown/empty.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/project/pushdown/empty.parquet 
b/exec/java-exec/src/test/resources/project/pushdown/empty.parquet
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1486947/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index deb95fe..a3fa64e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -167,6 +167,7 @@
             <exclude>**/*.md</exclude>
             <exclude>sandbox/**</exclude>
             <exclude>**/*.json</exclude>
+            <exclude>**/*.parquet</exclude>
             <exclude>**/*.sql</exclude>
             <exclude>**/git.properties</exclude>
             <exclude>**/*.csv</exclude>
