ulysses-you commented on a change in pull request #32816: URL: https://github.com/apache/spark/pull/32816#discussion_r702426889
########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala ########## @@ -250,15 +253,28 @@ object OptimizeSkewedJoin extends AQEShuffleReadRule { // SHJ // Shuffle // Shuffle - optimizeSkewJoin(plan) + val optimized = + EnsureRequirements(requiredDistribution.isDefined, requiredDistribution) + .apply(optimizeSkewJoin(plan)) + val originCost = costEvaluator.evaluateCost(plan) + val optimizedCost = costEvaluator.evaluateCost(optimized) + // two cases we will pick new plan: + // 1. optimize the skew join without extra shuffle + // 2. optimize the skew join with extra shuffle but the costEvaluator think it's better + if (optimizedCost < originCost || (originCost == optimizedCost && optimized != plan)) { Review comment: I moved the cost evaluation into `OptimizeSkewedJoin`, so there are now two cases in which the new plan is picked (see the code comment above). ########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala ########## @@ -169,12 +174,6 @@ case class AdaptiveSparkPlanExec( optimized } - @transient private val costEvaluator = - conf.getConf(SQLConf.ADAPTIVE_CUSTOM_COST_EVALUATOR_CLASS) match { - case Some(className) => CostEvaluator.instantiate(className, session.sparkContext.getConf) - case _ => SimpleCostEvaluator - } Review comment: This code was just moved further up. ########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala ########## @@ -38,26 +38,40 @@ import org.apache.spark.sql.execution.joins.{ShuffledHashJoinExec, SortMergeJoin * but can be false in AQE when AQE optimization may change the plan * output partitioning and need to retain the user-specified * repartition shuffles in the plan. + * @param requiredDistribution The root required distribution we should ensure. This value is used + * in AQE in case we change final stage output partitioning.
*/ -case class EnsureRequirements(optimizeOutRepartition: Boolean = true) extends Rule[SparkPlan] { - - private def ensureDistributionAndOrdering(operator: SparkPlan): SparkPlan = { - val requiredChildDistributions: Seq[Distribution] = operator.requiredChildDistribution - val requiredChildOrderings: Seq[Seq[SortOrder]] = operator.requiredChildOrdering - var children: Seq[SparkPlan] = operator.children - assert(requiredChildDistributions.length == children.length) - assert(requiredChildOrderings.length == children.length) +case class EnsureRequirements( + optimizeOutRepartition: Boolean = true, + requiredDistribution: Option[Distribution] = None) + extends Rule[SparkPlan] { + private def ensureDistributionAndOrdering( + originChildren: Seq[SparkPlan], + requiredChildDistributions: Seq[Distribution], + requiredChildOrderings: Seq[Seq[SortOrder]], + isRootDistribution: Boolean): Seq[SparkPlan] = { + assert(requiredChildDistributions.length == originChildren.length) + assert(requiredChildOrderings.length == originChildren.length) // Ensure that the operator's children satisfy their output distribution requirements. - children = children.zip(requiredChildDistributions).map { + var children = originChildren.zip(requiredChildDistributions).map { case (child, distribution) if child.outputPartitioning.satisfies(distribution) => child case (child, BroadcastDistribution(mode)) => BroadcastExchangeExec(mode, child) case (child, distribution) => val numPartitions = distribution.requiredNumPartitions .getOrElse(conf.numShufflePartitions) - ShuffleExchangeExec(distribution.createPartitioning(numPartitions), child) + val shuffleOrigin = if (isRootDistribution) { Review comment: @maryannxue, this condition is the one I mentioned earlier. If the required distribution is the root distribution, we should not give the resulting shuffle the `ENSURE_REQUIREMENTS` origin. Shuffles with the `ENSURE_REQUIREMENTS` origin can be optimized by other rules (e.g. `CoalesceShufflePartitions`), which would break the required distribution again.
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org