ulysses-you commented on a change in pull request #32816:
URL: https://github.com/apache/spark/pull/32816#discussion_r701214497



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
##########
@@ -254,25 +259,40 @@ case class EnsureRequirements(optimizeOutRepartition: 
Boolean = true) extends Ru
     }
   }
 
-  def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
-    case operator @ ShuffleExchangeExec(upper: HashPartitioning, child, 
shuffleOrigin)
-        if optimizeOutRepartition &&
-          (shuffleOrigin == REPARTITION_BY_COL || shuffleOrigin == 
REPARTITION_BY_NUM) =>
-      def hasSemanticEqualPartitioning(partitioning: Partitioning): Boolean = {
-        partitioning match {
-          case lower: HashPartitioning if upper.semanticEquals(lower) => true
-          case lower: PartitioningCollection =>
-            lower.partitionings.exists(hasSemanticEqualPartitioning)
-          case _ => false
+  def apply(plan: SparkPlan): SparkPlan = {
+    val newPlan = plan.transformUp {
+      case operator @ ShuffleExchangeExec(upper: HashPartitioning, child, 
shuffleOrigin)
+          if optimizeOutRepartition &&
+            (shuffleOrigin == REPARTITION_BY_COL || shuffleOrigin == 
REPARTITION_BY_NUM) =>
+        def hasSemanticEqualPartitioning(partitioning: Partitioning): Boolean 
= {
+          partitioning match {
+            case lower: HashPartitioning if upper.semanticEquals(lower) => true
+            case lower: PartitioningCollection =>
+              lower.partitionings.exists(hasSemanticEqualPartitioning)
+            case _ => false
+          }
+        }
+        if (hasSemanticEqualPartitioning(child.outputPartitioning)) {
+          child
+        } else {
+          operator
         }
-      }
-      if (hasSemanticEqualPartitioning(child.outputPartitioning)) {
-        child
-      } else {
-        operator
-      }
 
-    case operator: SparkPlan =>
-      ensureDistributionAndOrdering(reorderJoinPredicates(operator))
+      case operator: SparkPlan =>
+        ensureDistributionAndOrdering(reorderJoinPredicates(operator))
+    }
+
+    requiredDistribution match {

Review comment:
       yeah, I considered this approach, but I'm not sure it's worth doing 
this integration. The requiredDistribution's shuffle origin is always different 
from the one in `ensureDistributionAndOrdering`, so if we want to merge them we 
need one more condition. Or do you have some other thoughts?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to