Yingyi Bu has submitted this change and it was merged.

Change subject: ASTERIXDB-221: reduce unnecessary partitioning for hash joins.
......................................................................


ASTERIXDB-221: reduce unnecessary partitioning for hash joins.

For a hash join, start top-down data property optimization from
a partitioning-compatible child, and hence the other child's
partitioning requirement could be updated.

Change-Id: I835ea712c2f427149d45464fcb3841b8d33f6507
Reviewed-on: https://asterix-gerrit.ics.uci.edu/395
Tested-by: Jenkins <[email protected]>
Reviewed-by: Wenhai Li <[email protected]>
Reviewed-by: Yingyi Bu <[email protected]>
---
M 
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
M 
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java
M 
algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
3 files changed, 65 insertions(+), 21 deletions(-)

Approvals:
  Wenhai Li: Looks good to me, but someone else must approve
  Yingyi Bu: Looks good to me, approved
  Jenkins: Verified



diff --git 
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
 
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
index 1091e1a..f5ea5f1 100644
--- 
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
+++ 
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
@@ -138,11 +138,16 @@
                                     Set<LogicalVariable> modifuppreq = new 
ListSet<LogicalVariable>();
                                     Map<LogicalVariable, EquivalenceClass> 
eqmap = context.getEquivalenceClassMap(op);
                                     Set<LogicalVariable> covered = new 
