[GitHub] [spark] ulysses-you commented on a change in pull request #32816: [SPARK-33832][SQL] Support optimize skewed join even if introduce extra shuffle
ulysses-you commented on a change in pull request #32816:
URL: https://github.com/apache/spark/pull/32816#discussion_r706592608

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/simpleCosting.scala
## @@ -35,15 +36,48 @@ case class SimpleCost(value: Long) extends Cost {
 }
 /**
- * A simple implementation of [[CostEvaluator]], which counts the number of
- * [[ShuffleExchangeLike]] nodes in the plan.
+ * A skew join aware implementation of [[Cost]], which consider shuffle number and skew join number

Review comment:
Yeah, added more comments to show how we pick the cost with skew join and shuffle.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
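To make the idea of a skew-join-aware cost concrete, here is a toy Scala sketch. It is not Spark's `SimpleCost`/`CostEvaluator`: the `Node` tree, `SkewAwareCost`, and the ordering used (more optimized skew joins first, then fewer shuffles) are assumptions chosen to show one way the two counts could be combined.

```scala
object SkewCostSketch {
  // Toy plan model with only the node kinds the cost looks at (not Spark's SparkPlan).
  sealed trait Node { def children: Seq[Node] }
  final case class Scan(name: String) extends Node { val children: Seq[Node] = Nil }
  final case class Shuffle(child: Node) extends Node { val children: Seq[Node] = Seq(child) }
  final case class Join(left: Node, right: Node, isSkewJoin: Boolean) extends Node {
    val children: Seq[Node] = Seq(left, right)
  }

  // Hypothetical skew-join-aware cost: a plan with more optimized skew joins is cheaper;
  // between plans with the same number of skew joins, fewer shuffles is cheaper.
  final case class SkewAwareCost(numSkewJoins: Int, numShuffles: Int)
      extends Ordered[SkewAwareCost] {
    def compare(that: SkewAwareCost): Int = {
      // Reversed operands: a larger numSkewJoins compares as "smaller", i.e. cheaper.
      val bySkew = Integer.compare(that.numSkewJoins, numSkewJoins)
      if (bySkew != 0) bySkew else Integer.compare(numShuffles, that.numShuffles)
    }
  }

  // Pre-order walk over the whole tree.
  private def allNodes(plan: Node): Seq[Node] = plan +: plan.children.flatMap(allNodes)

  def evaluateCost(plan: Node): SkewAwareCost = {
    val nodes = allNodes(plan)
    val numSkewJoins = nodes.count {
      case j: Join => j.isSkewJoin
      case _ => false
    }
    SkewAwareCost(numSkewJoins, nodes.count(_.isInstanceOf[Shuffle]))
  }
}
```

With this ordering, a plan that splits one skewed join but needs a third shuffle compares as cheaper than the original two-shuffle plan; whether that trade is acceptable is exactly what the cost model has to encode.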
ulysses-you commented on a change in pull request #32816:
URL: https://github.com/apache/spark/pull/32816#discussion_r704911852

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
## @@ -38,26 +38,40 @@ import org.apache.spark.sql.execution.joins.{ShuffledHashJoinExec, SortMergeJoin
  *                               but can be false in AQE when AQE optimization may change the plan
  *                               output partitioning and need to retain the user-specified
  *                               repartition shuffles in the plan.
+ * @param requiredDistribution The root required distribution we should ensure. This value is used
+ *                             in AQE in case we change final stage output partitioning.
  */
-case class EnsureRequirements(optimizeOutRepartition: Boolean = true) extends Rule[SparkPlan] {
-
-  private def ensureDistributionAndOrdering(operator: SparkPlan): SparkPlan = {
-    val requiredChildDistributions: Seq[Distribution] = operator.requiredChildDistribution
-    val requiredChildOrderings: Seq[Seq[SortOrder]] = operator.requiredChildOrdering
-    var children: Seq[SparkPlan] = operator.children
-    assert(requiredChildDistributions.length == children.length)
-    assert(requiredChildOrderings.length == children.length)
+case class EnsureRequirements(
+    optimizeOutRepartition: Boolean = true,
+    requiredDistribution: Option[Distribution] = None)
+  extends Rule[SparkPlan] {
+  private def ensureDistributionAndOrdering(
+      originChildren: Seq[SparkPlan],
+      requiredChildDistributions: Seq[Distribution],
+      requiredChildOrderings: Seq[Seq[SortOrder]],
+      isRootDistribution: Boolean): Seq[SparkPlan] = {
+    assert(requiredChildDistributions.length == originChildren.length)
+    assert(requiredChildOrderings.length == originChildren.length)
     // Ensure that the operator's children satisfy their output distribution requirements.
-    children = children.zip(requiredChildDistributions).map {
+    var children = originChildren.zip(requiredChildDistributions).map {
       case (child, distribution) if child.outputPartitioning.satisfies(distribution) =>
         child
       case (child, BroadcastDistribution(mode)) =>
         BroadcastExchangeExec(mode, child)
       case (child, distribution) =>
         val numPartitions = distribution.requiredNumPartitions
           .getOrElse(conf.numShufflePartitions)
-        ShuffleExchangeExec(distribution.createPartitioning(numPartitions), child)
+        val shuffleOrigin = if (isRootDistribution) {

Review comment:
Agreed, pulled out the shuffle origin to make this cleaner.
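A minimal sketch of the pulled-out shuffle-origin choice, assuming the behavior described elsewhere in this thread: a shuffle added for a child requirement keeps `ENSURE_REQUIREMENTS`, while a shuffle added for the root required distribution is tagged like a user repartition (`REPARTITION_BY_NUM` when a partition number is required, otherwise `REPARTITION_BY_COL`). The toy enum and `shuffleOriginFor` are hypothetical stand-ins, not Spark's `ShuffleOrigin` API.

```scala
object ShuffleOriginSketch {
  // Toy copies of the origin names used in the review, not Spark's classes.
  sealed trait ShuffleOrigin
  case object ENSURE_REQUIREMENTS extends ShuffleOrigin
  case object REPARTITION_BY_COL extends ShuffleOrigin
  case object REPARTITION_BY_NUM extends ShuffleOrigin

  // Hypothetical helper: pick the origin for a shuffle inserted to satisfy a distribution.
  def shuffleOriginFor(
      isRootDistribution: Boolean,
      requiredNumPartitions: Option[Int]): ShuffleOrigin =
    if (!isRootDistribution) ENSURE_REQUIREMENTS // ordinary child requirement
    else if (requiredNumPartitions.isDefined) REPARTITION_BY_NUM // e.g. repartition(10, key1)
    else REPARTITION_BY_COL // e.g. repartition(key1)
}
```

Keeping the decision in one small function makes the root/non-root split easy to test in isolation.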
ulysses-you commented on a change in pull request #32816:
URL: https://github.com/apache/spark/pull/32816#discussion_r703370345

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
## @@ -108,16 +114,17 @@ case class AdaptiveSparkPlanExec(
     // around this case.
     EnsureRequirements(optimizeOutRepartition = requiredDistribution.isDefined),

Review comment:
Combined them to ensure we run the same `EnsureRequirements` twice.
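Running the same rule twice is only safe if the rule is idempotent. The toy model below (hypothetical `Plan`, `Aggregate`, `Exchange`, and `ensure`, not Spark classes) sketches why an `EnsureRequirements`-style pass behaves that way: it inserts an exchange only when the child does not already satisfy the requirement, so a second pass finds nothing to add.

```scala
object IdempotentEnsureSketch {
  sealed trait Plan { def partitionedBy: Option[String] }
  final case class Leaf(partitionedBy: Option[String]) extends Plan
  final case class Exchange(key: String, child: Plan) extends Plan {
    val partitionedBy: Option[String] = Some(key)
  }
  final case class Aggregate(key: String, child: Plan) extends Plan {
    val partitionedBy: Option[String] = child.partitionedBy
  }

  // Toy requirement: an Aggregate needs its child hash-partitioned by the aggregate key.
  def ensure(plan: Plan): Plan = plan match {
    case Aggregate(key, child) =>
      val newChild = ensure(child)
      // Only add an exchange when the requirement is not met yet.
      if (newChild.partitionedBy.contains(key)) Aggregate(key, newChild)
      else Aggregate(key, Exchange(key, newChild))
    case Exchange(key, child) => Exchange(key, ensure(child))
    case leaf: Leaf => leaf
  }
}
```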
ulysses-you commented on a change in pull request #32816:
URL: https://github.com/apache/spark/pull/32816#discussion_r703280068

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
## @@ -250,15 +253,28 @@ object OptimizeSkewedJoin extends AQEShuffleReadRule {
       // SHJ
       //   Shuffle
       //   Shuffle
-      optimizeSkewJoin(plan)
+      val optimized =
+        EnsureRequirements(requiredDistribution.isDefined, requiredDistribution)

Review comment:
Seems better: `OptimizeSkewedJoin` doesn't need to care about `requiredDistribution`.
ulysses-you commented on a change in pull request #32816:
URL: https://github.com/apache/spark/pull/32816#discussion_r703278537

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
## @@ -250,15 +253,28 @@ object OptimizeSkewedJoin extends AQEShuffleReadRule {
       // SHJ
       //   Shuffle
       //   Shuffle
-      optimizeSkewJoin(plan)
+      val optimized =
+        EnsureRequirements(requiredDistribution.isDefined, requiredDistribution)
+          .apply(optimizeSkewJoin(plan))
+      val originCost = costEvaluator.evaluateCost(plan)
+      val optimizedCost = costEvaluator.evaluateCost(optimized)
+      // two cases we will pick new plan:
+      //   1. optimize the skew join without extra shuffle
+      //   2. optimize the skew join with extra shuffle but the costEvaluator think it's better
+      if (optimizedCost < originCost || (originCost == optimizedCost && optimized != plan)) {

Review comment:
Yeah, since we only optimize the physical plan here.
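The selection condition in this hunk can be read as a small generic predicate. This sketch assumes plans can be compared with `!=` and that costs have an `Ordering`; `pickPlan` is a hypothetical helper, not code from the PR.

```scala
object PickPlanSketch {
  // Keep the optimized plan when it is strictly cheaper, or when it costs the same but
  // actually differs from the original (e.g. a skew join was split with no extra shuffle).
  // Otherwise keep the original plan.
  def pickPlan[P, C](original: P, optimized: P)(cost: P => C)(implicit ord: Ordering[C]): P = {
    val originCost = cost(original)
    val optimizedCost = cost(optimized)
    if (ord.lt(optimizedCost, originCost) ||
        (ord.equiv(originCost, optimizedCost) && optimized != original)) optimized
    else original
  }
}
```

For example, with a shuffle-count cost, a skew-join rewrite that keeps two shuffles is accepted, while one that needs a third shuffle is rejected unless the cost function says otherwise.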
ulysses-you commented on a change in pull request #32816:
URL: https://github.com/apache/spark/pull/32816#discussion_r702426889

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
## @@ -250,15 +253,28 @@ object OptimizeSkewedJoin extends AQEShuffleReadRule {
       // SHJ
       //   Shuffle
       //   Shuffle
-      optimizeSkewJoin(plan)
+      val optimized =
+        EnsureRequirements(requiredDistribution.isDefined, requiredDistribution)
+          .apply(optimizeSkewJoin(plan))
+      val originCost = costEvaluator.evaluateCost(plan)
+      val optimizedCost = costEvaluator.evaluateCost(optimized)
+      // two cases we will pick new plan:
+      //   1. optimize the skew join without extra shuffle
+      //   2. optimize the skew join with extra shuffle but the costEvaluator think it's better
+      if (optimizedCost < originCost || (originCost == optimizedCost && optimized != plan)) {

Review comment:
Moved the cost evaluation into `OptimizeSkewedJoin`, so now there are two cases in which we pick the new plan (see the code comment).

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
## @@ -169,12 +174,6 @@ case class AdaptiveSparkPlanExec(
     optimized
   }
-  @transient private val costEvaluator =
-    conf.getConf(SQLConf.ADAPTIVE_CUSTOM_COST_EVALUATOR_CLASS) match {
-      case Some(className) => CostEvaluator.instantiate(className, session.sparkContext.getConf)
-      case _ => SimpleCostEvaluator
-    }

Review comment:
Just moved this code up.

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
## @@ -38,26 +38,40 @@ import org.apache.spark.sql.execution.joins.{ShuffledHashJoinExec, SortMergeJoin
  *                               but can be false in AQE when AQE optimization may change the plan
  *                               output partitioning and need to retain the user-specified
  *                               repartition shuffles in the plan.
+ * @param requiredDistribution The root required distribution we should ensure. This value is used
+ *                             in AQE in case we change final stage output partitioning.
  */
-case class EnsureRequirements(optimizeOutRepartition: Boolean = true) extends Rule[SparkPlan] {
-
-  private def ensureDistributionAndOrdering(operator: SparkPlan): SparkPlan = {
-    val requiredChildDistributions: Seq[Distribution] = operator.requiredChildDistribution
-    val requiredChildOrderings: Seq[Seq[SortOrder]] = operator.requiredChildOrdering
-    var children: Seq[SparkPlan] = operator.children
-    assert(requiredChildDistributions.length == children.length)
-    assert(requiredChildOrderings.length == children.length)
+case class EnsureRequirements(
+    optimizeOutRepartition: Boolean = true,
+    requiredDistribution: Option[Distribution] = None)
+  extends Rule[SparkPlan] {
+  private def ensureDistributionAndOrdering(
+      originChildren: Seq[SparkPlan],
+      requiredChildDistributions: Seq[Distribution],
+      requiredChildOrderings: Seq[Seq[SortOrder]],
+      isRootDistribution: Boolean): Seq[SparkPlan] = {
+    assert(requiredChildDistributions.length == originChildren.length)
+    assert(requiredChildOrderings.length == originChildren.length)
     // Ensure that the operator's children satisfy their output distribution requirements.
-    children = children.zip(requiredChildDistributions).map {
+    var children = originChildren.zip(requiredChildDistributions).map {
       case (child, distribution) if child.outputPartitioning.satisfies(distribution) =>
         child
       case (child, BroadcastDistribution(mode)) =>
         BroadcastExchangeExec(mode, child)
       case (child, distribution) =>
         val numPartitions = distribution.requiredNumPartitions
           .getOrElse(conf.numShufflePartitions)
-        ShuffleExchangeExec(distribution.createPartitioning(numPartitions), child)
+        val shuffleOrigin = if (isRootDistribution) {

Review comment:
@maryannxue This condition is what I mentioned before: if the required distribution is the root one, we should not give its shuffle the `ENSURE_REQUIREMENTS` origin. The reason is that `ENSURE_REQUIREMENTS` shuffles can be optimized by other rules (e.g. `CoalesceShufflePartitions`), which would break the required distribution again.
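To illustrate why the root shuffle should not carry `ENSURE_REQUIREMENTS`, here is a toy model. It assumes, purely for illustration, a coalescing rule that only shrinks `ENSURE_REQUIREMENTS` shuffles; the real `CoalesceShufflePartitions` has its own set of supported origins, and `ToyShuffle`, `coalesce`, and `satisfiesRequired` are hypothetical names.

```scala
object CoalesceOriginSketch {
  sealed trait ShuffleOrigin
  case object ENSURE_REQUIREMENTS extends ShuffleOrigin
  case object REPARTITION_BY_NUM extends ShuffleOrigin

  final case class ToyShuffle(numPartitions: Int, origin: ShuffleOrigin)

  // Toy coalescing rule. Assumption: it only rewrites ENSURE_REQUIREMENTS shuffles.
  def coalesce(shuffle: ToyShuffle, targetPartitions: Int): ToyShuffle =
    if (shuffle.origin == ENSURE_REQUIREMENTS) {
      shuffle.copy(numPartitions = math.min(shuffle.numPartitions, targetPartitions))
    } else {
      shuffle
    }

  // The final stage must still produce the user-required number of partitions.
  def satisfiesRequired(shuffle: ToyShuffle, required: Int): Boolean =
    shuffle.numPartitions == required
}
```

In this model, a root shuffle for `repartition(10, ...)` tagged `ENSURE_REQUIREMENTS` can be coalesced down and lose its 10 partitions, while the same shuffle tagged `REPARTITION_BY_NUM` is left alone.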
ulysses-you commented on a change in pull request #32816:
URL: https://github.com/apache/spark/pull/32816#discussion_r701542919

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
## @@ -640,14 +668,12 @@ case class AdaptiveSparkPlanExec(
    * Re-optimize and run physical planning on the current logical plan based on the latest stats.
    */
   private def reOptimize(
-      logicalPlan: LogicalPlan): (SparkPlan, LogicalPlan) = context.qe.withCteMap {
+      logicalPlan: LogicalPlan): (Seq[SparkPlan], LogicalPlan) = context.qe.withCteMap {
     logicalPlan.invalidateStatsCache()
     val optimized = optimizer.execute(logicalPlan)
     val sparkPlan = context.session.sessionState.planner.plan(ReturnAnswer(optimized)).next()
-    val newPlan = applyPhysicalRules(
-      sparkPlan,
-      preprocessingRules ++ queryStagePreparationRules,
-      Some((planChangeLogger, "AQE Replanning")))
+    val optimizedPhysicalPlan = prepareQueryStages(sparkPlan, false)

Review comment:
@maryannxue Do you mean moving the logic that decides whether to apply `OptimizeSkewedJoin` + `EnsureRequirements` into `OptimizeSkewedJoin` itself? If so, we also need to add an extra cost evaluator check in `OptimizeSkewedJoin`. That seems no cleaner than the current approach, where we evaluate the cost in only one place. Am I following your thought?
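The "evaluate the cost in one place" approach can be pictured as: re-optimization returns several candidate physical plans, and a single evaluator chooses among them. A minimal sketch, assuming a numeric cost per plan; `chooseCheapest` is a hypothetical helper, not the PR's code.

```scala
object CandidatePlansSketch {
  // Hypothetical: candidates might be the plan with and without the skew-join rewrite.
  // All cost evaluation happens here, so no individual rule needs its own cost check.
  def chooseCheapest[P](candidates: Seq[P])(cost: P => Long): P = {
    require(candidates.nonEmpty, "need at least one candidate plan")
    candidates.minBy(cost)
  }
}
```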
ulysses-you commented on a change in pull request #32816:
URL: https://github.com/apache/spark/pull/32816#discussion_r701214497

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
## @@ -254,25 +259,40 @@ case class EnsureRequirements(optimizeOutRepartition: Boolean = true) extends Ru
     }
   }
-  def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
-    case operator @ ShuffleExchangeExec(upper: HashPartitioning, child, shuffleOrigin)
-        if optimizeOutRepartition &&
-          (shuffleOrigin == REPARTITION_BY_COL || shuffleOrigin == REPARTITION_BY_NUM) =>
-      def hasSemanticEqualPartitioning(partitioning: Partitioning): Boolean = {
-        partitioning match {
-          case lower: HashPartitioning if upper.semanticEquals(lower) => true
-          case lower: PartitioningCollection =>
-            lower.partitionings.exists(hasSemanticEqualPartitioning)
-          case _ => false
+  def apply(plan: SparkPlan): SparkPlan = {
+    val newPlan = plan.transformUp {
+      case operator @ ShuffleExchangeExec(upper: HashPartitioning, child, shuffleOrigin)
+          if optimizeOutRepartition &&
+            (shuffleOrigin == REPARTITION_BY_COL || shuffleOrigin == REPARTITION_BY_NUM) =>
+        def hasSemanticEqualPartitioning(partitioning: Partitioning): Boolean = {
+          partitioning match {
+            case lower: HashPartitioning if upper.semanticEquals(lower) => true
+            case lower: PartitioningCollection =>
+              lower.partitionings.exists(hasSemanticEqualPartitioning)
+            case _ => false
+          }
+        }
+        if (hasSemanticEqualPartitioning(child.outputPartitioning)) {
+          child
+        } else {
+          operator
         }
-      }
-      if (hasSemanticEqualPartitioning(child.outputPartitioning)) {
-        child
-      } else {
-        operator
-      }
-    case operator: SparkPlan =>
-      ensureDistributionAndOrdering(reorderJoinPredicates(operator))
+      case operator: SparkPlan =>
+        ensureDistributionAndOrdering(reorderJoinPredicates(operator))
+    }
+
+    requiredDistribution match {

Review comment:
Yeah, I considered this approach, but I'm not sure the integration is worth it.
The shuffle origin for `requiredDistribution` is always different from the one used in `ensureDistributionAndOrdering`, so if we want to merge them we need one more condition. Or do you have some other thought?
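For reference, the root-level check under discussion can be sketched on its own. This toy version (hypothetical `Plan`, `RootShuffle`, and `ensureRoot`) assumes the per-operator requirements have already been fixed by the `transformUp` pass, and only adds one shuffle on top when a root requirement exists and is no longer met. That "is this the root?" distinction is the extra condition a merged version would have to carry.

```scala
object RootDistributionSketch {
  sealed trait Plan { def hashKeys: Option[Seq[String]] }
  final case class Scan(hashKeys: Option[Seq[String]]) extends Plan
  final case class RootShuffle(keys: Seq[String], child: Plan) extends Plan {
    val hashKeys: Option[Seq[String]] = Some(keys)
  }

  // Second phase only: `plan` is assumed to have had its child requirements handled already.
  // If a root requirement was passed in and the plan's output no longer matches it
  // (e.g. after a skew-join rewrite), add a single shuffle at the top.
  def ensureRoot(plan: Plan, requiredKeys: Option[Seq[String]]): Plan = requiredKeys match {
    case Some(keys) if !plan.hashKeys.contains(keys) => RootShuffle(keys, plan)
    case _ => plan
  }
}
```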
ulysses-you commented on a change in pull request #32816:
URL: https://github.com/apache/spark/pull/32816#discussion_r700751867

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEUtils.scala
## @@ -57,4 +58,22 @@ object AQEUtils {
       }
     case _ => Some(UnspecifiedDistribution)
   }
+
+  // Add an extra shuffle if input plan does not satisfy the required distribution.
+  // This method is invoked after optimizing skewed join in case we change final stage
+  // output partitioning.
+  def ensureRequiredDistribution(

Review comment:
Good point, merged this code into `EnsureRequirements`.
ulysses-you commented on a change in pull request #32816:
URL: https://github.com/apache/spark/pull/32816#discussion_r696418503

## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
## @@ -1908,6 +1914,70 @@ class AdaptiveQueryExecSuite
     }
   }
+  test("SPARK-33832: Support optimize skew join even if introduce extra shuffle") {
+    withSQLConf(
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+      SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED.key -> "false",
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+      SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "100",
+      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "100",
+      SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
+      SQLConf.SHUFFLE_PARTITIONS.key -> "10",
+      SQLConf.ADAPTIVE_FORCE_OPTIMIZE_SKEWED_JOIN.key -> "true") {
+      withTempView("skewData1", "skewData2") {
+        spark
+          .range(0, 1000, 1, 10)
+          .selectExpr("id % 3 as key1", "id as value1")
+          .createOrReplaceTempView("skewData1")
+        spark
+          .range(0, 1000, 1, 10)
+          .selectExpr("id % 1 as key2", "id as value2")
+          .createOrReplaceTempView("skewData2")
+
+        // check if optimized skewed join does not satisfy the required distribution
+        Seq(true, false).foreach { hasRequiredDistribution =>
+          Seq(true, false).foreach { hasPartitionNumber =>
+            val repartition = if (hasRequiredDistribution) {
+              s"/*+ repartition(${ if (hasPartitionNumber) "10," else ""}key1) */"
+            } else {
+              ""
+            }
+
+            // check required distribution and extra shuffle
+            val (_, adaptive1) =
+              runAdaptiveAndVerifyResult(s"SELECT $repartition key1 FROM skewData1 " +
+                s"JOIN skewData2 ON key1 = key2 GROUP BY key1")
+            val shuffles1 = findTopLevelShuffle(adaptive1)

Review comment:
Inlined this.
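The test data is small enough to check by hand. This plain-Scala sketch (no Spark; `SkewDataSketch` and `hint` are hypothetical names) recomputes the two views, the size of the join before `GROUP BY`, and the hint string produced for each combination of flags.

```scala
object SkewDataSketch {
  // (key, value) pairs mirroring the two temp views: key1 = id % 3, key2 = id % 1 (always 0).
  val skewData1: Seq[(Long, Long)] = (0L until 1000L).map(id => (id % 3, id))
  val skewData2: Seq[(Long, Long)] = (0L until 1000L).map(id => (id % 1, id))

  // Rows produced by `skewData1 JOIN skewData2 ON key1 = key2`, before GROUP BY.
  val joinedRows: Long = {
    val rightCounts = skewData2.groupBy(_._1).map { case (k, rows) => k -> rows.size.toLong }
    skewData1.map { case (k, _) => rightCounts.getOrElse(k, 0L) }.sum
  }

  // key1 values that survive the join, i.e. the groups returned by GROUP BY key1.
  val groupedKeys: Set[Long] = {
    val rightKeys = skewData2.map(_._1).toSet
    skewData1.map(_._1).filter(rightKeys).toSet
  }

  // Same string construction as the test's `repartition` value.
  def hint(hasRequiredDistribution: Boolean, hasPartitionNumber: Boolean): String =
    if (hasRequiredDistribution) s"/*+ repartition(${if (hasPartitionNumber) "10," else ""}key1) */"
    else ""
}
```

Because every `key2` is `0`, all 1000 rows of `skewData2` land in a single join partition, and the 334 rows of `skewData1` with `key1 = 0` each match all of them (334 × 1000 = 334000 rows), which is what makes the join skewed.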
ulysses-you commented on a change in pull request #32816:
URL: https://github.com/apache/spark/pull/32816#discussion_r696415767

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
## @@ -169,15 +179,34 @@ case class AdaptiveSparkPlanExec(
     optimized
   }
+  def prepareQueryStages(
+      plan: SparkPlan,
+      optimizeSkewedJoin: Boolean): SparkPlan = {
+    if (optimizeSkewedJoin) {
+      AQEUtils.ensureRequiredDistribution(

Review comment:
Yeah, we can always run it.
ulysses-you commented on a change in pull request #32816:
URL: https://github.com/apache/spark/pull/32816#discussion_r696411043

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
## @@ -100,24 +100,34 @@ case class AdaptiveSparkPlanExec(
   // A list of physical plan rules to be applied before creation of query stages. The physical
   // plan should reach a final status of query stages (i.e., no more addition or removal of
   // Exchange nodes) after running these rules.
-  @transient private val queryStagePreparationRules: Seq[Rule[SparkPlan]] = Seq(
-    RemoveRedundantProjects,
-    // For cases like `df.repartition(a, b).select(c)`, there is no distribution requirement for
-    // the final plan, but we do need to respect the user-specified repartition. Here we ask
-    // `EnsureRequirements` to not optimize out the user-specified repartition-by-col to work
-    // around this case.
-    EnsureRequirements(optimizeOutRepartition = requiredDistribution.isDefined),
-    RemoveRedundantSorts,
-    DisableUnnecessaryBucketedScan
-  ) ++ context.session.sessionState.queryStagePrepRules
+  private def queryStagePreparationRules(
+      optimizeSkewedJoin: Boolean = false): Seq[Rule[SparkPlan]] = {

Review comment:
I'm OK with removing it.
ulysses-you commented on a change in pull request #32816:
URL: https://github.com/apache/spark/pull/32816#discussion_r696410142

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
## @@ -100,24 +100,34 @@ case class AdaptiveSparkPlanExec(
   // A list of physical plan rules to be applied before creation of query stages. The physical
   // plan should reach a final status of query stages (i.e., no more addition or removal of
   // Exchange nodes) after running these rules.
-  @transient private val queryStagePreparationRules: Seq[Rule[SparkPlan]] = Seq(
-    RemoveRedundantProjects,
-    // For cases like `df.repartition(a, b).select(c)`, there is no distribution requirement for
-    // the final plan, but we do need to respect the user-specified repartition. Here we ask
-    // `EnsureRequirements` to not optimize out the user-specified repartition-by-col to work
-    // around this case.
-    EnsureRequirements(optimizeOutRepartition = requiredDistribution.isDefined),
-    RemoveRedundantSorts,
-    DisableUnnecessaryBucketedScan
-  ) ++ context.session.sessionState.queryStagePrepRules
+  private def queryStagePreparationRules(
+      optimizeSkewedJoin: Boolean = false): Seq[Rule[SparkPlan]] = {

Review comment:
This is mainly for `initialPlan`, which does not need skewed-join optimization.
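A hedged sketch of the `queryStagePreparationRules(optimizeSkewedJoin)` shape, with rules modeled as plain functions from plan to plan. Appending the skew-join rules after the base rules is an assumption made for the sketch; `RuleListSketch`, `preparationRules`, and `applyAll` are hypothetical names, and the actual position in the PR's list is not shown in this excerpt.

```scala
object RuleListSketch {
  // A rule is just a plan-to-plan function in this toy model.
  type Rule[P] = P => P

  // Skew-join rules are only included on request, so the initial plan can skip them.
  def preparationRules[P](
      base: Seq[Rule[P]],
      skewJoinRules: Seq[Rule[P]],
      optimizeSkewedJoin: Boolean = false): Seq[Rule[P]] =
    if (optimizeSkewedJoin) base ++ skewJoinRules else base

  // Apply the rules left to right, once each.
  def applyAll[P](plan: P, rules: Seq[Rule[P]]): P =
    rules.foldLeft(plan)((current, rule) => rule(current))
}
```

Using a list of rule names as the "plan" makes it easy to see which rules ran and in what order.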
ulysses-you commented on a change in pull request #32816:
URL: https://github.com/apache/spark/pull/32816#discussion_r696399833

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
## @@ -258,7 +258,8 @@ object OptimizeSkewedJoin extends AQEShuffleReadRule {
   object ShuffleStage {
     def unapply(plan: SparkPlan): Option[ShuffleQueryStageExec] = plan match {
-      case s: ShuffleQueryStageExec if s.mapStats.isDefined && isSupported(s.shuffle) =>
+      case s: ShuffleQueryStageExec if s.isMaterialized && s.mapStats.isDefined &&
+          isSupported(s.shuffle) =>

Review comment:
Removed the `AQEShuffleReadRule` and simplified the condition inline.
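`ShuffleStage` is a Scala extractor object: its `unapply` decides whether a pattern matches. Below is a toy version with the same guard shape (hypothetical `StageInfo`, `UsableShuffleStage`, and `describe`), assuming the added `isMaterialized` check exists so that the map statistics come from a finished stage.

```scala
object ShuffleStageExtractorSketch {
  // Toy stand-in for a shuffle query stage; field names echo the diff, values are made up.
  final case class StageInfo(
      isMaterialized: Boolean,
      mapStats: Option[Array[Long]],
      supported: Boolean)

  // Matches only stages whose map output statistics can be used for skew detection.
  object UsableShuffleStage {
    def unapply(candidate: StageInfo): Option[StageInfo] =
      if (candidate.isMaterialized && candidate.mapStats.isDefined && candidate.supported) {
        Some(candidate)
      } else {
        None
      }
  }

  def describe(candidate: StageInfo): String = candidate match {
    case UsableShuffleStage(stage) => s"usable, ${stage.mapStats.get.length} partitions"
    case _ => "skipped"
  }
}
```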
ulysses-you commented on a change in pull request #32816:
URL: https://github.com/apache/spark/pull/32816#discussion_r696398887

## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
## @@ -1908,6 +1914,69 @@ class AdaptiveQueryExecSuite
     }
   }
+  test("SPARK-33832: Support optimize skew join even if introduce extra shuffle") {
+    withSQLConf(
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+      SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED.key -> "false",
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+      SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "100",
+      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "100",
+      SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
+      SQLConf.SHUFFLE_PARTITIONS.key -> "10",
+      SQLConf.ADAPTIVE_FORCE_OPTIMIZE_SKEWED_JOIN.key -> "true") {
+      withTempView("skewData1", "skewData2") {
+        spark
+          .range(0, 1000, 1, 10)
+          .selectExpr("id % 3 as key1", "id as value1")
+          .createOrReplaceTempView("skewData1")
+        spark
+          .range(0, 1000, 1, 10)
+          .selectExpr("id % 1 as key2", "id as value2")
+          .createOrReplaceTempView("skewData2")
+
+        // check if optimized skewed join does not satisfy the required distribution
+        Seq(true, false).foreach { hasRequiredDistribution =>
+          Seq(true, false).foreach { hasPartitionNumber =>
+            val repartition = if (hasRequiredDistribution) {
+              s"/*+ repartition(${ if (hasPartitionNumber) "10," else ""}key1) */"
+            } else {
+              ""
+            }
+
+            // check required distribution and extra shuffle
+            val (_, adaptive1) =
+              runAdaptiveAndVerifyResult(s"SELECT $repartition key1 FROM skewData1 " +
+                s"JOIN skewData2 ON key1 = key2 GROUP BY key1")
+            val shuffles1 = findTopLevelShuffle(adaptive1)
+            assert(shuffles1.size == 3)
+            assert(shuffles1.head.shuffleOrigin == ENSURE_REQUIREMENTS)

Review comment:
Updated the comment: the head shuffle comes from the second `EnsureRequirements` in `queryStagePreparationRules`.
ulysses-you commented on a change in pull request #32816:
URL: https://github.com/apache/spark/pull/32816#discussion_r695402449

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEUtils.scala
## @@ -57,4 +58,20 @@ object AQEUtils {
       }
     case _ => Some(UnspecifiedDistribution)
   }
+
+  // Add an extra shuffle if input plan does not satisfy the required distribution.
+  // This method is invoked after optimizing skewed join in case we change final stage
+  // output partitioning.
+  def ensureRequiredDistribution(
+      plan: SparkPlan, distribution: Option[Distribution]): SparkPlan = distribution match {
+    case Some(d) if !plan.outputPartitioning.satisfies(d) =>
+      val numPartitions = d.requiredNumPartitions.getOrElse(conf.numShufflePartitions)
+      val shuffleOrigin = if (d.requiredNumPartitions.isDefined) {
+        REPARTITION_BY_NUM
+      } else {
+        REPARTITION_BY_COL
+      }
+      ShuffleExchangeExec(d.createPartitioning(numPartitions), plan, shuffleOrigin)

Review comment:
We should retain the original shuffle origin here; otherwise some stage optimization rules can break the `ENSURE_REQUIREMENTS` output partitioning in the next stage, e.g. `CoalesceShufflePartitions` when the input plan's origin is `REPARTITION_BY_NUM`.
[GitHub] [spark] ulysses-you commented on a change in pull request #32816: [SPARK-33832][SQL] Support optimize skewed join even if introduce extra shuffle
ulysses-you commented on a change in pull request #32816:
URL: https://github.com/apache/spark/pull/32816#discussion_r692656713

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala ##
@@ -100,24 +100,34 @@ case class AdaptiveSparkPlanExec(
   // A list of physical plan rules to be applied before creation of query stages. The physical
   // plan should reach a final status of query stages (i.e., no more addition or removal of
   // Exchange nodes) after running these rules.
-  @transient private val queryStagePreparationRules: Seq[Rule[SparkPlan]] = Seq(
-    RemoveRedundantProjects,
-    // For cases like `df.repartition(a, b).select(c)`, there is no distribution requirement for
-    // the final plan, but we do need to respect the user-specified repartition. Here we ask
-    // `EnsureRequirements` to not optimize out the user-specified repartition-by-col to work
-    // around this case.
-    EnsureRequirements(optimizeOutRepartition = requiredDistribution.isDefined),
-    RemoveRedundantSorts,
-    DisableUnnecessaryBucketedScan
-  ) ++ context.session.sessionState.queryStagePrepRules
+  private def queryStagePreparationRules(
+      optimizeSkewedJoin: Boolean = false): Seq[Rule[SparkPlan]] = {
+    val optimizeSkewedJoinRules = if (optimizeSkewedJoin) {
+      Seq(OptimizeSkewedJoin,
+        // Add the EnsureRequirements rule here since OptimizeSkewedJoin will change
+        // output partitioning, make sure we have right distribution.
+        EnsureRequirements(optimizeOutRepartition = requiredDistribution.isDefined))
+    } else {
+      Nil
+    }
+
+    Seq(
+      RemoveRedundantProjects,
+      // For cases like `df.repartition(a, b).select(c)`, there is no distribution requirement for
+      // the final plan, but we do need to respect the user-specified repartition. Here we ask
+      // `EnsureRequirements` to not optimize out the user-specified repartition-by-col to work
+      // around this case.
+      EnsureRequirements(optimizeOutRepartition = requiredDistribution.isDefined),
+      RemoveRedundantSorts,
+      DisableUnnecessaryBucketedScan
+    ) ++ optimizeSkewedJoinRules ++ context.session.sessionState.queryStagePrepRules

Review comment:
We need two `EnsureRequirements` passes for `OptimizeSkewedJoin`. The first `EnsureRequirements` adds enough exchanges for the join so that the physical plan can be executed. The second `EnsureRequirements` makes sure we still have the right distribution after `OptimizeSkewedJoin` is applied, since `OptimizeSkewedJoin` can change the output partitioning.
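The two-pass ordering can be illustrated with a small, self-contained toy planner. All types and functions below (`Plan`, `Join`, `Exchange`, `ensureRequirements`, `optimizeSkewedJoin`) are hypothetical simplifications, not Spark APIs, and the key assumption is that splitting skewed partitions makes a join lose its hash-partitioning guarantee. The first pass adds the exchanges the join needs, the skew rule then invalidates the root's partitioning, and the second pass restores the required distribution with an extra exchange on top.

```scala
sealed trait Partitioning
case class HashPartitioned(key: String) extends Partitioning
case object UnknownPartitioning extends Partitioning

sealed trait Plan { def output: Partitioning }
case class Scan(name: String) extends Plan {
  val output: Partitioning = UnknownPartitioning
}
case class Exchange(key: String, child: Plan) extends Plan {
  val output: Partitioning = HashPartitioned(key)
}
case class Join(key: String, left: Plan, right: Plan, skewSplit: Boolean = false) extends Plan {
  // Assumption: splitting skewed partitions breaks the hash-partitioning guarantee.
  val output: Partitioning = if (skewSplit) UnknownPartitioning else HashPartitioned(key)
}

// Insert exchanges so every join input is hash-partitioned on the join key,
// and, optionally, so the root satisfies a required distribution.
def ensureRequirements(plan: Plan, rootRequired: Option[String]): Plan = {
  def fixChildren(p: Plan): Plan = p match {
    case j @ Join(k, l, r, _) =>
      def ensure(c: Plan): Plan = {
        val fixed = fixChildren(c)
        if (fixed.output == HashPartitioned(k)) fixed else Exchange(k, fixed)
      }
      j.copy(left = ensure(l), right = ensure(r))
    case Exchange(k, c) => Exchange(k, fixChildren(c))
    case s: Scan => s
  }
  val fixed = fixChildren(plan)
  rootRequired match {
    case Some(k) if fixed.output != HashPartitioned(k) => Exchange(k, fixed)
    case _ => fixed
  }
}

// Toy skew-join rule: split every join whose inputs are both exchanges.
def optimizeSkewedJoin(plan: Plan): Plan = plan match {
  case j @ Join(_, l: Exchange, r: Exchange, _) =>
    j.copy(left = optimizeSkewedJoin(l), right = optimizeSkewedJoin(r), skewSplit = true)
  case j @ Join(_, l, r, _) =>
    j.copy(left = optimizeSkewedJoin(l), right = optimizeSkewedJoin(r))
  case Exchange(k, c) => Exchange(k, optimizeSkewedJoin(c))
  case s: Scan => s
}

// ensure -> skew-join optimization -> ensure again
val required = Some("k")
val first = ensureRequirements(Join("k", Scan("a"), Scan("b")), required)
val skewed = optimizeSkewedJoin(first)
val second = ensureRequirements(skewed, required)
```

Here `first` already satisfies the root requirement, `skewed` no longer does, and `second` wraps the skew-split join in one extra exchange on `k`.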
[GitHub] [spark] ulysses-you commented on a change in pull request #32816: [SPARK-33832][SQL] Support optimize skewed join even if introduce extra shuffle
ulysses-you commented on a change in pull request #32816:
URL: https://github.com/apache/spark/pull/32816#discussion_r692653369

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala ##
@@ -656,13 +687,54 @@ case class AdaptiveSparkPlanExec(
     // node to prevent the loss of the `BroadcastExchangeExec` node in DPP subquery.
     // Here, we also need to avoid to insert the `BroadcastExchangeExec` node when the newPlan
     // is already the `BroadcastExchangeExec` plan after apply the `LogicalQueryStageStrategy` rule.
-    val finalPlan = currentPhysicalPlan match {
+    def updateBroadcastExchange(plan: SparkPlan): SparkPlan = currentPhysicalPlan match {
       case b: BroadcastExchangeLike
-          if (!newPlan.isInstanceOf[BroadcastExchangeLike]) => b.withNewChildren(Seq(newPlan))
-      case _ => newPlan
+          if (!plan.isInstanceOf[BroadcastExchangeLike]) => b.withNewChildren(Seq(plan))
+      case _ => plan
     }
-    (finalPlan, optimized)
+    val optimizedWithSkewedJoin = applyPhysicalRules(
+      optimizedPhysicalPlan,
+      optimizeSkewedJoinWithExtraShuffleRules,
+      Some((planChangeLogger, "AQE Optimize Skewed Join With Extra Shuffle"))
+    )
+    val validatedWithSkewedJoin =
+      checkDistribution(
+        optimizedWithSkewedJoin,
+        optimizedPhysicalPlan,
+        isFinalStage(optimizedWithSkewedJoin),

Review comment:
Since `OptimizeSkewedJoin` has moved to the preparation phase, we need to validate whether it changes the output partitioning of the final stage. Otherwise, we may not satisfy `requiredDistribution` when `OptimizeSkewedJoin` is applied.
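A minimal sketch of the validation step under simplifying assumptions: a plan is reduced to the single key it is hash-partitioned on (`PlanSummary` is hypothetical), and the required distribution is an optional key. The parameter list mirrors the `checkDistribution` signature shown in the diff plus an explicit `requiredKey`, but the body is only an illustration, not the PR's implementation: if the rule's output is the final stage and no longer satisfies the required distribution, the original plan is kept.

```scala
// Hypothetical summary of a physical plan: a name and its output partitioning key.
case class PlanSummary(name: String, outputKey: Option[String])

// Keep `newPlan` unless it is the final stage and breaks the required distribution.
def checkDistribution(
    newPlan: PlanSummary,
    originPlan: PlanSummary,
    isFinalStage: Boolean,
    requiredKey: Option[String],
    ruleName: String): PlanSummary = {
  val satisfied = requiredKey.forall(key => newPlan.outputKey.contains(key))
  if (isFinalStage && !satisfied) {
    Console.err.println(s"$ruleName is not applied: it breaks the final stage distribution")
    originPlan
  } else {
    newPlan
  }
}

val origin = PlanSummary("before-skew-join", Some("k"))
val afterSkewJoin = PlanSummary("after-skew-join", None)
```

Non-final stages keep the rewritten plan, because a later exchange still sits between them and the required output.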
[GitHub] [spark] ulysses-you commented on a change in pull request #32816: [SPARK-33832][SQL] Support optimize skewed join even if introduce extra shuffle
ulysses-you commented on a change in pull request #32816:
URL: https://github.com/apache/spark/pull/32816#discussion_r692092985

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/SkewJoinAwareCost.scala ##
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
+import org.apache.spark.sql.execution.joins.ShuffledJoin
+
+/**
+ * A skew join aware implementation of [[Cost]], which consider shuffle number and skew join number
+ */
+case class SkewJoinAwareCost(
+    numShuffles: Int,
+    numSkewJoins: Int) extends Cost {
+  override def compare(that: Cost): Int = that match {
+    case other: SkewJoinAwareCost =>
+      if (numSkewJoins > other.numSkewJoins || numShuffles < other.numShuffles) {

Review comment:
Good point. I think we should always pick the plan with the larger number of skew joins first.
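A sketch of the ordering this comment settles on: compare skew-join counts first (more optimized skew joins means a cheaper plan) and use the shuffle count only to break ties. It assumes, consistent with the `compare(that: Cost)` override in the diff, that `Cost` is an `Ordered[Cost]` where lower is better; the local `Cost` trait is a stand-in for Spark's, and the exception is a placeholder rather than the error helper the PR uses.

```scala
// Stand-in for Spark's cost abstraction: lower cost means a better plan.
trait Cost extends Ordered[Cost]

case class SkewJoinAwareCost(numShuffles: Int, numSkewJoins: Int) extends Cost {
  // Lexicographic order: more optimized skew joins always wins; the shuffle
  // count only breaks ties between plans with the same number of skew joins.
  override def compare(that: Cost): Int = that match {
    case other: SkewJoinAwareCost =>
      if (numSkewJoins != other.numSkewJoins) {
        // Reversed operands: a larger skew-join count yields a smaller cost.
        Integer.compare(other.numSkewJoins, numSkewJoins)
      } else {
        Integer.compare(numShuffles, other.numShuffles)
      }
    case _ =>
      // Placeholder error for comparing against an unrelated Cost type.
      throw new IllegalArgumentException(s"Cannot compare $this with $that")
  }
}
```

Because this is a lexicographic order on the pair `(numSkewJoins, numShuffles)`, it is a total order, so `a < b` and `b < a` can never both hold.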
[GitHub] [spark] ulysses-you commented on a change in pull request #32816: [SPARK-33832][SQL] Support optimize skewed join even if introduce extra shuffle
ulysses-you commented on a change in pull request #32816:
URL: https://github.com/apache/spark/pull/32816#discussion_r692075920

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/SkewJoinAwareCost.scala ##
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
+import org.apache.spark.sql.execution.joins.ShuffledJoin
+
+/**
+ * A skew join aware implementation of [[Cost]], which consider shuffle number and skew join number
+ */
+case class SkewJoinAwareCost(
+    numShuffles: Int,
+    numSkewJoins: Int) extends Cost {
+  override def compare(that: Cost): Int = that match {
+    case other: SkewJoinAwareCost =>
+      if (numSkewJoins > other.numSkewJoins || numShuffles < other.numShuffles) {

Review comment:
Yes. If plan A has no skew join and fewer shuffle nodes than plan B, we will pick plan A.
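The behavior described in this reply can be checked directly against the condition in the diff. Lifted into a standalone predicate (`Candidate` and `preferredByOrRule` are hypothetical names), the `||` condition can hold in both directions for the same pair of plans: a plan with a skew join but more shuffles and a plan with no skew join but fewer shuffles each count as preferred over the other, so the condition alone does not define a consistent ordering.

```scala
case class Candidate(numShuffles: Int, numSkewJoins: Int)

// The condition from the diff, read as "a is preferred over b".
def preferredByOrRule(a: Candidate, b: Candidate): Boolean =
  a.numSkewJoins > b.numSkewJoins || a.numShuffles < b.numShuffles

val withSkewJoin = Candidate(numShuffles = 3, numSkewJoins = 1)
val fewerShuffles = Candidate(numShuffles = 2, numSkewJoins = 0)

// Both directions succeed, so the rule is not antisymmetric.
val bothPreferred =
  preferredByOrRule(withSkewJoin, fewerShuffles) && preferredByOrRule(fewerShuffles, withSkewJoin)
```

Ordering by skew-join count first and shuffle count second, as proposed in the other comment on this same diff line, removes the ambiguity.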
[GitHub] [spark] ulysses-you commented on a change in pull request #32816: [SPARK-33832][SQL] Support optimize skewed join even if introduce extra shuffle
ulysses-you commented on a change in pull request #32816:
URL: https://github.com/apache/spark/pull/32816#discussion_r692077822

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala ##
@@ -656,13 +687,54 @@ case class AdaptiveSparkPlanExec(
     // node to prevent the loss of the `BroadcastExchangeExec` node in DPP subquery.
     // Here, we also need to avoid to insert the `BroadcastExchangeExec` node when the newPlan
     // is already the `BroadcastExchangeExec` plan after apply the `LogicalQueryStageStrategy` rule.
-    val finalPlan = currentPhysicalPlan match {
+    def updateBroadcastExchange(plan: SparkPlan): SparkPlan = currentPhysicalPlan match {
       case b: BroadcastExchangeLike
-          if (!newPlan.isInstanceOf[BroadcastExchangeLike]) => b.withNewChildren(Seq(newPlan))
-      case _ => newPlan
+          if (!plan.isInstanceOf[BroadcastExchangeLike]) => b.withNewChildren(Seq(plan))
+      case _ => plan
     }
-    (finalPlan, optimized)
+    val optimizedWithSkewedJoin = applyPhysicalRules(
+      optimizedPhysicalPlan,
+      optimizeSkewedJoinWithExtraShuffleRules,
+      Some((planChangeLogger, "AQE Optimize Skewed Join With Extra Shuffle"))
+    )
+    val validatedWithSkewedJoin =
+      checkDistribution(
+        optimizedWithSkewedJoin,
+        optimizedPhysicalPlan,
+        isFinalStage(optimizedWithSkewedJoin),
+        OptimizeSkewedJoin.ruleName)
+
+    // here are three reasons if validatedWithSkewedJoin is equal to optimizedPhysicalPlan:
+    // 1. no skewed join optimized
+    // 2. optimize skewed join introduce extra shuffle and force optimize is disabled
+    // 3. optimize skewed join change final stage output partitioning
+    val newPhysicalPlans = if (validatedWithSkewedJoin.fastEquals(optimizedPhysicalPlan)) {
+      updateBroadcastExchange(optimizedPhysicalPlan) :: Nil

Review comment:
@cloud-fan In these three cases, we do not introduce any extra overhead.
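Put differently, extra cost evaluation only happens when the skew-join rules produced a different, validated plan. A hedged sketch of that branching, generic over the plan type: `candidatePlans` is a hypothetical helper, `==` stands in for Spark's `fastEquals`, and the two-candidate `else` branch is an assumption, since the diff excerpt stops before it.

```scala
// Candidate physical plans to hand to the cost evaluator (hypothetical helper).
def candidatePlans[P](original: P, validatedWithSkewJoin: P): List[P] =
  if (validatedWithSkewJoin == original) {
    // No skew join was optimized, the extra shuffle was rejected, or the
    // final-stage output partitioning would change: nothing new to cost.
    List(original)
  } else {
    // Assumption: otherwise both plans are kept and compared by cost.
    List(original, validatedWithSkewJoin)
  }
```

In the single-candidate case there is nothing to compare, which is why the three listed situations add no overhead.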
[GitHub] [spark] ulysses-you commented on a change in pull request #32816: [SPARK-33832][SQL] Support optimize skewed join even if introduce extra shuffle
ulysses-you commented on a change in pull request #32816:
URL: https://github.com/apache/spark/pull/32816#discussion_r688276500

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala ##
@@ -169,10 +162,33 @@ case class AdaptiveSparkPlanExec(
     optimized
   }
+  private def checkDistribution(
+      newPlan: SparkPlan,
+      originPlan: SparkPlan,
+      isFinalStage: Boolean,
+      ruleName: String): SparkPlan = {

Review comment:
This method is used by both `optimizeQueryStage` and `optimizeSkewedJoin` to check the distribution of the final stage.
[GitHub] [spark] ulysses-you commented on a change in pull request #32816: [SPARK-33832][SQL] Support optimize skewed join even if introduce extra shuffle
ulysses-you commented on a change in pull request #32816:
URL: https://github.com/apache/spark/pull/32816#discussion_r662725098

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala ##
@@ -252,17 +275,26 @@ case class AdaptiveSparkPlanExec(
         // plans are updated, we can clear the query stage list because at this point the two plans
         // are semantically and physically in sync again.
         val logicalPlan = replaceWithQueryStagesInLogicalPlan(currentLogicalPlan, stagesToReplace)
-        val (newPhysicalPlan, newLogicalPlan) = reOptimize(logicalPlan)
+        val (reOptimizePhysicalPlan, newLogicalPlan) = reOptimize(logicalPlan)
+        val planWithExtraShuffle = rePlanWithExtraShuffle(reOptimizePhysicalPlan)
         val origCost = costEvaluator.evaluateCost(currentPhysicalPlan)
-        val newCost = costEvaluator.evaluateCost(newPhysicalPlan)
-        if (newCost < origCost ||
-            (newCost == origCost && currentPhysicalPlan != newPhysicalPlan)) {
+        val newCost = costEvaluator.evaluateCost(reOptimizePhysicalPlan)
+        val extraShuffleCost = costEvaluator.evaluateCost(planWithExtraShuffle)
+        def updateCurrentPlan(newPhysicalPlan: SparkPlan): Unit = {
           logOnLevel(s"Plan changed from $currentPhysicalPlan to $newPhysicalPlan")
           cleanUpTempTags(newPhysicalPlan)
           currentPhysicalPlan = newPhysicalPlan
           currentLogicalPlan = newLogicalPlan
           stagesToReplace = Seq.empty[QueryStageExec]
         }
+
+        if (extraShuffleCost < newCost ||
+            (extraShuffleCost == newCost && planWithExtraShuffle != reOptimizePhysicalPlan)) {
+          updateCurrentPlan(planWithExtraShuffle)
+        } else if (newCost < origCost ||
+            (newCost == origCost && currentPhysicalPlan != reOptimizePhysicalPlan)) {
+          updateCurrentPlan(reOptimizePhysicalPlan)
+        }

Review comment:
@cloud-fan Here we use three costs to find the better plan:
1. the plan with the optimized skew join, if force-optimizing skew joins is enabled;
2. the re-optimized plan, if force-optimizing skew joins is disabled and it has no extra shuffle;
3. the original plan, if the re-optimized plan has an extra shuffle.
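The branch structure in this hunk can be read as a small selection function. Below it is lifted out and made generic over the plan and cost types, with `pickPlan` as a hypothetical name and `None` meaning "keep the current plan"; the conditions are transcribed from the diff, with `Ordering.lt` and `Ordering.equiv` standing in for `<` and `==` on costs.

```scala
// Returns the plan to switch to, or None to keep `current`.
def pickPlan[P, C](current: P, reOptimized: P, withExtraShuffle: P)(cost: P => C)(
    implicit ord: Ordering[C]): Option[P] = {
  val origCost = cost(current)
  val newCost = cost(reOptimized)
  val extraShuffleCost = cost(withExtraShuffle)
  if (ord.lt(extraShuffleCost, newCost) ||
      (ord.equiv(extraShuffleCost, newCost) && withExtraShuffle != reOptimized)) {
    Some(withExtraShuffle)
  } else if (ord.lt(newCost, origCost) ||
      (ord.equiv(newCost, origCost) && current != reOptimized)) {
    Some(reOptimized)
  } else {
    None
  }
}

val costs = Map("current" -> 3, "reOptimized" -> 2, "withExtraShuffle" -> 1)
val chosen = pickPlan("current", "reOptimized", "withExtraShuffle")(p => costs(p))
```

As transcribed, the first branch compares `extraShuffleCost` only with `newCost` and never with `origCost`: with costs current = 1, reOptimized = 3 and withExtraShuffle = 2, the extra-shuffle plan is chosen even though the current plan is cheaper.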
[GitHub] [spark] ulysses-you commented on a change in pull request #32816: [SPARK-33832][SQL] Support optimize skewed join even if introduce extra shuffle
ulysses-you commented on a change in pull request #32816:
URL: https://github.com/apache/spark/pull/32816#discussion_r647489569

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala ##
@@ -111,6 +115,19 @@ case class AdaptiveSparkPlanExec(
     CollapseCodegenStages()
   )
+  // OptimizeSkewedJoin has moved into preparation rules, so we should make
+  // finalPreparationStageRules same as finalStageOptimizerRules
+  private def finalPreparationStageRules: Seq[Rule[SparkPlan]] = {

Review comment:
Yes, currently it's only for `OptimizeSkewedJoin`.