Fix duplicate field names in the join operator. Work in progress on support for the star (`*`) column.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/d91a01a1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/d91a01a1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/d91a01a1 Branch: refs/heads/master Commit: d91a01a122fa91bed06cafa4a53aaf99770e8423 Parents: 31d1994 Author: Jinfeng Ni <[email protected]> Authored: Wed Apr 2 12:12:13 2014 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Sat Apr 19 18:07:10 2014 -0700 ---------------------------------------------------------------------- .../physical/config/SelectionVectorRemover.java | 7 +- .../exec/planner/physical/MergeJoinPrel.java | 73 +++++++++++++++----- .../planner/types/RelDataTypeDrillImpl.java | 3 +- .../exec/planner/types/RelDataTypeHolder.java | 16 +++-- 4 files changed, 76 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d91a01a1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SelectionVectorRemover.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SelectionVectorRemover.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SelectionVectorRemover.java index 7361503..5e891ec 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SelectionVectorRemover.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SelectionVectorRemover.java @@ -22,6 +22,7 @@ import org.apache.drill.exec.physical.base.AbstractSingle; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.PhysicalVisitor; import org.apache.drill.exec.physical.base.Size; +import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import 
com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; @@ -56,5 +57,9 @@ public class SelectionVectorRemover extends AbstractSingle { public Size getSize() { return new Size( (long) (child.getSize().getRecordCount()), child.getSize().getRecordSize()); } - + + @Override + public SelectionVectorMode getSVMode() { + return SelectionVectorMode.NONE; + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d91a01a1/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java index f3b893c..bfb2192 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java @@ -18,15 +18,14 @@ package org.apache.drill.exec.planner.physical; import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import org.apache.drill.common.expression.FieldReference; import org.apache.drill.common.logical.data.JoinCondition; -import org.apache.drill.common.logical.data.LogicalOperator; +import org.apache.drill.common.logical.data.NamedExpression; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.config.MergeJoinPOP; +import org.apache.drill.exec.physical.config.Project; import org.apache.drill.exec.physical.config.SelectionVectorRemover; import org.apache.drill.exec.planner.common.DrillJoinRelBase; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; @@ -59,7 +58,6 @@ public class MergeJoinPrel extends DrillJoinRelBase implements Prel { if (!remaining.isAlwaysTrue()) { throw new InvalidRelException("MergeJoinPrel only 
supports equi-join"); } - //this.joinConditions = joinConditions; } @@ -74,16 +72,21 @@ public class MergeJoinPrel extends DrillJoinRelBase implements Prel { @Override public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException { - PhysicalOperator leftPop = ((Prel) getLeft()).getPhysicalOperator(creator); + final List<String> fields = getRowType().getFieldNames(); + assert isUnique(fields); + final int leftCount = left.getRowType().getFieldCount(); + final List<String> leftFields = fields.subList(0, leftCount); + final List<String> rightFields = fields.subList(leftCount, fields.size()); + PhysicalOperator leftPop = implementInput(creator, 0, left); + PhysicalOperator rightPop = implementInput(creator, leftCount, right); + //Currently, only accepts "NONE" or "SV2". For other, requires SelectionVectorRemover if (leftPop.getSVMode().equals(SelectionVectorMode.FOUR_BYTE)) { leftPop = new SelectionVectorRemover(leftPop); creator.addPhysicalOperator(leftPop); } - PhysicalOperator rightPop = ((Prel) getRight()).getPhysicalOperator(creator); - //Currently, only accepts "NONE" or "SV2". 
For other, requires SelectionVectorRemover if (rightPop.getSVMode().equals(SelectionVectorMode.FOUR_BYTE)) { rightPop = new SelectionVectorRemover(rightPop); @@ -91,18 +94,13 @@ public class MergeJoinPrel extends DrillJoinRelBase implements Prel { } JoinRelType jtype = this.getJoinType(); - - final List<String> fields = getRowType().getFieldNames(); - assert isUnique(fields); - final int leftCount = left.getRowType().getFieldCount(); - final List<String> leftFields = fields.subList(0, leftCount); - final List<String> rightFields = fields.subList(leftCount, fields.size()); - + List<JoinCondition> conditions = Lists.newArrayList(); for (Pair<Integer, Integer> pair : Pair.zip(leftKeys, rightKeys)) { conditions.add(new JoinCondition("==", new FieldReference(leftFields.get(pair.left)), new FieldReference(rightFields.get(pair.right)))); } + MergeJoinPOP mjoin = new MergeJoinPOP(leftPop, rightPop, conditions, jtype); creator.addPhysicalOperator(mjoin); @@ -117,7 +115,48 @@ public class MergeJoinPrel extends DrillJoinRelBase implements Prel { return this.rightKeys; } -// public JoinCondition[] getJoinConditions() { -// return joinConditions; -// } + /** + * Check to make sure that the fields of the inputs are the same as the output field names. If not, insert a project renaming them. + * @param implementor + * @param i + * @param offset + * @param input + * @return + */ + private PhysicalOperator implementInput(PhysicalPlanCreator creator, int offset, RelNode input) throws IOException { + final PhysicalOperator inputOp = ((Prel) input).getPhysicalOperator(creator); + assert uniqueFieldNames(input.getRowType()); + final List<String> fields = getRowType().getFieldNames(); + final List<String> inputFields = input.getRowType().getFieldNames(); + final List<String> outputFields = fields.subList(offset, offset + inputFields.size()); + if (!outputFields.equals(inputFields)) { + // Ensure that input field names are the same as output field names. 
+ // If there are duplicate field names on left and right, fields will get + // lost. + return rename(creator, inputOp, inputFields, outputFields); + } else { + return inputOp; + } + } + + private PhysicalOperator rename(PhysicalPlanCreator creator, PhysicalOperator inputOp, List<String> inputFields, List<String> outputFields) { + List<NamedExpression> exprs = Lists.newArrayList(); + + //Currently, Project only accepts "NONE". For other, requires SelectionVectorRemover + if (!inputOp.getSVMode().equals(SelectionVectorMode.NONE)) { + inputOp = new SelectionVectorRemover(inputOp); + creator.addPhysicalOperator(inputOp); + } + + for (Pair<String, String> pair : Pair.zip(inputFields, outputFields)) { + exprs.add(new NamedExpression(new FieldReference(pair.left), new FieldReference("output." + pair.right))); + } + + Project proj = new Project(exprs, inputOp); + + creator.addPhysicalOperator(proj); + return proj; + } + + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d91a01a1/exec/java-exec/src/main/java/org/apache/drill/exec/planner/types/RelDataTypeDrillImpl.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/types/RelDataTypeDrillImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/types/RelDataTypeDrillImpl.java index 8b031ec..0f3c24f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/types/RelDataTypeDrillImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/types/RelDataTypeDrillImpl.java @@ -41,6 +41,7 @@ public class RelDataTypeDrillImpl extends RelDataTypeImpl { public RelDataTypeDrillImpl(RelDataTypeHolder holder, RelDataTypeFactory typeFactory) { this.typeFactory = typeFactory; this.holder = holder; + this.holder.setRelDataTypeFactory(typeFactory); computeDigest(); } @@ -76,7 +77,7 @@ public class RelDataTypeDrillImpl extends RelDataTypeImpl { @Override protected void 
generateTypeString(StringBuilder sb, boolean withDetail) { - sb.append("DrillRecordRow"); + sb.append("(DrillRecordRow" + getFieldNames() + ")"); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d91a01a1/exec/java-exec/src/main/java/org/apache/drill/exec/planner/types/RelDataTypeHolder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/types/RelDataTypeHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/types/RelDataTypeHolder.java index 939e9ac..8515b0c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/types/RelDataTypeHolder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/types/RelDataTypeHolder.java @@ -31,19 +31,22 @@ public class RelDataTypeHolder { List<RelDataTypeField> fields = Lists.newArrayList(); + private RelDataTypeFactory typeFactory; + public List<RelDataTypeField> getFieldList(RelDataTypeFactory typeFactory) { - addStarIfEmpty(); + addStarIfEmpty(typeFactory); return fields; } public int getFieldCount() { - addStarIfEmpty(); + addStarIfEmpty(this.typeFactory); return fields.size(); } - private void addStarIfEmpty(){ - //if (fieldNames.isEmpty()) fieldNames.add("*"); + private void addStarIfEmpty(RelDataTypeFactory typeFactory){ +// RelDataTypeField starCol = getField(typeFactory, "*"); +// if (fields.isEmpty()) fields.add(starCol); } public RelDataTypeField getField(RelDataTypeFactory typeFactory, String fieldName) { @@ -72,4 +75,9 @@ public class RelDataTypeHolder { return fieldNames; } + + public void setRelDataTypeFactory(RelDataTypeFactory typeFactory) { + this.typeFactory = typeFactory; + } + }
