DRILL-1543: Implement partition pruning for Hive tables

Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/b956e45e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/b956e45e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/b956e45e

Branch: refs/heads/master
Commit: b956e45e65cbbe347dbfdbeda5302ba7aeff6ed7
Parents: eff3764
Author: Mehant Baid <[email protected]>
Authored: Fri Oct 17 19:22:01 2014 -0700
Committer: Mehant Baid <[email protected]>
Committed: Thu Oct 23 14:13:35 2014 -0700

----------------------------------------------------------------------
 .../planner/sql/HivePartitionDescriptor.java    |  56 ++++++++
 .../HivePushPartitionFilterIntoScan.java        | 142 +++++++++++++++++++
 .../drill/exec/store/hive/HiveReadEntry.java    |  13 ++
 .../apache/drill/exec/store/hive/HiveScan.java  |  13 +-
 .../exec/store/hive/HiveStoragePlugin.java      |   8 +-
 .../drill/exec/TestHivePartitionPruning.java    |  69 +++++++++
 .../exec/store/hive/HiveTestDataGenerator.java  |  13 ++
 .../planner/FileSystemPartitionDescriptor.java  |  48 +++++++
 .../drill/exec/planner/PartitionDescriptor.java |  36 +++++
 .../exec/planner/logical/DirPathBuilder.java    |  29 ++--
 .../DrillPushPartitionFilterIntoScan.java       |  67 ++-------
 .../planner/logical/PartitionPruningUtil.java   |  78 ++++++++++
 .../java/org/apache/drill/PlanTestBase.java     |   2 +-
 13 files changed, 502 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b956e45e/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
new file mode 100644
index 0000000..e6ca21e
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.sql;
+
+import org.apache.drill.exec.planner.PartitionDescriptor;
+import org.apache.drill.exec.store.hive.HiveTable;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+// Partition descriptor for hive tables
+public class HivePartitionDescriptor implements PartitionDescriptor {
+
+  private final Map<String, Integer> partitionMap = new HashMap<>();
+  private final int MAX_NESTED_SUBDIRS;
+
+  public HivePartitionDescriptor(List<HiveTable.FieldSchemaWrapper> partitionNames) {
+    int i = 0;
+    for (HiveTable.FieldSchemaWrapper wrapper : partitionNames) {
+      partitionMap.put(wrapper.name, i);
+      i++;
+    }
+    MAX_NESTED_SUBDIRS = i;
+  }
+
+  @Override
+  public int getPartitionHierarchyIndex(String partitionName) {
+    return partitionMap.get(partitionName);
+  }
+
+  @Override
+  public boolean isPartitionName(String name) {
+    return (partitionMap.get(name) != null);
+  }
+
+  @Override
+  public int getMaxHierarchyLevel() {
+    return MAX_NESTED_SUBDIRS;
+  }
+}
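
For readers skimming the patch, the descriptor above just records each partition key's position in declaration order. A minimal standalone sketch of that name-to-index mapping, using plain strings in place of HiveTable.FieldSchemaWrapper (whose construction is outside this diff), illustrative only:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class HivePartitionMapSketch {
      public static void main(String[] args) {
        // partition keys in declaration order, e.g. the c, d, e columns of the test table further down
        List<String> partitionKeys = Arrays.asList("c", "d", "e");
        Map<String, Integer> partitionMap = new HashMap<String, Integer>();
        int i = 0;
        for (String key : partitionKeys) {
          partitionMap.put(key, i++);
        }
        System.out.println(partitionMap.get("d"));          // hierarchy index 1
        System.out.println(partitionMap.get("x") != null);  // false: not a partition column
        System.out.println(i);                              // max hierarchy level: 3
      }
    }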

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b956e45e/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java
new file mode 100644
index 0000000..374c486
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.planner.sql.logical;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.planner.logical.DirPathBuilder;
+import org.apache.drill.exec.planner.logical.DrillFilterRel;
+import org.apache.drill.exec.planner.logical.DrillProjectRel;
+import org.apache.drill.exec.planner.logical.DrillRel;
+import org.apache.drill.exec.planner.logical.DrillScanRel;
+import org.apache.drill.exec.planner.logical.PartitionPruningUtil;
+import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.sql.HivePartitionDescriptor;
+import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.apache.drill.exec.store.hive.HiveReadEntry;
+import org.apache.drill.exec.store.hive.HiveScan;
+import org.apache.drill.exec.store.hive.HiveTable;
+import org.apache.drill.exec.store.hive.HiveTable.HivePartition;
+import org.eigenbase.relopt.RelOptRuleCall;
+import org.eigenbase.relopt.RelOptRuleOperand;
+
+import com.google.common.collect.Lists;
+
+public abstract class HivePushPartitionFilterIntoScan extends StoragePluginOptimizerRule {
+
+  public static final StoragePluginOptimizerRule HIVE_FILTER_ON_PROJECT =
+      new HivePushPartitionFilterIntoScan(
+          RelOptHelper.some(DrillFilterRel.class, RelOptHelper.some(DrillProjectRel.class, RelOptHelper.any(DrillScanRel.class))),
+          "HivePushPartitionFilterIntoScan:Filter_On_Project") {
+
+        @Override
+        public boolean matches(RelOptRuleCall call) {
+          final DrillScanRel scan = (DrillScanRel) call.rel(2);
+          GroupScan groupScan = scan.getGroupScan();
+          return groupScan instanceof HiveScan && groupScan.supportsPartitionFilterPushdown();
+        }
+
+        @Override
+        public void onMatch(RelOptRuleCall call) {
+          final DrillFilterRel filterRel = (DrillFilterRel) call.rel(0);
+          final DrillProjectRel projectRel = (DrillProjectRel) call.rel(1);
+          final DrillScanRel scanRel = (DrillScanRel) call.rel(2);
+          doOnMatch(call, filterRel, projectRel, scanRel);
+        }
+      };
+
+  public static final StoragePluginOptimizerRule HIVE_FILTER_ON_SCAN =
+      new HivePushPartitionFilterIntoScan(
+          RelOptHelper.some(DrillFilterRel.class, RelOptHelper.any(DrillScanRel.class)),
+          "HivePushPartitionFilterIntoScan:Filter_On_Scan") {
+
+        @Override
+        public boolean matches(RelOptRuleCall call) {
+          final DrillScanRel scan = (DrillScanRel) call.rel(1);
+          GroupScan groupScan = scan.getGroupScan();
+          return groupScan instanceof HiveScan && groupScan.supportsPartitionFilterPushdown();
+        }
+
+        @Override
+        public void onMatch(RelOptRuleCall call) {
+          final DrillFilterRel filterRel = (DrillFilterRel) call.rel(0);
+          final DrillScanRel scanRel = (DrillScanRel) call.rel(1);
+          doOnMatch(call, filterRel, null, scanRel);
+        }
+      };
+
+  private HivePushPartitionFilterIntoScan(
+      RelOptRuleOperand operand,
+      String id) {
+    super(operand, id);
+  }
+
+  private HiveReadEntry splitFilter(HiveReadEntry origReadEntry, DirPathBuilder builder) {
+    HiveTable table = origReadEntry.table;
+    List<HivePartition> partitions = origReadEntry.partitions;
+    List<HivePartition> newPartitions = new LinkedList<>();
+    String pathPrefix = PartitionPruningUtil.truncatePrefixFromPath(table.getTable().getSd().getLocation());
+    List<String> newFiles = Lists.newArrayList();
+    List<String> dirPathList = builder.getDirPath();
+
+    for (String dirPath : dirPathList) {
+      String fullPath = pathPrefix + dirPath;
+      // check containment of this path in the list of files
+      for (HivePartition part: partitions) {
+        String origFilePath = origReadEntry.getPartitionLocation(part);
+        String origFileName = PartitionPruningUtil.truncatePrefixFromPath(origFilePath);
+
+        if (origFileName.startsWith(fullPath)) {
+          newFiles.add(origFileName);
+          newPartitions.add(part);
+        }
+      }
+    }
+
+    if (newFiles.size() > 0) {
+      HiveReadEntry newReadEntry = new HiveReadEntry(table, newPartitions, origReadEntry.hiveConfigOverride);
+      return newReadEntry;
+    }
+    return origReadEntry;
+  }
+
+  protected void doOnMatch(RelOptRuleCall call, DrillFilterRel filterRel, DrillProjectRel projectRel, DrillScanRel scanRel) {
+    DrillRel inputRel = projectRel != null ? projectRel : scanRel;
+    HiveReadEntry origReadEntry = ((HiveScan) scanRel.getGroupScan()).hiveReadEntry;
+    DirPathBuilder builder = new DirPathBuilder(filterRel, inputRel, filterRel.getCluster().getRexBuilder(), new HivePartitionDescriptor(origReadEntry.table.partitionKeys));
+    HiveReadEntry newReadEntry = splitFilter(origReadEntry, builder);
+
+    if (origReadEntry == newReadEntry) {
+      return; // no directory filter was pushed down
+    }
+
+    try {
+      HiveScan oldScan = (HiveScan) scanRel.getGroupScan();
+      HiveScan hiveScan = new HiveScan(newReadEntry, oldScan.storagePlugin, oldScan.columns);
+      PartitionPruningUtil.rewritePlan(call, filterRel, projectRel, scanRel, hiveScan, builder);
+    } catch (ExecutionSetupException e) {
+      throw new DrillRuntimeException(e);
+    }
+
+  }
+}
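
The splitFilter method above keeps only the partitions whose location starts with a directory path extracted from the filter. A minimal sketch of that containment test, with hypothetical locations of the value-appended form that getPartitionLocation (in HiveReadEntry below) constructs:

    import java.util.Arrays;
    import java.util.List;

    public class SplitFilterSketch {
      public static void main(String[] args) {
        String pathPrefix = "/warehouse/t";                // table location, scheme prefix already truncated
        List<String> dirPathList = Arrays.asList("/1/1");  // built from a filter such as c = 1 AND d = 1
        List<String> partitionLocations = Arrays.asList(
            "/warehouse/t/1/1/1", "/warehouse/t/1/1/2", "/warehouse/t/2/1/1");

        for (String dirPath : dirPathList) {
          String fullPath = pathPrefix + dirPath;
          for (String loc : partitionLocations) {
            if (loc.startsWith(fullPath)) {
              System.out.println("keep: " + loc);  // only the two c=1/d=1 partitions survive pruning
            }
          }
        }
      }
    }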

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b956e45e/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntry.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntry.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntry.java
index 13845ae..70f8a5b 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntry.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntry.java
@@ -17,11 +17,14 @@
  */
 package org.apache.drill.exec.store.hive;
 
+import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
 import net.hydromatic.optiq.Schema.TableType;
 
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
 
@@ -74,5 +77,15 @@ public class HiveReadEntry {
 
     return TableType.TABLE;
   }
+
+  public String getPartitionLocation(HiveTable.HivePartition partition) {
+    String partitionPath = table.getTable().getSd().getLocation();
+
+    for (String value: partition.values) {
+      partitionPath += "/" + value;
+    }
+
+    return partitionPath;
+  }
 }
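
getPartitionLocation above rebuilds a partition's path by appending each partition value, in key order, to the table's storage-descriptor location. A tiny standalone sketch with hypothetical values:

    import java.util.Arrays;

    public class PartitionLocationSketch {
      public static void main(String[] args) {
        String partitionPath = "/warehouse/t";  // stand-in for table.getTable().getSd().getLocation()
        for (String value : Arrays.asList("1", "2", "1")) {
          partitionPath += "/" + value;
        }
        System.out.println(partitionPath);  // /warehouse/t/1/2/1
      }
    }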
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b956e45e/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
index 303fc3c..d1cc09b 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
@@ -43,6 +43,7 @@ import org.apache.drill.exec.store.hive.HiveTable.HivePartition;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.mapred.FileInputFormat;
@@ -109,6 +110,7 @@ public class HiveScan extends AbstractGroupScan {
     this.hiveReadEntry = hiveReadEntry;
     this.columns = columns;
     this.partitions = hiveReadEntry.getPartitions();
+    this.storagePlugin = storagePlugin;
     getSplits();
     endpoints = storagePlugin.getContext().getBits();
     this.storagePluginName = storagePlugin.getName();
@@ -305,7 +307,8 @@ public class HiveScan extends AbstractGroupScan {
   public String toString() {
     return "HiveScan [table=" + table
         + ", inputSplits=" + inputSplits
-        + ", columns=" + columns + "]";
+        + ", columns=" + columns
+        + ", partitions= " + partitions +"]";
   }
 
   @Override
@@ -320,4 +323,12 @@ public class HiveScan extends AbstractGroupScan {
     return true;
   }
 
+  // Return true if the current table is partitioned, false otherwise
+  public boolean supportsPartitionFilterPushdown() {
+    List<FieldSchema> partitionKeys = table.getPartitionKeys();
+    if (partitionKeys == null || partitionKeys.size() == 0) {
+      return false;
+    }
+    return true;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b956e45e/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
index e221707..a3d4583 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
@@ -19,6 +19,8 @@ package org.apache.drill.exec.store.hive;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Set;
+import com.google.common.collect.ImmutableSet;
 
 import net.hydromatic.optiq.Schema.TableType;
 import net.hydromatic.optiq.SchemaPlus;
@@ -26,9 +28,11 @@ import net.hydromatic.optiq.SchemaPlus;
 import org.apache.drill.common.JSONOptions;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.planner.sql.logical.HivePushPartitionFilterIntoScan;
 import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.StoragePluginOptimizerRule;
 import org.apache.drill.exec.store.hive.schema.HiveSchemaFactory;
 
 import com.fasterxml.jackson.core.type.TypeReference;
@@ -80,6 +84,8 @@ public class HiveStoragePlugin extends AbstractStoragePlugin {
   public void registerSchemas(UserSession session, SchemaPlus parent) {
     schemaFactory.registerSchemas(session, parent);
   }
-
+  public Set<StoragePluginOptimizerRule> getOptimizerRules() {
+    return ImmutableSet.of(HivePushPartitionFilterIntoScan.HIVE_FILTER_ON_PROJECT, HivePushPartitionFilterIntoScan.HIVE_FILTER_ON_SCAN);
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b956e45e/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java
new file mode 100644
index 0000000..8d42fca
--- /dev/null
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec;
+
+import org.apache.drill.PlanTestBase;
+import org.apache.drill.exec.store.hive.HiveTestDataGenerator;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class TestHivePartitionPruning extends PlanTestBase {
+
+  @BeforeClass
+  public static void generateHive() throws Exception {
+    new HiveTestDataGenerator().createAndAddHiveTestPlugin(bit.getContext().getStorage());
+  }
+
+  // Currently we do not have a good way to test plans, so we use a crude string comparison
+  @Test
+  public void testSimplePartitionFilter() throws Exception {
+    String query = "explain plan for select * from hive.`default`.partition_pruning_test where c = 1";
+    String plan = getPlanInString(query, OPTIQ_FORMAT);
+
+    // Check and make sure that Filter is not present in the plan
+    assert !plan.contains("Filter");
+  }
+
+  @Test
+  public void testDisjunctsPartitionFilter() throws Exception {
+    String query = "explain plan for select * from hive.`default`.partition_pruning_test where (c = 1) or (d = 1)";
+    String plan = getPlanInString(query, OPTIQ_FORMAT);
+
+    // Check and make sure that Filter is not present in the plan
+    assert !plan.contains("Filter");
+  }
+
+  @Test
+  public void testConjunctsPartitionFilter() throws Exception {
+    String query = "explain plan for select * from hive.`default`.partition_pruning_test where c = 1 and d = 1";
+    String plan = getPlanInString(query, OPTIQ_FORMAT);
+
+    // Check and make sure that Filter is not present in the plan
+    assert !plan.contains("Filter");
+  }
+
+  @Test
+  @Ignore("DRILL-1571")
+  public void testComplexFilter() throws Exception {
+    String query = "explain plan for select * from hive.`default`.partition_pruning_test where (c = 1 and d = 1) or (c = 2 and d = 3)";
+    String plan = getPlanInString(query, OPTIQ_FORMAT);
+
+    // Check and make sure that Filter is not present in the plan
+    assert !plan.contains("Filter");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b956e45e/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
index 7b2c9b6..d339d28 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
@@ -206,6 +206,19 @@ public class HiveTestDataGenerator {
     // create a Hive view to test how its metadata is populated in Drill's INFORMATION_SCHEMA
     executeQuery("CREATE VIEW IF NOT EXISTS hiveview AS SELECT * FROM kv");
 
+    // create partitioned hive table to test partition pruning
+    executeQuery("USE default");
+    executeQuery("CREATE TABLE IF NOT EXISTS default.partition_pruning_test(a DATE, b TIMESTAMP) " +
+        "partitioned by (c int, d int, e int) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE");
+    executeQuery(String.format("LOAD DATA LOCAL INPATH '%s' INTO TABLE default.partition_pruning_test partition(c=1, d=1, e=1)", testDateDataFile));
+    executeQuery(String.format("LOAD DATA LOCAL INPATH '%s' INTO TABLE default.partition_pruning_test partition(c=1, d=1, e=2)", testDateDataFile));
+    executeQuery(String.format("LOAD DATA LOCAL INPATH '%s' INTO TABLE default.partition_pruning_test partition(c=1, d=2, e=1)", testDateDataFile));
+    executeQuery(String.format("LOAD DATA LOCAL INPATH '%s' INTO TABLE default.partition_pruning_test partition(c=1, d=2, e=2)", testDateDataFile));
+    executeQuery(String.format("LOAD DATA LOCAL INPATH '%s' INTO TABLE default.partition_pruning_test partition(c=2, d=1, e=1)", testDateDataFile));
+    executeQuery(String.format("LOAD DATA LOCAL INPATH '%s' INTO TABLE default.partition_pruning_test partition(c=2, d=1, e=2)", testDateDataFile));
+    executeQuery(String.format("LOAD DATA LOCAL INPATH '%s' INTO TABLE default.partition_pruning_test partition(c=2, d=3, e=1)", testDateDataFile));
+    executeQuery(String.format("LOAD DATA LOCAL INPATH '%s' INTO TABLE default.partition_pruning_test partition(c=2, d=3, e=2)", testDateDataFile));
+
     ss.close();
   }
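
For reference, since Hive stores each partition as nested key=value subdirectories under the table location, the loads above should produce a tree roughly like the following (paths abbreviated; the exact warehouse location varies by configuration):

    partition_pruning_test/c=1/d=1/e=1/
    partition_pruning_test/c=1/d=1/e=2/
    ...
    partition_pruning_test/c=2/d=3/e=2/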
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b956e45e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
new file mode 100644
index 0000000..4c1f8e8
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner;
+
+// partition descriptor for file system based tables
+public class FileSystemPartitionDescriptor implements PartitionDescriptor {
+
+  static final int MAX_NESTED_SUBDIRS = 10;          // allow up to 10 nested sub-directories
+
+  private final String partitionLabel;
+  private final int partitionLabelLength;
+
+  public FileSystemPartitionDescriptor(String partitionLabel) {
+    this.partitionLabel = partitionLabel;
+    this.partitionLabelLength = partitionLabel.length();
+  }
+
+  @Override
+  public int getPartitionHierarchyIndex(String partitionName) {
+    String suffix = partitionName.substring(partitionLabelLength); // get the numeric suffix from 'dir<N>'
+    return Integer.parseInt(suffix);
+  }
+
+  @Override
+  public boolean isPartitionName(String name) {
+    return name.matches(partitionLabel + "[0-9]");
+  }
+
+  @Override
+  public int getMaxHierarchyLevel() {
+    return MAX_NESTED_SUBDIRS;
+  }
+}
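
A quick check of the dir<N> handling above, using the default label "dir" (standalone, illustrative values):

    public class FsDescriptorSketch {
      public static void main(String[] args) {
        String partitionLabel = "dir";
        String name = "dir2";
        // a single-digit pattern suffices because MAX_NESTED_SUBDIRS is 10 (dir0..dir9)
        boolean isPartition = name.matches(partitionLabel + "[0-9]");           // true
        int index = Integer.parseInt(name.substring(partitionLabel.length()));  // 2
        System.out.println(isPartition + " " + index);
      }
    }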

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b956e45e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java
new file mode 100644
index 0000000..02a6a8f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner;
+
+// Interface used to describe partitions. Currently used by file system based partitions and Hive partitions
+public interface PartitionDescriptor {
+
+  /* Get the hierarchy index of the given partition.
+   * For example, if the partitions are laid out as
+   * 1997/q1/jan
+   *
+   * then getPartitionHierarchyIndex("jan") => 2
+   */
+  public int getPartitionHierarchyIndex(String partitionName);
+
+  // Given a column name, return whether it is a partition column
+  public boolean isPartitionName(String name);
+
+  // Maximum level of partition nesting/hierarchy supported
+  public int getMaxHierarchyLevel();
+}
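
A self-contained sketch of the contract, reusing the 1997/q1/jan example from the comment above (the interface is copied locally so the snippet compiles on its own):

    import java.util.Arrays;
    import java.util.List;

    public class PartitionDescriptorSketch {
      interface PartitionDescriptor {
        int getPartitionHierarchyIndex(String partitionName);
        boolean isPartitionName(String name);
        int getMaxHierarchyLevel();
      }

      public static void main(String[] args) {
        final List<String> hierarchy = Arrays.asList("1997", "q1", "jan");
        PartitionDescriptor descriptor = new PartitionDescriptor() {
          public int getPartitionHierarchyIndex(String partitionName) {
            return hierarchy.indexOf(partitionName);
          }
          public boolean isPartitionName(String name) {
            return hierarchy.contains(name);
          }
          public int getMaxHierarchyLevel() {
            return hierarchy.size();
          }
        };
        System.out.println(descriptor.getPartitionHierarchyIndex("jan")); // 2
      }
    }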

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b956e45e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DirPathBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DirPathBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DirPathBuilder.java
index eeb664d..7972d74 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DirPathBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DirPathBuilder.java
@@ -21,6 +21,7 @@ import java.util.List;
 
 import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.planner.PartitionDescriptor;
 import org.eigenbase.relopt.RelOptUtil;
 import org.eigenbase.reltype.RelDataTypeField;
 import org.eigenbase.rex.RexBuilder;
@@ -44,32 +45,34 @@ import com.google.common.collect.Lists;
 public class DirPathBuilder extends RexVisitorImpl <SchemaPath> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DirPathBuilder.class);
 
-  static final int MAX_NESTED_SUBDIRS = 10;          // allow up to 10 nested sub-directories
   static final String EMPTY_STRING = "";
 
   final private DrillFilterRel filterRel;
   final private DrillRel inputRel;
   final private RexBuilder builder;
-  final private String dirLabel;
+  final private PartitionDescriptor partitionDescriptor;
 
-  private List<String> dirNameList = Lists.newArrayListWithExpectedSize(MAX_NESTED_SUBDIRS);
-  private List<RexNode> conjunctList = Lists.newArrayListWithExpectedSize(MAX_NESTED_SUBDIRS);
+  private List<String> dirNameList;
+  private List<RexNode> conjunctList;
   private List<String> dirPathList = Lists.newArrayList();
   private RexNode currentConjunct = null;    // the current conjunct we are evaluating during visitor traversal
   private RexNode finalCondition = null;     // placeholder for the final filter condition
   private boolean dirMatch = false;
 
-  DirPathBuilder(DrillFilterRel filterRel, DrillRel inputRel, RexBuilder builder, String dirLabel) {
+  public DirPathBuilder(DrillFilterRel filterRel, DrillRel inputRel, RexBuilder builder, PartitionDescriptor partitionDescriptor) {
     super(true);
     this.filterRel = filterRel;
     this.inputRel = inputRel;
     this.builder = builder;
-    this.dirLabel = dirLabel;
     this.finalCondition = filterRel.getCondition();
+    this.partitionDescriptor = partitionDescriptor;
   }
 
   private void initPathComponents() {
-    for (int i=0; i < MAX_NESTED_SUBDIRS; i++) {
+    int maxHierarchy = partitionDescriptor.getMaxHierarchyLevel();
+    dirNameList = Lists.newArrayListWithExpectedSize(maxHierarchy);
+    conjunctList = Lists.newArrayListWithExpectedSize(maxHierarchy);
+    for (int i=0; i < maxHierarchy; i++) {
       dirNameList.add(EMPTY_STRING);
       conjunctList.add(null);
     }
@@ -153,7 +156,7 @@ public class DirPathBuilder extends RexVisitorImpl <SchemaPath> {
   public SchemaPath visitInputRef(RexInputRef inputRef) {
     final int index = inputRef.getIndex();
     final RelDataTypeField field = inputRel.getRowType().getFieldList().get(index);
-    if (field.getName().matches(dirLabel+"[0-9]")) {
+    if (partitionDescriptor.isPartitionName(field.getName())) {
       dirMatch = true;
     }
     return FieldReference.getWithQuotedRef(field.getName());
@@ -175,19 +178,18 @@ public class DirPathBuilder extends RexVisitorImpl <SchemaPath> {
         if (dirMatch && e1 != null) {
           // get the index for the 'dir<N>' filter
           String dirName = e1.getRootSegment().getPath();
-          String suffix = dirName.substring(dirLabel.length()); // get the numeric suffix from 'dir<N>'
-          int suffixIndex = Integer.parseInt(suffix);
+          int hierarchyIndex = partitionDescriptor.getPartitionHierarchyIndex(dirName);
 
-          if (suffixIndex >= MAX_NESTED_SUBDIRS) {
+          if (hierarchyIndex >= partitionDescriptor.getMaxHierarchyLevel()) {
             return null;
           }
 
           // SchemaPath e2 = call.getOperands().get(1).accept(this);
           if (call.getOperands().get(1).getKind() == SqlKind.LITERAL) {
             String e2 = ((RexLiteral)call.getOperands().get(1)).getValue2().toString();
-            dirNameList.set(suffixIndex, e2);
+            dirNameList.set(hierarchyIndex, e2);
             // dirNameList.set(suffixIndex, e2.getRootSegment().getPath());
-            conjunctList.set(suffixIndex, currentConjunct);
+            conjunctList.set(hierarchyIndex, currentConjunct);
             return e1;
           }
         }
@@ -216,7 +218,6 @@ public class DirPathBuilder extends RexVisitorImpl <SchemaPath> {
     if (dirMatch) {
       return arg;
     }
-
     return null;
   }
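
With a descriptor plugged in, the literal values collected per hierarchy index combine into the pushed-down directory path. A rough sketch of that combination for a hypothetical filter dir0 = '1994' AND dir1 = 'Q1' (the actual assembly inside getDirPath is not shown in this hunk, so this is an assumption about its effect):

    import java.util.Arrays;
    import java.util.List;

    public class DirPathSketch {
      public static void main(String[] args) {
        // one slot per hierarchy level, filled in by the visitor; empty means unconstrained
        List<String> dirNameList = Arrays.asList("1994", "Q1", "");
        StringBuilder dirPath = new StringBuilder();
        for (String dir : dirNameList) {
          if (dir.isEmpty()) {
            break;  // stop at the first level the filter does not pin down
          }
          dirPath.append("/").append(dir);
        }
        System.out.println(dirPath);  // /1994/Q1
      }
    }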
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b956e45e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushPartitionFilterIntoScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushPartitionFilterIntoScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushPartitionFilterIntoScan.java
index 19a92f0..c6dceda 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushPartitionFilterIntoScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushPartitionFilterIntoScan.java
@@ -23,6 +23,8 @@ import java.util.List;
 
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.exec.physical.base.FileGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.planner.FileSystemPartitionDescriptor;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.planner.physical.PrelUtil;
 import org.apache.drill.exec.store.dfs.FileSelection;
@@ -44,7 +46,9 @@ public abstract class DrillPushPartitionFilterIntoScan extends RelOptRule {
     @Override
       public boolean matches(RelOptRuleCall call) {
         final DrillScanRel scan = (DrillScanRel) call.rel(2);
-        return scan.getGroupScan().supportsPartitionFilterPushdown();
+        GroupScan groupScan = scan.getGroupScan();
+        // this rule is applicable only for dfs based partition pruning
+        return groupScan instanceof FileGroupScan && groupScan.supportsPartitionFilterPushdown();
       }
 
     @Override
@@ -64,7 +68,9 @@ public abstract class DrillPushPartitionFilterIntoScan extends RelOptRule {
       @Override
         public boolean matches(RelOptRuleCall call) {
           final DrillScanRel scan = (DrillScanRel) call.rel(1);
-          return scan.getGroupScan().supportsPartitionFilterPushdown();
+          GroupScan groupScan = scan.getGroupScan();
+          // this rule is applicable only for dfs based partition pruning
+          return groupScan instanceof FileGroupScan && groupScan.supportsPartitionFilterPushdown();
         }
 
       @Override
@@ -94,13 +100,8 @@ public abstract class DrillPushPartitionFilterIntoScan extends RelOptRule {
       String fullPath = pathPrefix + dirPath;
       // check containment of this path in the list of files
       for (String origFilePath : origFiles) {
-        String[] components = origFilePath.split(":", 2); // some paths are of the form 'file:<path>', so we need to split
-        String origFileName = "";
-        if (components.length == 1) {
-          origFileName = components[0];
-        } else {
-          origFileName = components[1];
-        }
+        String origFileName = PartitionPruningUtil.truncatePrefixFromPath(origFilePath);
+
         if (origFileName.startsWith(fullPath)) {
           newFiles.add(origFileName);
         }
@@ -120,7 +121,7 @@ public abstract class DrillPushPartitionFilterIntoScan extends RelOptRule {
     DrillRel inputRel = projectRel != null ? projectRel : scanRel;
 
     PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
-    DirPathBuilder builder = new DirPathBuilder(filterRel, inputRel, filterRel.getCluster().getRexBuilder(), settings.getFsPartitionColumnLabel());
+    DirPathBuilder builder = new DirPathBuilder(filterRel, inputRel, filterRel.getCluster().getRexBuilder(), new FileSystemPartitionDescriptor(settings.getFsPartitionColumnLabel()));
 
     FormatSelection origSelection = (FormatSelection)scanRel.getDrillTable().getSelection();
     FormatSelection newSelection = splitFilter(origSelection, builder);
@@ -129,55 +130,11 @@ public abstract class DrillPushPartitionFilterIntoScan extends RelOptRule {
       return; // no directory filter was pushed down
     }
 
-    RexNode origFilterCondition = filterRel.getCondition();
-    RexNode newFilterCondition = builder.getFinalCondition();
-
     try {
       FileGroupScan fgscan = ((FileGroupScan)scanRel.getGroupScan()).clone(newSelection.getSelection());
-
-      if (newFilterCondition.isAlwaysTrue()) {
-
-        final DrillScanRel newScanRel =
-            new DrillScanRel(scanRel.getCluster(),
-                scanRel.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
-                scanRel.getTable(),
-                fgscan,
-                scanRel.getRowType(),
-                scanRel.getColumns());
-
-        if (projectRel != null) {
-          DrillProjectRel newProjectRel = new DrillProjectRel(projectRel.getCluster(), projectRel.getTraitSet(),
-              newScanRel, projectRel.getProjects(), filterRel.getRowType());
-
-          call.transformTo(newProjectRel);
-        } else {
-          call.transformTo(newScanRel);
-        }
-      } else {
-
-      final DrillScanRel newScanRel =
-          new DrillScanRel(scanRel.getCluster(),
-              scanRel.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
-              scanRel.getTable(),
-              fgscan,
-              scanRel.getRowType(),
-              scanRel.getColumns());
-      if (projectRel != null) {
-        DrillProjectRel newProjectRel = new DrillProjectRel(projectRel.getCluster(), projectRel.getTraitSet(),
-            newScanRel, projectRel.getProjects(), projectRel.getRowType());
-        inputRel = newProjectRel;
-      } else {
-        inputRel = newScanRel;
-      }
-      final DrillFilterRel newFilterRel = new DrillFilterRel(filterRel.getCluster(), filterRel.getTraitSet(),
-          inputRel, origFilterCondition /* for now keep the original condition until we add more test coverage */);
-
-      call.transformTo(newFilterRel);
-      }
+      PartitionPruningUtil.rewritePlan(call, filterRel, projectRel, scanRel, fgscan, builder);
     } catch (IOException e) {
       throw new DrillRuntimeException(e) ;
     }
-
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b956e45e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/PartitionPruningUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/PartitionPruningUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/PartitionPruningUtil.java
new file mode 100644
index 0000000..88b4a51
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/PartitionPruningUtil.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.eigenbase.relopt.RelOptRuleCall;
+import org.eigenbase.rex.RexNode;
+
+public class PartitionPruningUtil {
+  public static void rewritePlan(RelOptRuleCall call, DrillFilterRel filterRel, DrillProjectRel projectRel, DrillScanRel scanRel, GroupScan newScan, DirPathBuilder builder) {
+    RexNode origFilterCondition = filterRel.getCondition();
+    RexNode newFilterCondition = builder.getFinalCondition();
+
+    if (newFilterCondition.isAlwaysTrue()) {
+
+      final DrillScanRel newScanRel =
+          new DrillScanRel(scanRel.getCluster(),
+              scanRel.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
+              scanRel.getTable(),
+              newScan,
+              scanRel.getRowType(),
+              scanRel.getColumns());
+
+      if (projectRel != null) {
+        DrillProjectRel newProjectRel = new DrillProjectRel(projectRel.getCluster(), projectRel.getTraitSet(),
+            newScanRel, projectRel.getProjects(), filterRel.getRowType());
+
+        call.transformTo(newProjectRel);
+      } else {
+        call.transformTo(newScanRel);
+      }
+    } else {
+      DrillRel inputRel;
+      final DrillScanRel newScanRel =
+          new DrillScanRel(scanRel.getCluster(),
+              scanRel.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
+              scanRel.getTable(),
+              newScan,
+              scanRel.getRowType(),
+              scanRel.getColumns());
+      if (projectRel != null) {
+        DrillProjectRel newProjectRel = new DrillProjectRel(projectRel.getCluster(), projectRel.getTraitSet(),
+            newScanRel, projectRel.getProjects(), projectRel.getRowType());
+        inputRel = newProjectRel;
+      } else {
+        inputRel = newScanRel;
+      }
+      final DrillFilterRel newFilterRel = new DrillFilterRel(filterRel.getCluster(), filterRel.getTraitSet(),
+          inputRel, origFilterCondition /* for now keep the original condition until we add more test coverage */);
+
+      call.transformTo(newFilterRel);
+    }
+  }
+
+  public static String truncatePrefixFromPath(String fileName) {
+    String[] pathPrefixComponent = fileName.split(":", 2);
+    if (pathPrefixComponent.length == 1) {
+      return pathPrefixComponent[0];
+    } else {
+      return pathPrefixComponent[1];
+    }
+  }
+}
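
truncatePrefixFromPath strips an optional URI scheme such as "file:" so that paths compare cleanly against filter-derived prefixes. A quick standalone check (a copy of the helper with sample inputs):

    public class TruncatePrefixSketch {
      static String truncatePrefixFromPath(String fileName) {
        String[] pathPrefixComponent = fileName.split(":", 2);
        return pathPrefixComponent.length == 1 ? pathPrefixComponent[0] : pathPrefixComponent[1];
      }

      public static void main(String[] args) {
        System.out.println(truncatePrefixFromPath("file:/tmp/drill/t"));  // /tmp/drill/t
        System.out.println(truncatePrefixFromPath("/tmp/drill/t"));       // /tmp/drill/t
      }
    }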

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b956e45e/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java
index 0c75640..580a1f0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java
@@ -193,7 +193,7 @@ public class PlanTestBase extends BaseTestQuery {
    * This will submit an "EXPLAIN" statement, and return the column value which
    * contains the plan's string.
    */
-  private String getPlanInString(String sql, String columnName)
+  protected String getPlanInString(String sql, String columnName)
       throws Exception {
     List<QueryResultBatch> results = testSqlWithResults(sql);
 
