Yingyi Bu has submitted this change and it was merged. Change subject: ASTERIXDB-221: reduce unnecessary partitioning for hash joins. ......................................................................
ASTERIXDB-221: reduce unnecessary partitioning for hash joins. For a hash join, start top-down data property optimization from a partitioning-compatible child, and hence the other child's partitioning requirement could be updated. Change-Id: I835ea712c2f427149d45464fcb3841b8d33f6507 Reviewed-on: https://asterix-gerrit.ics.uci.edu/395 Tested-by: Jenkins <[email protected]> Reviewed-by: Wenhai Li <[email protected]> Reviewed-by: Yingyi Bu <[email protected]> --- M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java M algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java 3 files changed, 65 insertions(+), 21 deletions(-) Approvals: Wenhai Li: Looks good to me, but someone else must approve Yingyi Bu: Looks good to me, approved Jenkins: Verified diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java index 1091e1a..f5ea5f1 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java @@ -138,11 +138,16 @@ Set<LogicalVariable> modifuppreq = new ListSet<LogicalVariable>(); Map<LogicalVariable, EquivalenceClass> eqmap = context.getEquivalenceClassMap(op); Set<LogicalVariable> covered = new ListSet<LogicalVariable>(); + Set<LogicalVariable> keysCurrent = uppreq.getColumnSet(); + List<LogicalVariable> keysFirst = (keysRightBranch.containsAll(keysCurrent)) ?
keysRightBranch + : keysLeftBranch; + List<LogicalVariable> keysSecond = keysFirst == keysRightBranch ? keysLeftBranch + : keysRightBranch; for (LogicalVariable r : uppreq.getColumnSet()) { EquivalenceClass ecSnd = eqmap.get(r); boolean found = false; int j = 0; - for (LogicalVariable rvar : keysRightBranch) { + for (LogicalVariable rvar : keysFirst) { if (rvar == r || ecSnd != null && eqmap.get(rvar) == ecSnd) { found = true; break; @@ -151,9 +156,9 @@ } if (!found) { throw new IllegalStateException("Did not find a variable equivalent to " - + r + " among " + keysRightBranch); + + r + " among " + keysFirst); } - LogicalVariable v2 = keysLeftBranch.get(j); + LogicalVariable v2 = keysSecond.get(j); EquivalenceClass ecFst = eqmap.get(v2); for (LogicalVariable vset1 : set1) { if (vset1 == v2 || ecFst != null && eqmap.get(vset1) == ecFst) { diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java index af40f67..ae9f4f1 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java @@ -132,11 +132,10 @@ case UNORDERED_PARTITIONED: { UnorderedPartitionedProperty ur = (UnorderedPartitionedProperty) reqd; UnorderedPartitionedProperty ud = (UnorderedPartitionedProperty) dlvd; - if (mayExpandProperties) { - return isPrefixOf(ud.getColumnSet().iterator(), ur.getColumnSet().iterator()); - } else { - return ur.getColumnSet().equals(ud.getColumnSet()); - } + if (mayExpandProperties) + return (!ud.getColumnSet().isEmpty() && ur.getColumnSet().containsAll(ud.getColumnSet())); + else + return (ud.getColumnSet().equals(ur.getColumnSet())); } case ORDERED_PARTITIONED: { UnorderedPartitionedProperty ur = 
(UnorderedPartitionedProperty) reqd; diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java index 4df9db7..2181efa 100644 --- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java +++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java @@ -27,7 +27,6 @@ import org.apache.commons.lang3.mutable.Mutable; import org.apache.commons.lang3.mutable.MutableObject; - import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException; import org.apache.hyracks.algebricks.common.utils.Pair; @@ -154,6 +153,43 @@ return changed; } + // Gets the index of a child to start top-down data property enforcement. + // If there is a partitioning-compatible child with the operator in opRef, start from this child; + // otherwise, start from child zero. 
+ private int getStartChildIndex(AbstractLogicalOperator op, PhysicalRequirements pr, boolean nestedPlan, + IOptimizationContext context) throws AlgebricksException { + IPhysicalPropertiesVector[] reqdProperties = null; + if (pr != null) { + reqdProperties = pr.getRequiredProperties(); + } + + List<IPartitioningProperty> deliveredPartitioningPropertiesFromChildren = new ArrayList<IPartitioningProperty>(); + for (Mutable<ILogicalOperator> childRef : op.getInputs()) { + AbstractLogicalOperator child = (AbstractLogicalOperator) childRef.getValue(); + deliveredPartitioningPropertiesFromChildren.add(child.getDeliveredPhysicalProperties() + .getPartitioningProperty()); + } + int partitioningCompatibleChild = 0; + for (int i = 0; i < op.getInputs().size(); i++) { + IPartitioningProperty deliveredPropertyFromChild = deliveredPartitioningPropertiesFromChildren.get(i); + if (reqdProperties == null + || reqdProperties[i] == null + || reqdProperties[i].getPartitioningProperty() == null + || deliveredPropertyFromChild == null + || reqdProperties[i].getPartitioningProperty().getPartitioningType() != deliveredPartitioningPropertiesFromChildren + .get(i).getPartitioningType()) { + continue; + } + IPartitioningProperty requiredPropertyForChild = reqdProperties[i].getPartitioningProperty(); + // If child i's delivered partitioning property already satisfies the required property, stop and return the child index. + if (PropertiesUtil.matchPartitioningProps(requiredPropertyForChild, deliveredPropertyFromChild, true)) { + partitioningCompatibleChild = i; + break; + } + } + return partitioningCompatibleChild; + } + private boolean physOptimizeOp(Mutable<ILogicalOperator> opRef, IPhysicalPropertiesVector required, boolean nestedPlan, IOptimizationContext context) throws AlgebricksException { @@ -201,23 +237,31 @@ } } + // The child index of the child operator to optimize first. 
+ int startChildIndex = getStartChildIndex(op, pr, nestedPlan, context); IPartitioningProperty firstDeliveredPartitioning = null; - int i = 0; - for (Mutable<ILogicalOperator> childRef : op.getInputs()) { - AbstractLogicalOperator child = (AbstractLogicalOperator) childRef.getValue(); + // Enforce data properties in a top-down manner. + for (int j = 0; j < op.getInputs().size(); j++) { + // Starts from a partitioning-compatible child if any to loop over all children. + int childIndex = (j + startChildIndex) % op.getInputs().size(); + IPhysicalPropertiesVector requiredProperty = reqdProperties[childIndex]; + AbstractLogicalOperator child = (AbstractLogicalOperator) op.getInputs().get(childIndex).getValue(); IPhysicalPropertiesVector delivered = child.getDeliveredPhysicalProperties(); AlgebricksConfig.ALGEBRICKS_LOGGER.finest(">>>> Properties delivered by " + child.getPhysicalOperator() + ": " + delivered + "\n"); IPartitioningRequirementsCoordinator prc = pr.getPartitioningCoordinator(); + // Coordinates requirements by looking at the firstDeliveredPartitioning. Pair<Boolean, IPartitioningProperty> pbpp = prc.coordinateRequirements( - reqdProperties[i].getPartitioningProperty(), firstDeliveredPartitioning, op, context); + requiredProperty.getPartitioningProperty(), firstDeliveredPartitioning, op, context); boolean mayExpandPartitioningProperties = pbpp.first; IPhysicalPropertiesVector rqd = new StructuralPropertiesVector(pbpp.second, - reqdProperties[i].getLocalProperties()); + requiredProperty.getLocalProperties()); AlgebricksConfig.ALGEBRICKS_LOGGER.finest(">>>> Required properties for " + child.getPhysicalOperator() + ": " + rqd + "\n"); + // The partitioning property of reqdProperties[childIndex] could be updated here because + // rqd.getPartitioningProperty() is the same object instance as requiredProperty.getPartitioningProperty(). 
IPhysicalPropertiesVector diff = delivered.getUnsatisfiedPropertiesFrom(rqd, mayExpandPartitioningProperties, context.getEquivalenceClassMap(child), context.getFDList(child)); @@ -227,9 +271,9 @@ if (diff != null) { changed = true; - addEnforcers(op, i, diff, rqd, delivered, childrenDomain, nestedPlan, context); + addEnforcers(op, childIndex, diff, rqd, delivered, childrenDomain, nestedPlan, context); - AbstractLogicalOperator newChild = ((AbstractLogicalOperator) op.getInputs().get(i).getValue()); + AbstractLogicalOperator newChild = ((AbstractLogicalOperator) op.getInputs().get(childIndex).getValue()); if (newChild != child) { delivered = newChild.getDeliveredPhysicalProperties(); @@ -242,8 +286,8 @@ break; } } - } + if (firstDeliveredPartitioning == null) { IPartitioningProperty dpp = delivered.getPartitioningProperty(); if (dpp.getPartitioningType() == PartitioningType.ORDERED_PARTITIONED @@ -251,8 +295,6 @@ firstDeliveredPartitioning = dpp; } } - - i++; } if (op.hasNestedPlans()) { @@ -279,7 +321,6 @@ // Now, transfer annotations from the original sort op. to this one. 
AbstractLogicalOperator transferTo = nextOp; if (transferTo.getOperatorTag() == LogicalOperatorTag.EXCHANGE) { - // // remove duplicate exchange operator transferTo = (AbstractLogicalOperator) transferTo.getInputs().get(0).getValue(); } @@ -598,7 +639,6 @@ } return ordCols; } - } private void setNewOp(Mutable<ILogicalOperator> opRef, AbstractLogicalOperator newOp, IOptimizationContext context) -- To view, visit https://asterix-gerrit.ics.uci.edu/395 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I835ea712c2f427149d45464fcb3841b8d33f6507 Gerrit-PatchSet: 6 Gerrit-Project: hyracks Gerrit-Branch: master Gerrit-Owner: Wenhai Li <[email protected]> Gerrit-Reviewer: Ian Maxon <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Preston Carman <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]> Gerrit-Reviewer: Wenhai Li <[email protected]> Gerrit-Reviewer: Yingyi Bu <[email protected]>
