DRILL-3381: Add option to distribute on the partition keys when doing CTAS with partitioning
Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/c7c22366 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/c7c22366 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/c7c22366 Branch: refs/heads/master Commit: c7c2236623f8cd37d367b4dcc0c001f08bf044e5 Parents: c6978a5 Author: Steven Phillips <[email protected]> Authored: Thu Jun 25 17:26:14 2015 -0700 Committer: Steven Phillips <[email protected]> Committed: Thu Jun 25 17:55:53 2015 -0700 ---------------------------------------------------------------------- .../org/apache/drill/exec/ExecConstants.java | 3 + .../exec/planner/physical/WriterPrule.java | 20 ++++++- .../server/options/SystemOptionManager.java | 1 + .../apache/drill/TestCTASPartitionFilter.java | 61 ++++++++++++++++++++ 4 files changed, 83 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/c7c22366/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index 8ea90e0..140e9a8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -251,4 +251,7 @@ public interface ExecConstants { public static final String USE_OLD_ASSIGNMENT_CREATOR = "exec.schedule.assignment.old"; public static final OptionValidator USE_OLD_ASSIGNMENT_CREATOR_VALIDATOR = new BooleanValidator(USE_OLD_ASSIGNMENT_CREATOR, false); + + public static final String CTAS_PARTITIONING_HASH_DISTRIBUTE = "store.partition.hash_distribute"; + public static final BooleanValidator CTAS_PARTITIONING_HASH_DISTRIBUTE_VALIDATOR = new BooleanValidator(CTAS_PARTITIONING_HASH_DISTRIBUTE, 
false); } http://git-wip-us.apache.org/repos/asf/drill/blob/c7c22366/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrule.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrule.java index e191423..d4e3d0e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrule.java @@ -18,11 +18,13 @@ package org.apache.drill.exec.planner.physical; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import org.apache.calcite.rel.RelCollation; import org.apache.calcite.rel.RelCollationImpl; import org.apache.calcite.rel.RelFieldCollation; import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.planner.common.DrillWriterRelBase; import org.apache.drill.exec.planner.logical.DrillRel; import org.apache.drill.exec.planner.logical.DrillWriterRel; @@ -31,6 +33,8 @@ import org.apache.calcite.rel.RelNode; import org.apache.calcite.plan.RelOptRule; import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.plan.RelTraitSet; +import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField; +import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionType; import java.util.List; @@ -47,8 +51,13 @@ public class WriterPrule extends Prule{ final DrillWriterRel writer = call.rel(0); final RelNode input = call.rel(1); - final RelCollation collation = getCollation(writer.getPartitionKeys()); - final RelTraitSet traits = input.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collation); + final List<Integer> keys = writer.getPartitionKeys(); + 
final RelCollation collation = getCollation(keys); + final boolean hashDistribute = PrelUtil.getPlannerSettings(call.getPlanner()).getOptions().getOption(ExecConstants.CTAS_PARTITIONING_HASH_DISTRIBUTE_VALIDATOR); + final RelTraitSet traits = hashDistribute ? + input.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collation).plus(getDistribution(keys)) : + input.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collation); + final RelNode convertedInput = convert(input, traits); if (!new WriteTraitPull(call).go(writer, convertedInput)) { @@ -67,6 +76,13 @@ public class WriterPrule extends Prule{ return RelCollationImpl.of(fields); } + private DrillDistributionTrait getDistribution(List<Integer> keys) { + List<DistributionField> fields = Lists.newArrayList(); + for (int key : keys) { + fields.add(new DistributionField(key)); + } + return new DrillDistributionTrait(DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(fields)); + } private class WriteTraitPull extends SubsetTransformer<DrillWriterRelBase, RuntimeException> { http://git-wip-us.apache.org/repos/asf/drill/blob/c7c22366/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java index abd2212..2d41740 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java @@ -107,6 +107,7 @@ public class SystemOptionManager extends BaseOptionManager { ExecConstants.AVERAGE_FIELD_WIDTH, ExecConstants.NEW_VIEW_DEFAULT_PERMS_VALIDATOR, ExecConstants.USE_OLD_ASSIGNMENT_CREATOR_VALIDATOR, + ExecConstants.CTAS_PARTITIONING_HASH_DISTRIBUTE_VALIDATOR, QueryClassLoader.JAVA_COMPILER_VALIDATOR, 
QueryClassLoader.JAVA_COMPILER_JANINO_MAXSIZE, QueryClassLoader.JAVA_COMPILER_DEBUG, http://git-wip-us.apache.org/repos/asf/drill/blob/c7c22366/exec/java-exec/src/test/java/org/apache/drill/TestCTASPartitionFilter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestCTASPartitionFilter.java b/exec/java-exec/src/test/java/org/apache/drill/TestCTASPartitionFilter.java new file mode 100644 index 0000000..3943426 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/TestCTASPartitionFilter.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill; + + +import org.apache.drill.common.util.FileUtils; +import org.apache.drill.common.util.TestTools; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class TestCTASPartitionFilter extends PlanTestBase { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestCTASPartitionFilter.class); + + static final String WORKING_PATH = TestTools.getWorkingPath(); + static final String TEST_RES_PATH = WORKING_PATH + "/src/test/resources"; + + private static void testExcludeFilter(String query, int expectedNumFiles, + String excludedFilterPattern, int expectedRowCount) throws Exception { + int actualRowCount = testSql(query); + assertEquals(expectedRowCount, actualRowCount); + String numFilesPattern = "numFiles=" + expectedNumFiles; + testPlanMatchingPatterns(query, new String[]{numFilesPattern}, new String[]{excludedFilterPattern}); + } + + @Test + public void withDistribution() throws Exception { + test("alter session set `planner.slice_target` = 1"); + test("alter session set `store.partition.hash_distribute` = true"); + test("use dfs_test.tmp"); + test(String.format("create table orders_distribution partition by (o_orderpriority) as select * from dfs_test.`%s/multilevel/parquet`", TEST_RES_PATH)); + String query = "select * from orders_distribution where o_orderpriority = '1-URGENT'"; + testExcludeFilter(query, 1, "Filter", 24); + } + + @Test + public void withoutDistribution() throws Exception { + test("alter session set `planner.slice_target` = 1"); + test("alter session set `store.partition.hash_distribute` = false"); + test("use dfs_test.tmp"); + test(String.format("create table orders_no_distribution partition by (o_orderpriority) as select * from dfs_test.`%s/multilevel/parquet`", TEST_RES_PATH)); + String query = "select * from orders_no_distribution where o_orderpriority = '1-URGENT'"; + testExcludeFilter(query, 2, "Filter", 24); + } +} \ No newline 
at end of file
