Repository: spark
Updated Branches:
  refs/heads/master 65fa4181c -> 3c66ff727


[SPARK-9489] Remove unnecessary compatibility and requirements checks from Exchange

While reviewing yhuai's patch for SPARK-2205 (#7773), I noticed that Exchange's
`compatible` check may incorrectly return `false` in many cases.  As far as I
know, this is not actually a correctness problem, because the `compatible`,
`meetsRequirements`, and `needsAnySort` checks serve only as short-circuit
performance optimizations and are not necessary for correctness.
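
To make the "optimization only" claim concrete, here is a self-contained toy
sketch of the property that matters (`Plan`, `Leaf`, `Shuffle`, and the
string-typed partitionings are illustrative stand-ins, not Spark's actual
classes): the per-child rewrite is already a no-op when a child's physical
properties match, so re-running it unconditionally cannot change an
already-correct plan.

    // Toy model of the per-child rewrite in EnsureRequirements.
    sealed trait Plan { def partitioning: String }
    case class Leaf(partitioning: String) extends Plan
    case class Shuffle(partitioning: String, child: Plan) extends Plan

    def ensurePartitioning(required: String, child: Plan): Plan =
      if (child.partitioning == required) child   // already satisfied: no-op
      else Shuffle(required, child)               // otherwise insert an exchange

    // The rewrite is idempotent: applying it twice yields the same plan as
    // applying it once, which is why the pre-checks were redundant.
    val once  = ensurePartitioning("hash(a)", Leaf("range(a)"))
    val twice = ensurePartitioning("hash(a)", once)
    assert(once == twice)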

To reduce code complexity, I think we should remove these checks and
unconditionally rewrite the operator's children.  This should be safe because
we rewrite the tree in a single bottom-up pass, so each operator is visited
only after its children have already been fixed up.
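
As a rough illustration of the bottom-up argument (a toy tree and a simplified
`transformUp`, not Catalyst's actual TreeNode API), children are rewritten
before their parent, so by the time the rule fires on an operator, its
children's output properties are already final:

    // Minimal bottom-up transform mirroring the contract EnsureRequirements
    // relies on: the rule always sees fully rewritten children.
    case class Op(name: String, children: Seq[Op])

    def transformUp(node: Op)(rule: Op => Op): Op =
      rule(node.copy(children = node.children.map(c => transformUp(c)(rule))))

    // Unconditionally wrapping every operator's children is safe: each node
    // is visited exactly once, after its subtree has settled.
    val plan  = Op("join", Seq(Op("scanA", Nil), Op("scanB", Nil)))
    val fixed = transformUp(plan) { op =>
      op.copy(children = op.children.map(c => Op("exchange", Seq(c))))
    }
    // fixed == Op("join", Seq(Op("exchange", Seq(Op("scanA", Nil))),
    //                         Op("exchange", Seq(Op("scanB", Nil)))))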

Author: Josh Rosen <joshro...@databricks.com>

Closes #7807 from JoshRosen/SPARK-9489 and squashes the following commits:

9d76ce9 [Josh Rosen] [SPARK-9489] Remove compatibleWith, meetsRequirements, and needsAnySort checks from Exchange


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3c66ff72
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3c66ff72
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3c66ff72

Branch: refs/heads/master
Commit: 3c66ff727d4b47220e1ff363cea215189ed64f36
Parents: 65fa418
Author: Josh Rosen <joshro...@databricks.com>
Authored: Thu Jul 30 17:38:48 2015 -0700
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Thu Jul 30 17:38:48 2015 -0700

----------------------------------------------------------------------
 .../catalyst/plans/physical/partitioning.scala  | 35 ---------
 .../apache/spark/sql/execution/Exchange.scala   | 76 +++++---------------
 2 files changed, 17 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3c66ff72/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
index 2dcfa19..f4d1dba 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
@@ -86,14 +86,6 @@ sealed trait Partitioning {
    */
   def satisfies(required: Distribution): Boolean
 
-  /**
-   * Returns true iff all distribution guarantees made by this partitioning can also be made
-   * for the `other` specified partitioning.
-   * For example, two [[HashPartitioning HashPartitioning]]s are
-   * only compatible if the `numPartitions` of them is the same.
-   */
-  def compatibleWith(other: Partitioning): Boolean
-
   /** Returns the expressions that are used to key the partitioning. */
   def keyExpressions: Seq[Expression]
 }
@@ -104,11 +96,6 @@ case class UnknownPartitioning(numPartitions: Int) extends Partitioning {
     case _ => false
   }
 
-  override def compatibleWith(other: Partitioning): Boolean = other match {
-    case UnknownPartitioning(_) => true
-    case _ => false
-  }
-
   override def keyExpressions: Seq[Expression] = Nil
 }
 
@@ -117,11 +104,6 @@ case object SinglePartition extends Partitioning {
 
   override def satisfies(required: Distribution): Boolean = true
 
-  override def compatibleWith(other: Partitioning): Boolean = other match {
-    case SinglePartition => true
-    case _ => false
-  }
-
   override def keyExpressions: Seq[Expression] = Nil
 }
 
@@ -130,11 +112,6 @@ case object BroadcastPartitioning extends Partitioning {
 
   override def satisfies(required: Distribution): Boolean = true
 
-  override def compatibleWith(other: Partitioning): Boolean = other match {
-    case SinglePartition => true
-    case _ => false
-  }
-
   override def keyExpressions: Seq[Expression] = Nil
 }
 
@@ -159,12 +136,6 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
     case _ => false
   }
 
-  override def compatibleWith(other: Partitioning): Boolean = other match {
-    case BroadcastPartitioning => true
-    case h: HashPartitioning if h == this => true
-    case _ => false
-  }
-
   override def keyExpressions: Seq[Expression] = expressions
 }
 
@@ -199,11 +170,5 @@ case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int)
     case _ => false
   }
 
-  override def compatibleWith(other: Partitioning): Boolean = other match {
-    case BroadcastPartitioning => true
-    case r: RangePartitioning if r == this => true
-    case _ => false
-  }
-
   override def keyExpressions: Seq[Expression] = ordering.map(_.child)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/3c66ff72/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 70e5031..6bd57f0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -202,41 +202,6 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[
 
   def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
     case operator: SparkPlan =>
-      // True iff every child's outputPartitioning satisfies the corresponding
-      // required data distribution.
-      def meetsRequirements: Boolean =
-        operator.requiredChildDistribution.zip(operator.children).forall {
-          case (required, child) =>
-            val valid = child.outputPartitioning.satisfies(required)
-            logDebug(
-              s"${if (valid) "Valid" else "Invalid"} distribution," +
-                s"required: $required current: ${child.outputPartitioning}")
-            valid
-        }
-
-      // True iff any of the children are incorrectly sorted.
-      def needsAnySort: Boolean =
-        operator.requiredChildOrdering.zip(operator.children).exists {
-          case (required, child) => required.nonEmpty && required != child.outputOrdering
-        }
-
-      // True iff outputPartitionings of children are compatible with each other.
-      // It is possible that every child satisfies its required data distribution
-      // but two children have incompatible outputPartitionings. For example,
-      // A dataset is range partitioned by "a.asc" (RangePartitioning) and another
-      // dataset is hash partitioned by "a" (HashPartitioning). Tuples in these two
-      // datasets are both clustered by "a", but these two outputPartitionings are not
-      // compatible.
-      // TODO: ASSUMES TRANSITIVITY?
-      def compatible: Boolean =
-        operator.children
-          .map(_.outputPartitioning)
-          .sliding(2)
-          .forall {
-            case Seq(a) => true
-            case Seq(a, b) => a.compatibleWith(b)
-          }
-
       // Adds Exchange or Sort operators as required
       def addOperatorsIfNecessary(
           partitioning: Partitioning,
@@ -269,33 +234,26 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[
         addSortIfNecessary(addShuffleIfNecessary(child))
       }
 
-      if (meetsRequirements && compatible && !needsAnySort) {
-        operator
-      } else {
-        // At least one child does not satisfies its required data distribution or
-        // at least one child's outputPartitioning is not compatible with another child's
-        // outputPartitioning. In this case, we need to add Exchange operators.
-        val requirements =
-          (operator.requiredChildDistribution, operator.requiredChildOrdering, operator.children)
+      val requirements =
+        (operator.requiredChildDistribution, operator.requiredChildOrdering, operator.children)
 
-        val fixedChildren = requirements.zipped.map {
-          case (AllTuples, rowOrdering, child) =>
-            addOperatorsIfNecessary(SinglePartition, rowOrdering, child)
-          case (ClusteredDistribution(clustering), rowOrdering, child) =>
-            addOperatorsIfNecessary(HashPartitioning(clustering, numPartitions), rowOrdering, child)
-          case (OrderedDistribution(ordering), rowOrdering, child) =>
-            addOperatorsIfNecessary(RangePartitioning(ordering, numPartitions), rowOrdering, child)
+      val fixedChildren = requirements.zipped.map {
+        case (AllTuples, rowOrdering, child) =>
+          addOperatorsIfNecessary(SinglePartition, rowOrdering, child)
+        case (ClusteredDistribution(clustering), rowOrdering, child) =>
+          addOperatorsIfNecessary(HashPartitioning(clustering, numPartitions), rowOrdering, child)
+        case (OrderedDistribution(ordering), rowOrdering, child) =>
+          addOperatorsIfNecessary(RangePartitioning(ordering, numPartitions), rowOrdering, child)
 
-          case (UnspecifiedDistribution, Seq(), child) =>
-            child
-          case (UnspecifiedDistribution, rowOrdering, child) =>
-            sqlContext.planner.BasicOperators.getSortOperator(rowOrdering, global = false, child)
+        case (UnspecifiedDistribution, Seq(), child) =>
+          child
+        case (UnspecifiedDistribution, rowOrdering, child) =>
+          sqlContext.planner.BasicOperators.getSortOperator(rowOrdering, global = false, child)
 
-          case (dist, ordering, _) =>
-            sys.error(s"Don't know how to ensure $dist with ordering $ordering")
-        }
-
-        operator.withNewChildren(fixedChildren)
+        case (dist, ordering, _) =>
+          sys.error(s"Don't know how to ensure $dist with ordering $ordering")
       }
+
+      operator.withNewChildren(fixedChildren)
   }
 }