ListSet<LogicalVariable>();
+                                    Set<LogicalVariable> keysCurrent = 
uppreq.getColumnSet();
+                                    List<LogicalVariable> keysFirst = 
(keysRightBranch.containsAll(keysCurrent)) ? keysRightBranch
+                                            : keysLeftBranch;
+                                    List<LogicalVariable> keysSecond = 
keysFirst == keysRightBranch ? keysLeftBranch
+                                            : keysRightBranch;
                                     for (LogicalVariable r : 
uppreq.getColumnSet()) {
                                         EquivalenceClass ecSnd = eqmap.get(r);
                                         boolean found = false;
                                         int j = 0;
-                                        for (LogicalVariable rvar : 
keysRightBranch) {
+                                        for (LogicalVariable rvar : keysFirst) 
{
                                             if (rvar == r || ecSnd != null && 
eqmap.get(rvar) == ecSnd) {
                                                 found = true;
                                                 break;
@@ -151,9 +156,9 @@
                                         }
                                         if (!found) {
                                             throw new 
IllegalStateException("Did not find a variable equivalent to "
-                                                    + r + " among " + 
keysRightBranch);
+                                                    + r + " among " + 
keysFirst);
                                         }
-                                        LogicalVariable v2 = 
keysLeftBranch.get(j);
+                                        LogicalVariable v2 = keysSecond.get(j);
                                         EquivalenceClass ecFst = eqmap.get(v2);
                                         for (LogicalVariable vset1 : set1) {
                                             if (vset1 == v2 || ecFst != null 
&& eqmap.get(vset1) == ecFst) {
diff --git 
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java
 
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java
index af40f67..ae9f4f1 100644
--- 
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java
+++ 
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java
@@ -132,11 +132,10 @@
                     case UNORDERED_PARTITIONED: {
                         UnorderedPartitionedProperty ur = 
(UnorderedPartitionedProperty) reqd;
                         UnorderedPartitionedProperty ud = 
(UnorderedPartitionedProperty) dlvd;
-                        if (mayExpandProperties) {
-                            return isPrefixOf(ud.getColumnSet().iterator(), 
ur.getColumnSet().iterator());
-                        } else {
-                            return ur.getColumnSet().equals(ud.getColumnSet());
-                        }
+                        if (mayExpandProperties)
+                            return (!ud.getColumnSet().isEmpty() && 
ur.getColumnSet().containsAll(ud.getColumnSet()));
+                        else
+                            return 
(ud.getColumnSet().equals(ur.getColumnSet()));
                     }
                     case ORDERED_PARTITIONED: {
                         UnorderedPartitionedProperty ur = 
(UnorderedPartitionedProperty) reqd;
diff --git 
a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
 
b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index 4df9db7..2181efa 100644
--- 
a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ 
b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -27,7 +27,6 @@
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -154,6 +153,43 @@
         return changed;
     }
 
+    // Gets the index of a child to start top-down data property enforcement.
+    // If there is a partitioning-compatible child with the operator in opRef, 
start from this child;
+    // otherwise, start from child zero.
+    private int getStartChildIndex(AbstractLogicalOperator op, 
PhysicalRequirements pr, boolean nestedPlan,
+            IOptimizationContext context) throws AlgebricksException {
+        IPhysicalPropertiesVector[] reqdProperties = null;
+        if (pr != null) {
+            reqdProperties = pr.getRequiredProperties();
+        }
+
+        List<IPartitioningProperty> 
deliveredPartitioningPropertiesFromChildren = new 
ArrayList<IPartitioningProperty>();
+        for (Mutable<ILogicalOperator> childRef : op.getInputs()) {
+            AbstractLogicalOperator child = (AbstractLogicalOperator) 
childRef.getValue();
+            
deliveredPartitioningPropertiesFromChildren.add(child.getDeliveredPhysicalProperties()
+                    .getPartitioningProperty());
+        }
+        int partitioningCompatibleChild = 0;
+        for (int i = 0; i < op.getInputs().size(); i++) {
+            IPartitioningProperty deliveredPropertyFromChild = 
deliveredPartitioningPropertiesFromChildren.get(i);
+            if (reqdProperties == null
+                    || reqdProperties[i] == null
+                    || reqdProperties[i].getPartitioningProperty() == null
+                    || deliveredPropertyFromChild == null
+                    || 
reqdProperties[i].getPartitioningProperty().getPartitioningType() != 
deliveredPartitioningPropertiesFromChildren
+                            .get(i).getPartitioningType()) {
+                continue;
+            }
+            IPartitioningProperty requiredPropertyForChild = 
reqdProperties[i].getPartitioningProperty();
+            // If child i's delivered partitioning property already satisfies 
the required property, stop and return the child index.
+            if 
(PropertiesUtil.matchPartitioningProps(requiredPropertyForChild, 
deliveredPropertyFromChild, true)) {
+                partitioningCompatibleChild = i;
+                break;
+            }
+        }
+        return partitioningCompatibleChild;
+    }
+
     private boolean physOptimizeOp(Mutable<ILogicalOperator> opRef, 
IPhysicalPropertiesVector required,
             boolean nestedPlan, IOptimizationContext context) throws 
AlgebricksException {
 
@@ -201,23 +237,31 @@
             }
         }
 
+        // The child index of the child operator to optimize first.
+        int startChildIndex = getStartChildIndex(op, pr, nestedPlan, context);
         IPartitioningProperty firstDeliveredPartitioning = null;
-        int i = 0;
-        for (Mutable<ILogicalOperator> childRef : op.getInputs()) {
-            AbstractLogicalOperator child = (AbstractLogicalOperator) 
childRef.getValue();
+        // Enforce data properties in a top-down manner.
+        for (int j = 0; j < op.getInputs().size(); j++) {
+            // Starts from a partitioning-compatible child if any to loop over 
all children.
+            int childIndex = (j + startChildIndex) % op.getInputs().size();
+            IPhysicalPropertiesVector requiredProperty = 
reqdProperties[childIndex];
+            AbstractLogicalOperator child = (AbstractLogicalOperator) 
op.getInputs().get(childIndex).getValue();
             IPhysicalPropertiesVector delivered = 
child.getDeliveredPhysicalProperties();
 
             AlgebricksConfig.ALGEBRICKS_LOGGER.finest(">>>> Properties 
delivered by " + child.getPhysicalOperator()
                     + ": " + delivered + "\n");
             IPartitioningRequirementsCoordinator prc = 
pr.getPartitioningCoordinator();
+            // Coordinates requirements by looking at the 
firstDeliveredPartitioning.
             Pair<Boolean, IPartitioningProperty> pbpp = 
prc.coordinateRequirements(
-                    reqdProperties[i].getPartitioningProperty(), 
firstDeliveredPartitioning, op, context);
+                    requiredProperty.getPartitioningProperty(), 
firstDeliveredPartitioning, op, context);
             boolean mayExpandPartitioningProperties = pbpp.first;
             IPhysicalPropertiesVector rqd = new 
StructuralPropertiesVector(pbpp.second,
-                    reqdProperties[i].getLocalProperties());
+                    requiredProperty.getLocalProperties());
 
             AlgebricksConfig.ALGEBRICKS_LOGGER.finest(">>>> Required 
properties for " + child.getPhysicalOperator()
                     + ": " + rqd + "\n");
+            // The partitioning property of reqdProperties[childIndex] could 
be updated here because
+            // rqd.getPartitioningProperty() is the same object instance as 
requiredProperty.getPartitioningProperty().
             IPhysicalPropertiesVector diff = 
delivered.getUnsatisfiedPropertiesFrom(rqd,
                     mayExpandPartitioningProperties, 
context.getEquivalenceClassMap(child), context.getFDList(child));
 
@@ -227,9 +271,9 @@
 
             if (diff != null) {
                 changed = true;
-                addEnforcers(op, i, diff, rqd, delivered, childrenDomain, 
nestedPlan, context);
+                addEnforcers(op, childIndex, diff, rqd, delivered, 
childrenDomain, nestedPlan, context);
 
-                AbstractLogicalOperator newChild = ((AbstractLogicalOperator) 
op.getInputs().get(i).getValue());
+                AbstractLogicalOperator newChild = ((AbstractLogicalOperator) 
op.getInputs().get(childIndex).getValue());
 
                 if (newChild != child) {
                     delivered = newChild.getDeliveredPhysicalProperties();
@@ -242,8 +286,8 @@
                         break;
                     }
                 }
-
             }
+
             if (firstDeliveredPartitioning == null) {
                 IPartitioningProperty dpp = 
delivered.getPartitioningProperty();
                 if (dpp.getPartitioningType() == 
PartitioningType.ORDERED_PARTITIONED
@@ -251,8 +295,6 @@
                     firstDeliveredPartitioning = dpp;
                 }
             }
-
-            i++;
         }
 
         if (op.hasNestedPlans()) {
@@ -279,7 +321,6 @@
             // Now, transfer annotations from the original sort op. to this 
one.
             AbstractLogicalOperator transferTo = nextOp;
             if (transferTo.getOperatorTag() == LogicalOperatorTag.EXCHANGE) {
-                //
                 // remove duplicate exchange operator
                 transferTo = (AbstractLogicalOperator) 
transferTo.getInputs().get(0).getValue();
             }
@@ -598,7 +639,6 @@
             }
             return ordCols;
         }
-
     }
 
     private void setNewOp(Mutable<ILogicalOperator> opRef, 
AbstractLogicalOperator newOp, IOptimizationContext context)

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/395
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I835ea712c2f427149d45464fcb3841b8d33f6507
Gerrit-PatchSet: 6
Gerrit-Project: hyracks
Gerrit-Branch: master
Gerrit-Owner: Wenhai Li <[email protected]>
Gerrit-Reviewer: Ian Maxon <[email protected]>
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Preston Carman <[email protected]>
Gerrit-Reviewer: Till Westmann <[email protected]>
Gerrit-Reviewer: Wenhai Li <[email protected]>
Gerrit-Reviewer: Yingyi Bu <[email protected]>

Reply via email to