DRILL-776: Support SelectionVector SV2 and SV4 in Partition Sender.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/e9ac37db Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/e9ac37db Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/e9ac37db Branch: refs/heads/master Commit: e9ac37dbff8f0b606673bb5827c0daac1d3275b0 Parents: c40735e Author: Jinfeng Ni <[email protected]> Authored: Mon May 19 08:35:34 2014 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Mon May 19 17:46:19 2014 -0700 ---------------------------------------------------------------------- .../PartitionSenderRootExec.java | 69 +++++++++++++++----- .../impl/partitionsender/Partitioner.java | 5 ++ .../partitionsender/PartitionerSV2Template.java | 60 +++++++++++++++++ .../partitionsender/PartitionerSV4Template.java | 61 +++++++++++++++++ .../physical/HashToRandomExchangePrel.java | 5 ++ .../org/apache/drill/TestExampleQueries.java | 10 +++ 6 files changed, 193 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e9ac37db/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java index bcd484c..f574351 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java @@ -182,8 +182,24 @@ public class PartitionSenderRootExec implements RootExec { // set up partitioning function final LogicalExpression expr = 
operator.getExpr(); final ErrorCollector collector = new ErrorCollectorImpl(); - final ClassGenerator<Partitioner> cg = CodeGenerator.getRoot(Partitioner.TEMPLATE_DEFINITION, - context.getFunctionRegistry()); + final ClassGenerator<Partitioner> cg ; + + boolean hyper = false; + + switch(incoming.getSchema().getSelectionVectorMode()){ + case NONE: + cg = CodeGenerator.getRoot(Partitioner.TEMPLATE_DEFINITION, context.getFunctionRegistry()); + break; + case TWO_BYTE: + cg = CodeGenerator.getRoot(Partitioner.TEMPLATE_DEFINITION_SV2, context.getFunctionRegistry()); + break; + case FOUR_BYTE: + cg = CodeGenerator.getRoot(Partitioner.TEMPLATE_DEFINITION_SV4, context.getFunctionRegistry()); + hyper = true; + break; + default: + throw new UnsupportedOperationException(); + } final LogicalExpression materializedExpr = ExpressionTreeMaterializer.materialize(expr, incoming, collector, context.getFunctionRegistry()); if (collector.hasErrors()) { @@ -255,22 +271,41 @@ public class PartitionSenderRootExec implements RootExec { Class<?> vvType = TypeHelper.getValueVectorClass(vvIn.getField().getType().getMinorType(), vvIn.getField().getType().getMode()); JClass vvClass = cg.getModel().ref(vvType); - // the following block generates calls to copyFrom(); e.g.: - // ((IntVector) outgoingVectors[bucket][0]).copyFrom(inIndex, - // outgoingBatches[bucket].getRecordCount(), - // vv1); - cg.getEvalBlock()._if( - ((JExpression) JExpr.cast(vvClass, - ((JExpression) - outgoingVectors - .component(bucket)) - .component(JExpr.lit(fieldId)))) - .invoke("copyFromSafe") - .arg(inIndex) - .arg(((JExpression) outgoingBatches.component(bucket)).invoke("getRecordCount")) - .arg(incomingVV).not())._then().add(((JExpression) outgoingBatches.component(bucket)).invoke("flush")) - ._return(); + if (!hyper) { + // the following block generates calls to copyFrom(); e.g.: + // ((IntVector) outgoingVectors[bucket][0]).copyFrom(inIndex, + // outgoingBatches[bucket].getRecordCount(), + // vv1); + 
cg.getEvalBlock()._if( + ((JExpression) JExpr.cast(vvClass, + ((JExpression) + outgoingVectors + .component(bucket)) + .component(JExpr.lit(fieldId)))) + .invoke("copyFromSafe") + .arg(inIndex) + .arg(((JExpression) outgoingBatches.component(bucket)).invoke("getRecordCount")) + .arg(incomingVV).not())._then().add(((JExpression) outgoingBatches.component(bucket)).invoke("flush")) + ._return(); + } else { + // SV4 index: upper 16 bits select the batch (vv1[inIndex >>> 16]), lower 16 bits are the offset inside it, e.g.: + // ((IntVector) outgoingVectors[bucket][0]).copyFromSafe(((inIndex) & 65535), + // outgoingBatches[bucket].getRecordCount(), + // vv1[((inIndex)>>> 16)]); + cg.getEvalBlock()._if( + ((JExpression) JExpr.cast(vvClass, + ((JExpression) + outgoingVectors + .component(bucket)) + .component(JExpr.lit(fieldId)))) + .invoke("copyFromSafe") + .arg(inIndex.band(JExpr.lit(0xFFFF))) + .arg(((JExpression) outgoingBatches.component(bucket)).invoke("getRecordCount")) + .arg(incomingVV.component(inIndex.shrz(JExpr.lit(16)))).not())._then().add(((JExpression) outgoingBatches.component(bucket)).invoke("flush")) + ._return(); + + } ++fieldId; } // generate the OutgoingRecordBatch helper invocations http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e9ac37db/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java index 7d3998b..3ffead0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java @@ -31,4 +31,9 @@ public interface Partitioner { public abstract void partitionBatch(RecordBatch incoming); public static TemplateClassDefinition<Partitioner> TEMPLATE_DEFINITION = new
TemplateClassDefinition<>(Partitioner.class, PartitionerTemplate.class); + + public static TemplateClassDefinition<Partitioner> TEMPLATE_DEFINITION_SV2 = new TemplateClassDefinition<>(Partitioner.class, PartitionerSV2Template.class); + + public static TemplateClassDefinition<Partitioner> TEMPLATE_DEFINITION_SV4 = new TemplateClassDefinition<>(Partitioner.class, PartitionerSV4Template.class); + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e9ac37db/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerSV2Template.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerSV2Template.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerSV2Template.java new file mode 100644 index 0000000..981055a --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerSV2Template.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.physical.impl.partitionsender; + +import javax.inject.Named; + +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.selection.SelectionVector2; + +public abstract class PartitionerSV2Template implements Partitioner { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionerSV2Template.class); + + private SelectionVector2 sv2; + + public PartitionerSV2Template() throws SchemaChangeException { + } + + @Override + public final void setup(FragmentContext context, + RecordBatch incoming, + OutgoingRecordBatch[] outgoing) throws SchemaChangeException { + + this.sv2 = incoming.getSelectionVector2(); + + doSetup(context, incoming, outgoing); + + } + + @Override + public void partitionBatch(RecordBatch incoming) { + + for (int recordId = 0; recordId < incoming.getRecordCount(); ++recordId) { + // for each record + doEval(sv2.getIndex(recordId), 0); + } + + } + + public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") OutgoingRecordBatch[] outgoing) throws SchemaChangeException; + public abstract void doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex); + + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e9ac37db/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerSV4Template.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerSV4Template.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerSV4Template.java new file mode 100644 index 0000000..2e00f9b --- /dev/null +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerSV4Template.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.partitionsender; + +import javax.inject.Named; + +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.selection.SelectionVector2; +import org.apache.drill.exec.record.selection.SelectionVector4; + +public abstract class PartitionerSV4Template implements Partitioner { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionerSV4Template.class); + + private SelectionVector4 sv4; + + public PartitionerSV4Template() throws SchemaChangeException { + } + + @Override + public final void setup(FragmentContext context, + RecordBatch incoming, + OutgoingRecordBatch[] outgoing) throws SchemaChangeException { + + this.sv4 = incoming.getSelectionVector4(); + + doSetup(context, incoming, outgoing); + + } + + @Override + public void partitionBatch(RecordBatch incoming) { + + for (int recordId = 0; recordId < incoming.getRecordCount(); 
++recordId) { + // for each record + doEval(sv4.get(recordId), 0); + } + + } + + public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") OutgoingRecordBatch[] outgoing) throws SchemaChangeException; + public abstract void doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex); + + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e9ac37db/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java index d582684..9756a76 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java @@ -115,5 +115,10 @@ public class HashToRandomExchangePrel extends SinglePrel { return SelectionVectorMode.NONE; } + @Override + public SelectionVectorMode[] getSupportedEncodings() { + return SelectionVectorMode.ALL; + } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e9ac37db/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java index 83b43fb..1757290 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java @@ -49,6 +49,16 @@ public class TestExampleQueries extends BaseTestQuery{ } @Test + public void testHashPartitionSV2 () throws Exception{ + test("select 
count(n_nationkey) from cp.`tpch/nation.parquet` where n_nationkey > 8 group by n_regionkey"); + } + + @Test + public void testHashPartitionSV4 () throws Exception{ + test("select count(n_nationkey) as cnt from cp.`tpch/nation.parquet` group by n_regionkey order by cnt"); + } + + @Test public void testSelectWithLimit() throws Exception{ test("select employee_id, first_name, last_name from cp.`employee.json` limit 5 "); }
