DRILL-1543: Implement partition pruning for Hive tables
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/b956e45e Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/b956e45e Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/b956e45e Branch: refs/heads/master Commit: b956e45e65cbbe347dbfdbeda5302ba7aeff6ed7 Parents: eff3764 Author: Mehant Baid <[email protected]> Authored: Fri Oct 17 19:22:01 2014 -0700 Committer: Mehant Baid <[email protected]> Committed: Thu Oct 23 14:13:35 2014 -0700 ---------------------------------------------------------------------- .../planner/sql/HivePartitionDescriptor.java | 56 ++++++++ .../HivePushPartitionFilterIntoScan.java | 142 +++++++++++++++++++ .../drill/exec/store/hive/HiveReadEntry.java | 13 ++ .../apache/drill/exec/store/hive/HiveScan.java | 13 +- .../exec/store/hive/HiveStoragePlugin.java | 8 +- .../drill/exec/TestHivePartitionPruning.java | 69 +++++++++ .../exec/store/hive/HiveTestDataGenerator.java | 13 ++ .../planner/FileSystemPartitionDescriptor.java | 48 +++++++ .../drill/exec/planner/PartitionDescriptor.java | 36 +++++ .../exec/planner/logical/DirPathBuilder.java | 29 ++-- .../DrillPushPartitionFilterIntoScan.java | 67 ++------- .../planner/logical/PartitionPruningUtil.java | 78 ++++++++++ .../java/org/apache/drill/PlanTestBase.java | 2 +- 13 files changed, 502 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b956e45e/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java new file mode 100644 index 0000000..e6ca21e --- /dev/null +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.planner.sql; + +import org.apache.drill.exec.planner.PartitionDescriptor; +import org.apache.drill.exec.store.hive.HiveTable; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +// Partition descriptor for Hive tables +public class HivePartitionDescriptor implements PartitionDescriptor { + + private final Map<String, Integer> partitionMap = new HashMap<>(); + private final int MAX_NESTED_SUBDIRS; + + public HivePartitionDescriptor(List<HiveTable.FieldSchemaWrapper> partitionNames) { + int i = 0; + for (HiveTable.FieldSchemaWrapper wrapper : partitionNames) { + partitionMap.put(wrapper.name, i); + i++; + } + MAX_NESTED_SUBDIRS = i; + } + + @Override + public int getPartitionHierarchyIndex(String partitionName) { + return partitionMap.get(partitionName); + } + + @Override + public boolean isPartitionName(String name) { + return partitionMap.containsKey(name); + } + + @Override + public int getMaxHierarchyLevel() { + return MAX_NESTED_SUBDIRS; + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b956e45e/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java new file mode 100644 index 0000000..374c486 --- /dev/null +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.drill.exec.planner.sql.logical; + +import java.util.LinkedList; +import java.util.List; + +import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.physical.base.GroupScan; +import org.apache.drill.exec.planner.logical.DirPathBuilder; +import org.apache.drill.exec.planner.logical.DrillFilterRel; +import org.apache.drill.exec.planner.logical.DrillProjectRel; +import org.apache.drill.exec.planner.logical.DrillRel; +import org.apache.drill.exec.planner.logical.DrillScanRel; +import org.apache.drill.exec.planner.logical.PartitionPruningUtil; +import org.apache.drill.exec.planner.logical.RelOptHelper; +import org.apache.drill.exec.planner.sql.HivePartitionDescriptor; +import org.apache.drill.exec.store.StoragePluginOptimizerRule; +import org.apache.drill.exec.store.hive.HiveReadEntry; +import org.apache.drill.exec.store.hive.HiveScan; +import org.apache.drill.exec.store.hive.HiveTable; +import org.apache.drill.exec.store.hive.HiveTable.HivePartition; +import org.eigenbase.relopt.RelOptRuleCall; +import org.eigenbase.relopt.RelOptRuleOperand; + +import com.google.common.collect.Lists; + +public abstract class HivePushPartitionFilterIntoScan extends StoragePluginOptimizerRule { + + public static final StoragePluginOptimizerRule HIVE_FILTER_ON_PROJECT = + new HivePushPartitionFilterIntoScan( + RelOptHelper.some(DrillFilterRel.class, RelOptHelper.some(DrillProjectRel.class, RelOptHelper.any(DrillScanRel.class))), + "HivePushPartitionFilterIntoScan:Filter_On_Project") { + + @Override + public boolean matches(RelOptRuleCall call) { + final DrillScanRel scan = (DrillScanRel) call.rel(2); + GroupScan groupScan = scan.getGroupScan(); + return groupScan instanceof HiveScan && groupScan.supportsPartitionFilterPushdown(); + } + + @Override + public void onMatch(RelOptRuleCall call) { + final DrillFilterRel filterRel = (DrillFilterRel) call.rel(0); + final DrillProjectRel projectRel = (DrillProjectRel) call.rel(1); + final DrillScanRel scanRel = (DrillScanRel) call.rel(2); + doOnMatch(call, filterRel, projectRel, scanRel); + } + }; + + public static final StoragePluginOptimizerRule HIVE_FILTER_ON_SCAN = + new HivePushPartitionFilterIntoScan( + RelOptHelper.some(DrillFilterRel.class, RelOptHelper.any(DrillScanRel.class)), + "HivePushPartitionFilterIntoScan:Filter_On_Scan") { + + @Override + public boolean matches(RelOptRuleCall call) { + final DrillScanRel scan = (DrillScanRel) call.rel(1); + GroupScan groupScan = scan.getGroupScan(); + return groupScan instanceof HiveScan && groupScan.supportsPartitionFilterPushdown(); + } + + @Override + public void onMatch(RelOptRuleCall call) { + final DrillFilterRel filterRel = (DrillFilterRel) call.rel(0); + final DrillScanRel scanRel = (DrillScanRel) call.rel(1); + doOnMatch(call, filterRel, null, scanRel); + } + }; + + private HivePushPartitionFilterIntoScan( + RelOptRuleOperand operand, + String id) { + super(operand, id); + } + + private HiveReadEntry splitFilter(HiveReadEntry origReadEntry, DirPathBuilder builder) { + HiveTable table = origReadEntry.table; + List<HivePartition> partitions = origReadEntry.partitions; + List<HivePartition> newPartitions = new LinkedList<>(); + String pathPrefix = PartitionPruningUtil.truncatePrefixFromPath(table.getTable().getSd().getLocation()); + List<String> newFiles = Lists.newArrayList(); + List<String> dirPathList = builder.getDirPath(); + + for (String dirPath : dirPathList) { + String fullPath 
= pathPrefix + dirPath; + // check containment of this path in the list of files + for (HivePartition part: partitions) { + String origFilePath = origReadEntry.getPartitionLocation(part); + String origFileName = PartitionPruningUtil.truncatePrefixFromPath(origFilePath); + + if (origFileName.startsWith(fullPath)) { + newFiles.add(origFileName); + newPartitions.add(part); + } + } + } + + if (newFiles.size() > 0) { + HiveReadEntry newReadEntry = new HiveReadEntry(table, newPartitions, origReadEntry.hiveConfigOverride); + return newReadEntry; + } + return origReadEntry; + } + + protected void doOnMatch(RelOptRuleCall call, DrillFilterRel filterRel, DrillProjectRel projectRel, DrillScanRel scanRel) { + DrillRel inputRel = projectRel != null ? projectRel : scanRel; + HiveReadEntry origReadEntry = ((HiveScan)scanRel.getGroupScan()).hiveReadEntry; + DirPathBuilder builder = new DirPathBuilder(filterRel, inputRel, filterRel.getCluster().getRexBuilder(), new HivePartitionDescriptor(origReadEntry.table.partitionKeys)); + HiveReadEntry newReadEntry = splitFilter(origReadEntry, builder); + + if (origReadEntry == newReadEntry) { + return; // no directory filter was pushed down + } + + try { + HiveScan oldScan = (HiveScan) scanRel.getGroupScan(); + HiveScan hiveScan = new HiveScan(newReadEntry, oldScan.storagePlugin, oldScan.columns); + PartitionPruningUtil.rewritePlan(call, filterRel, projectRel, scanRel, hiveScan, builder); + } catch (ExecutionSetupException e) { + throw new DrillRuntimeException(e); + } + + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b956e45e/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntry.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntry.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntry.java index 13845ae..70f8a5b 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntry.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntry.java @@ -17,11 +17,14 @@ */ package org.apache.drill.exec.store.hive; +import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; import java.util.Map; import net.hydromatic.optiq.Schema.TableType; +import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; @@ -74,5 +77,15 @@ public class HiveReadEntry { return TableType.TABLE; } + + public String getPartitionLocation(HiveTable.HivePartition partition) { + String partitionPath = table.getTable().getSd().getLocation(); + + for (String value: partition.values) { + partitionPath += "/" + value; + } + + return partitionPath; + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b956e45e/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java index 303fc3c..d1cc09b 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java @@ -43,6 +43,7 @@ import 
org.apache.drill.exec.store.hive.HiveTable.HivePartition; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.mapred.FileInputFormat; @@ -109,6 +110,7 @@ public class HiveScan extends AbstractGroupScan { this.hiveReadEntry = hiveReadEntry; this.columns = columns; this.partitions = hiveReadEntry.getPartitions(); + this.storagePlugin = storagePlugin; getSplits(); endpoints = storagePlugin.getContext().getBits(); this.storagePluginName = storagePlugin.getName(); @@ -305,7 +307,8 @@ public class HiveScan extends AbstractGroupScan { public String toString() { return "HiveScan [table=" + table + ", inputSplits=" + inputSplits - + ", columns=" + columns + "]"; + + ", columns=" + columns + + ", partitions=" + partitions + "]"; } @Override @@ -320,4 +323,12 @@ return true; } + + // Return true if the current table is partitioned, false otherwise + public boolean supportsPartitionFilterPushdown() { + List<FieldSchema> partitionKeys = table.getPartitionKeys(); + if (partitionKeys == null || partitionKeys.size() == 0) { + return false; + } + return true; + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b956e45e/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java index e221707..a3d4583 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java @@ -19,6 +19,8 @@ package org.apache.drill.exec.store.hive; import java.io.IOException; import java.util.List; +import java.util.Set; +import com.google.common.collect.ImmutableSet; import net.hydromatic.optiq.Schema.TableType; import net.hydromatic.optiq.SchemaPlus; @@ -26,9 +28,11 @@ import net.hydromatic.optiq.SchemaPlus; import org.apache.drill.common.JSONOptions; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.planner.sql.logical.HivePushPartitionFilterIntoScan; import org.apache.drill.exec.rpc.user.UserSession; import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.store.AbstractStoragePlugin; +import org.apache.drill.exec.store.StoragePluginOptimizerRule; import org.apache.drill.exec.store.hive.schema.HiveSchemaFactory; import com.fasterxml.jackson.core.type.TypeReference; @@ -80,6 +84,8 @@ public class HiveStoragePlugin extends AbstractStoragePlugin { public void registerSchemas(UserSession session, SchemaPlus parent) { schemaFactory.registerSchemas(session, parent); } - + public Set<StoragePluginOptimizerRule> getOptimizerRules() { + return ImmutableSet.of(HivePushPartitionFilterIntoScan.HIVE_FILTER_ON_PROJECT, HivePushPartitionFilterIntoScan.HIVE_FILTER_ON_SCAN); + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b956e45e/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java
---------------------------------------------------------------------- diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java new file mode 100644 index 0000000..8d42fca --- /dev/null +++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHivePartitionPruning.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec; + +import org.apache.drill.PlanTestBase; +import org.apache.drill.exec.store.hive.HiveTestDataGenerator; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; + +import static org.junit.Assert.assertFalse; + +public class TestHivePartitionPruning extends PlanTestBase { + + @BeforeClass + public static void generateHive() throws Exception { + new HiveTestDataGenerator().createAndAddHiveTestPlugin(bit.getContext().getStorage()); + } + + // Currently we do not have a good way to test plans, so we use a crude string comparison + @Test + public void testSimplePartitionFilter() throws Exception { + String query = "explain plan for select * from hive.`default`.partition_pruning_test where c = 1"; + String plan = getPlanInString(query, OPTIQ_FORMAT); + + // Check that the Filter is not present in the plan + assertFalse(plan.contains("Filter")); + } + + @Test + public void testDisjunctsPartitionFilter() throws Exception { + String query = "explain plan for select * from hive.`default`.partition_pruning_test where (c = 1) or (d = 1)"; + String plan = getPlanInString(query, OPTIQ_FORMAT); + + // Check that the Filter is not present in the plan + assertFalse(plan.contains("Filter")); + } + + @Test + public void testConjunctsPartitionFilter() throws Exception { + String query = "explain plan for select * from hive.`default`.partition_pruning_test where c = 1 and d = 1"; + String plan = getPlanInString(query, OPTIQ_FORMAT); + + // Check that the Filter is not present in the plan + assertFalse(plan.contains("Filter")); + } + + @Ignore("DRILL-1571") + @Test + public void testComplexFilter() throws Exception { + String query = "explain plan for select * from hive.`default`.partition_pruning_test where (c = 1 and d = 1) or (c = 2 and d = 3)"; + String plan = getPlanInString(query, OPTIQ_FORMAT); + + // Check that the Filter is not present in the plan + assertFalse(plan.contains("Filter")); + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b956e45e/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java ---------------------------------------------------------------------- diff --git
a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java index 7b2c9b6..d339d28 100644 --- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java +++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java @@ -206,6 +206,19 @@ public class HiveTestDataGenerator { // create a Hive view to test how its metadata is populated in Drill's INFORMATION_SCHEMA executeQuery("CREATE VIEW IF NOT EXISTS hiveview AS SELECT * FROM kv"); + // create a partitioned Hive table to test partition pruning + executeQuery("USE default"); + executeQuery("CREATE TABLE IF NOT EXISTS default.partition_pruning_test(a DATE, b TIMESTAMP) " + "partitioned by (c int, d int, e int) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE"); + executeQuery(String.format("LOAD DATA LOCAL INPATH '%s' INTO TABLE default.partition_pruning_test partition(c=1, d=1, e=1)", testDateDataFile)); + executeQuery(String.format("LOAD DATA LOCAL INPATH '%s' INTO TABLE default.partition_pruning_test partition(c=1, d=1, e=2)", testDateDataFile)); + executeQuery(String.format("LOAD DATA LOCAL INPATH '%s' INTO TABLE default.partition_pruning_test partition(c=1, d=2, e=1)", testDateDataFile)); + executeQuery(String.format("LOAD DATA LOCAL INPATH '%s' INTO TABLE default.partition_pruning_test partition(c=1, d=2, e=2)", testDateDataFile)); + executeQuery(String.format("LOAD DATA LOCAL INPATH '%s' INTO TABLE default.partition_pruning_test partition(c=2, d=1, e=1)", testDateDataFile)); + executeQuery(String.format("LOAD DATA LOCAL INPATH '%s' INTO TABLE default.partition_pruning_test partition(c=2, d=1, e=2)", testDateDataFile)); + executeQuery(String.format("LOAD DATA LOCAL INPATH '%s' INTO TABLE default.partition_pruning_test partition(c=2, d=3, e=1)", testDateDataFile)); + executeQuery(String.format("LOAD DATA LOCAL INPATH '%s' INTO TABLE default.partition_pruning_test partition(c=2, d=3, e=2)", testDateDataFile)); + ss.close(); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b956e45e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java new file mode 100644 index 0000000..4c1f8e8 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner; + +// Partition descriptor for file-system-based tables +public class FileSystemPartitionDescriptor implements PartitionDescriptor { + + static final int MAX_NESTED_SUBDIRS = 10; // allow up to 10 nested sub-directories + + private final String partitionLabel; + private final int partitionLabelLength; + + public FileSystemPartitionDescriptor(String partitionLabel) { + this.partitionLabel = partitionLabel; + this.partitionLabelLength = partitionLabel.length(); + } + + @Override + public int getPartitionHierarchyIndex(String partitionName) { + String suffix = partitionName.substring(partitionLabelLength); // get the numeric suffix from 'dir<N>' + return Integer.parseInt(suffix); + } + + @Override + public boolean isPartitionName(String name) { + return name.matches(partitionLabel + "[0-9]"); + } + + @Override + public int getMaxHierarchyLevel() { + return MAX_NESTED_SUBDIRS; + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b956e45e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java new file mode 100644 index 0000000..02a6a8f --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner; + +// Interface used to describe partitions. Currently used by file-system-based partitions and Hive partitions +public interface PartitionDescriptor { + + /* Get the hierarchy index of the given partition + * For example, if the partition is laid out as follows + * 1997/q1/jan + * + * then getPartitionHierarchyIndex("jan") => 2 + */ + public int getPartitionHierarchyIndex(String partitionName); + + // Given a column name, return whether it is a partition column + public boolean isPartitionName(String name); + + // Maximum level of partition nesting/hierarchy supported + public int getMaxHierarchyLevel(); +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b956e45e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DirPathBuilder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DirPathBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DirPathBuilder.java index eeb664d..7972d74 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DirPathBuilder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DirPathBuilder.java @@ -21,6 +21,7 @@ import java.util.List; import org.apache.drill.common.expression.FieldReference; import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.planner.PartitionDescriptor; import org.eigenbase.relopt.RelOptUtil; import org.eigenbase.reltype.RelDataTypeField; import org.eigenbase.rex.RexBuilder; @@ -44,32 +45,34 @@ import com.google.common.collect.Lists; public class DirPathBuilder extends RexVisitorImpl <SchemaPath> { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DirPathBuilder.class); - static final int MAX_NESTED_SUBDIRS = 10; // allow up to 10 nested sub-directories static final String EMPTY_STRING = ""; final private DrillFilterRel filterRel; final private DrillRel inputRel; final private RexBuilder builder; - final private String dirLabel; + final private PartitionDescriptor partitionDescriptor; - private List<String> dirNameList = Lists.newArrayListWithExpectedSize(MAX_NESTED_SUBDIRS); - private List<RexNode> conjunctList = Lists.newArrayListWithExpectedSize(MAX_NESTED_SUBDIRS); + private List<String> dirNameList; + private List<RexNode> conjunctList; private List<String> dirPathList = Lists.newArrayList(); private RexNode currentConjunct = null; // the current conjunct we are evaluating during visitor traversal private RexNode finalCondition = null; // placeholder for the final filter condition private boolean dirMatch = false; - DirPathBuilder(DrillFilterRel filterRel, DrillRel inputRel, RexBuilder builder, String dirLabel) { + public DirPathBuilder(DrillFilterRel filterRel, DrillRel inputRel, RexBuilder builder, PartitionDescriptor partitionDescriptor) { super(true); this.filterRel = filterRel; this.inputRel = inputRel; this.builder = builder; - this.dirLabel = dirLabel; this.finalCondition = filterRel.getCondition(); + this.partitionDescriptor = partitionDescriptor; } private void initPathComponents() { - for (int i=0; i < MAX_NESTED_SUBDIRS; i++) { + int maxHierarchy = partitionDescriptor.getMaxHierarchyLevel(); + dirNameList = Lists.newArrayListWithExpectedSize(maxHierarchy); + conjunctList = Lists.newArrayListWithExpectedSize(maxHierarchy); + for (int i=0; i < maxHierarchy; i++) { dirNameList.add(EMPTY_STRING); conjunctList.add(null); } @@ -153,7 +156,7 @@ public class DirPathBuilder extends RexVisitorImpl <SchemaPath> { public SchemaPath visitInputRef(RexInputRef inputRef) { final int index = inputRef.getIndex(); final RelDataTypeField field = inputRel.getRowType().getFieldList().get(index); - if (field.getName().matches(dirLabel+"[0-9]")) { + if (partitionDescriptor.isPartitionName(field.getName())) { dirMatch = true; } return FieldReference.getWithQuotedRef(field.getName()); @@ -175,19 +178,18 @@ public class DirPathBuilder extends RexVisitorImpl <SchemaPath> { if (dirMatch && e1 != null) { // get the index for the 'dir<N>' filter String dirName = e1.getRootSegment().getPath(); - String suffix = dirName.substring(dirLabel.length()); // get the numeric suffix from 'dir<N>' - int suffixIndex = Integer.parseInt(suffix); + int hierarchyIndex = partitionDescriptor.getPartitionHierarchyIndex(dirName); - if (suffixIndex >= MAX_NESTED_SUBDIRS) { + if (hierarchyIndex >= partitionDescriptor.getMaxHierarchyLevel()) { return null; } // SchemaPath e2 = call.getOperands().get(1).accept(this); if (call.getOperands().get(1).getKind() == SqlKind.LITERAL) { String e2 = ((RexLiteral)call.getOperands().get(1)).getValue2().toString(); - dirNameList.set(suffixIndex, e2); + dirNameList.set(hierarchyIndex, e2); // dirNameList.set(suffixIndex, e2.getRootSegment().getPath()); - conjunctList.set(suffixIndex, currentConjunct); + conjunctList.set(hierarchyIndex, currentConjunct); return e1; } } @@ -216,7 +218,6 @@ public class DirPathBuilder extends RexVisitorImpl <SchemaPath> { if (dirMatch) { return arg; } - return null; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b956e45e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushPartitionFilterIntoScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushPartitionFilterIntoScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushPartitionFilterIntoScan.java index 19a92f0..c6dceda 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushPartitionFilterIntoScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushPartitionFilterIntoScan.java @@ -23,6 +23,8 @@ import java.util.List; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.exec.physical.base.FileGroupScan; +import org.apache.drill.exec.physical.base.GroupScan; +import org.apache.drill.exec.planner.FileSystemPartitionDescriptor; import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.planner.physical.PrelUtil; import org.apache.drill.exec.store.dfs.FileSelection; @@ -44,7 +46,9 @@ public abstract class DrillPushPartitionFilterIntoScan extends RelOptRule { @Override public boolean matches(RelOptRuleCall call) { final DrillScanRel scan = (DrillScanRel) call.rel(2); - return scan.getGroupScan().supportsPartitionFilterPushdown(); + GroupScan groupScan = scan.getGroupScan(); + // this rule is applicable only for dfs based partition pruning + return groupScan instanceof FileGroupScan && groupScan.supportsPartitionFilterPushdown(); } @Override @@ -64,7 +68,9 @@ public abstract class DrillPushPartitionFilterIntoScan extends RelOptRule { @Override public boolean matches(RelOptRuleCall call) { final DrillScanRel scan = (DrillScanRel) call.rel(1); - return scan.getGroupScan().supportsPartitionFilterPushdown(); + GroupScan groupScan = scan.getGroupScan(); + // this rule is applicable only for dfs based
partition pruning + return groupScan instanceof FileGroupScan && groupScan.supportsPartitionFilterPushdown(); } @Override @@ -94,13 +100,8 @@ public abstract class DrillPushPartitionFilterIntoScan extends RelOptRule { String fullPath = pathPrefix + dirPath; // check containment of this path in the list of files for (String origFilePath : origFiles) { - String[] components = origFilePath.split(":", 2); // some paths are of the form 'file:<path>', so we need to split - String origFileName = ""; - if (components.length == 1) { - origFileName = components[0]; - } else { - origFileName = components[1]; - } + String origFileName = PartitionPruningUtil.truncatePrefixFromPath(origFilePath); + if (origFileName.startsWith(fullPath)) { newFiles.add(origFileName); } @@ -120,7 +121,7 @@ public abstract class DrillPushPartitionFilterIntoScan extends RelOptRule { DrillRel inputRel = projectRel != null ? projectRel : scanRel; PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner()); - DirPathBuilder builder = new DirPathBuilder(filterRel, inputRel, filterRel.getCluster().getRexBuilder(), settings.getFsPartitionColumnLabel()); + DirPathBuilder builder = new DirPathBuilder(filterRel, inputRel, filterRel.getCluster().getRexBuilder(), new FileSystemPartitionDescriptor(settings.getFsPartitionColumnLabel())); FormatSelection origSelection = (FormatSelection)scanRel.getDrillTable().getSelection(); FormatSelection newSelection = splitFilter(origSelection, builder); @@ -129,55 +130,11 @@ public abstract class DrillPushPartitionFilterIntoScan extends RelOptRule { return; // no directory filter was pushed down } - RexNode origFilterCondition = filterRel.getCondition(); - RexNode newFilterCondition = builder.getFinalCondition(); - try { FileGroupScan fgscan = ((FileGroupScan)scanRel.getGroupScan()).clone(newSelection.getSelection()); - - if (newFilterCondition.isAlwaysTrue()) { - - final DrillScanRel newScanRel = - new DrillScanRel(scanRel.getCluster(), - scanRel.getTraitSet().plus(DrillRel.DRILL_LOGICAL), - scanRel.getTable(), - fgscan, - scanRel.getRowType(), - scanRel.getColumns()); - - if (projectRel != null) { - DrillProjectRel newProjectRel = new DrillProjectRel(projectRel.getCluster(), projectRel.getTraitSet(), - newScanRel, projectRel.getProjects(), filterRel.getRowType()); - - call.transformTo(newProjectRel); - } else { - call.transformTo(newScanRel); - } - } else { - - final DrillScanRel newScanRel = - new DrillScanRel(scanRel.getCluster(), - scanRel.getTraitSet().plus(DrillRel.DRILL_LOGICAL), - scanRel.getTable(), - fgscan, - scanRel.getRowType(), - scanRel.getColumns()); - if (projectRel != null) { - DrillProjectRel newProjectRel = new DrillProjectRel(projectRel.getCluster(), projectRel.getTraitSet(), - newScanRel, projectRel.getProjects(), projectRel.getRowType()); - inputRel = newProjectRel; - } else { - inputRel = newScanRel; - } - final DrillFilterRel newFilterRel = new DrillFilterRel(filterRel.getCluster(), filterRel.getTraitSet(), - inputRel, origFilterCondition /* for now keep the original condition until we add more test coverage */); - - call.transformTo(newFilterRel); - } + PartitionPruningUtil.rewritePlan(call, filterRel, projectRel, scanRel, fgscan, builder); } catch (IOException e) { throw new DrillRuntimeException(e) ; } - } - } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b956e45e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/PartitionPruningUtil.java ---------------------------------------------------------------------- diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/PartitionPruningUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/PartitionPruningUtil.java new file mode 100644 index 0000000..88b4a51 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/PartitionPruningUtil.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.logical; + +import org.apache.drill.exec.physical.base.GroupScan; +import org.eigenbase.relopt.RelOptRuleCall; +import org.eigenbase.rex.RexNode; + +public class PartitionPruningUtil { + public static void rewritePlan(RelOptRuleCall call, DrillFilterRel filterRel, DrillProjectRel projectRel, DrillScanRel scanRel, GroupScan newScan, DirPathBuilder builder) { + RexNode origFilterCondition = filterRel.getCondition(); + RexNode newFilterCondition = builder.getFinalCondition(); + + // the pruned scan is the same in both branches below + final DrillScanRel newScanRel = + new DrillScanRel(scanRel.getCluster(), + scanRel.getTraitSet().plus(DrillRel.DRILL_LOGICAL), + scanRel.getTable(), + newScan, + scanRel.getRowType(), + scanRel.getColumns()); + + if (newFilterCondition.isAlwaysTrue()) { + // the filter was fully absorbed by partition pruning, so drop it + if (projectRel != null) { + DrillProjectRel newProjectRel = new DrillProjectRel(projectRel.getCluster(), projectRel.getTraitSet(), + newScanRel, projectRel.getProjects(), filterRel.getRowType()); + + call.transformTo(newProjectRel); + } else { + call.transformTo(newScanRel); + } + } else { + DrillRel inputRel; + if (projectRel != null) { + DrillProjectRel newProjectRel = new DrillProjectRel(projectRel.getCluster(), projectRel.getTraitSet(), + newScanRel, projectRel.getProjects(), projectRel.getRowType()); + inputRel = newProjectRel; + } else { + inputRel = newScanRel; + } + final DrillFilterRel newFilterRel = new DrillFilterRel(filterRel.getCluster(), filterRel.getTraitSet(), + inputRel, origFilterCondition /* for now keep the original condition until we add more test coverage */); + + call.transformTo(newFilterRel); + } + } + + public static String truncatePrefixFromPath(String fileName) { + // some paths are of the form 'file:<path>', so strip the scheme prefix before comparing + String[] pathPrefixComponents = fileName.split(":", 2); + if (pathPrefixComponents.length == 1) { + return pathPrefixComponents[0]; + } else { + return pathPrefixComponents[1]; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b956e45e/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java
b/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java index 0c75640..580a1f0 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java +++ b/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java @@ -193,7 +193,7 @@ public class PlanTestBase extends BaseTestQuery { * This will submit an "EXPLAIN" statement, and return the column value which * contains the plan's string. */ - private String getPlanInString(String sql, String columnName) + protected String getPlanInString(String sql, String columnName) throws Exception { List<QueryResultBatch> results = testSqlWithResults(sql);
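----------------------------------------------------------------------

For readers tracing the new abstraction, here is a small illustrative sketch (not part of the commit; the class name PartitionDescriptorExample is made up) showing how the PartitionDescriptor contract introduced above answers the planner's three questions for a file-system table. Drill labels file-system partition columns dir0, dir1, ..., so the descriptor is built with the label "dir":

import org.apache.drill.exec.planner.FileSystemPartitionDescriptor;
import org.apache.drill.exec.planner.PartitionDescriptor;

public class PartitionDescriptorExample {
  public static void main(String[] args) {
    // "dir" is the file-system partition column label (dir0, dir1, ...)
    PartitionDescriptor desc = new FileSystemPartitionDescriptor("dir");

    System.out.println(desc.getPartitionHierarchyIndex("dir2")); // 2 -> third nesting level
    System.out.println(desc.isPartitionName("dir7"));            // true -> matches dir[0-9]
    System.out.println(desc.isPartitionName("columnA"));         // false -> ordinary column
    System.out.println(desc.getMaxHierarchyLevel());             // 10 -> MAX_NESTED_SUBDIRS
  }
}

A HivePartitionDescriptor built from a table partitioned by (c, d, e), like partition_pruning_test above, would analogously report getPartitionHierarchyIndex("d") == 1 and getMaxHierarchyLevel() == 3, since each metastore partition key contributes one hierarchy level.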

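In the same spirit, a quick sketch (again illustrative, with a hypothetical class name) of the new PartitionPruningUtil.truncatePrefixFromPath helper, which both pruning rules use to drop an optional scheme prefix such as 'file:' so pruned directory paths can be compared with startsWith:

import org.apache.drill.exec.planner.logical.PartitionPruningUtil;

public class TruncatePrefixExample {
  public static void main(String[] args) {
    // A scheme-qualified path and a bare path normalize to the same string.
    System.out.println(PartitionPruningUtil.truncatePrefixFromPath("file:/tmp/drill/table/1/2")); // /tmp/drill/table/1/2
    System.out.println(PartitionPruningUtil.truncatePrefixFromPath("/tmp/drill/table/1/2"));      // /tmp/drill/table/1/2
  }
}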