[GitHub] [spark] ulysses-you commented on a change in pull request #32816: [SPARK-33832][SQL] Support optimize skewed join even if introduce extra shuffle

2021-09-11 Thread GitBox


ulysses-you commented on a change in pull request #32816:
URL: https://github.com/apache/spark/pull/32816#discussion_r706592608



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/simpleCosting.scala
##
@@ -35,15 +36,48 @@ case class SimpleCost(value: Long) extends Cost {
 }
 
 /**
- * A simple implementation of [[CostEvaluator]], which counts the number of
- * [[ShuffleExchangeLike]] nodes in the plan.
+ * A skew join aware implementation of [[Cost]], which considers shuffle number and skew join number

Review comment:
   yea, added more comments to show how we pick the cost using the skew join number and the shuffle number.
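
   A minimal sketch of the two quantities such a cost is built from (not the exact code in simpleCosting.scala; `isSkewJoin` is assumed to be the flag `ShuffledJoin` exposes for skew-optimized joins, as the imports in the SkewJoinAwareCost.scala diff later in this thread suggest):

    import org.apache.spark.sql.execution.SparkPlan
    import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
    import org.apache.spark.sql.execution.joins.ShuffledJoin

    // Count the two inputs to the cost: shuffle exchanges in the plan, and
    // joins that were already optimized for skew.
    def countForCost(plan: SparkPlan): (Int, Int) = {
      val numShuffles = plan.collect { case s: ShuffleExchangeLike => s }.size
      val numSkewJoins = plan.collect { case j: ShuffledJoin if j.isSkewJoin => j }.size
      (numShuffles, numSkewJoins)
    }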







[GitHub] [spark] ulysses-you commented on a change in pull request #32816: [SPARK-33832][SQL] Support optimize skewed join even if introduce extra shuffle

2021-09-08 Thread GitBox


ulysses-you commented on a change in pull request #32816:
URL: https://github.com/apache/spark/pull/32816#discussion_r704911852



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
##
@@ -38,26 +38,40 @@ import org.apache.spark.sql.execution.joins.{ShuffledHashJoinExec, SortMergeJoin
  *   but can be false in AQE when AQE optimization may change the plan
  *   output partitioning and need to retain the user-specified
  *   repartition shuffles in the plan.
+ * @param requiredDistribution The root required distribution we should ensure. This value is used
+ *   in AQE in case we change final stage output partitioning.
  */
-case class EnsureRequirements(optimizeOutRepartition: Boolean = true) extends Rule[SparkPlan] {
-
-  private def ensureDistributionAndOrdering(operator: SparkPlan): SparkPlan = {
-    val requiredChildDistributions: Seq[Distribution] = operator.requiredChildDistribution
-    val requiredChildOrderings: Seq[Seq[SortOrder]] = operator.requiredChildOrdering
-    var children: Seq[SparkPlan] = operator.children
-    assert(requiredChildDistributions.length == children.length)
-    assert(requiredChildOrderings.length == children.length)
+case class EnsureRequirements(
+    optimizeOutRepartition: Boolean = true,
+    requiredDistribution: Option[Distribution] = None)
+  extends Rule[SparkPlan] {
 
+  private def ensureDistributionAndOrdering(
+      originChildren: Seq[SparkPlan],
+      requiredChildDistributions: Seq[Distribution],
+      requiredChildOrderings: Seq[Seq[SortOrder]],
+      isRootDistribution: Boolean): Seq[SparkPlan] = {
+    assert(requiredChildDistributions.length == originChildren.length)
+    assert(requiredChildOrderings.length == originChildren.length)
     // Ensure that the operator's children satisfy their output distribution requirements.
-    children = children.zip(requiredChildDistributions).map {
+    var children = originChildren.zip(requiredChildDistributions).map {
      case (child, distribution) if child.outputPartitioning.satisfies(distribution) =>
        child
      case (child, BroadcastDistribution(mode)) =>
        BroadcastExchangeExec(mode, child)
      case (child, distribution) =>
        val numPartitions = distribution.requiredNumPartitions
          .getOrElse(conf.numShufflePartitions)
-        ShuffleExchangeExec(distribution.createPartitioning(numPartitions), child)
+        val shuffleOrigin = if (isRootDistribution) {

Review comment:
   agree, pulled out the shuffle origin to make this cleaner.







[GitHub] [spark] ulysses-you commented on a change in pull request #32816: [SPARK-33832][SQL] Support optimize skewed join even if introduce extra shuffle

2021-09-07 Thread GitBox


ulysses-you commented on a change in pull request #32816:
URL: https://github.com/apache/spark/pull/32816#discussion_r703370345



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
##
@@ -108,16 +114,17 @@ case class AdaptiveSparkPlanExec(
     // around this case.
     EnsureRequirements(optimizeOutRepartition = requiredDistribution.isDefined),

Review comment:
   combined, ensuring we run the same `EnsureRequirements` twice







[GitHub] [spark] ulysses-you commented on a change in pull request #32816: [SPARK-33832][SQL] Support optimize skewed join even if introduce extra shuffle

2021-09-07 Thread GitBox


ulysses-you commented on a change in pull request #32816:
URL: https://github.com/apache/spark/pull/32816#discussion_r703280068



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
##
@@ -250,15 +253,28 @@ object OptimizeSkewedJoin extends AQEShuffleReadRule {
       // SHJ
       //   Shuffle
       //   Shuffle
-      optimizeSkewJoin(plan)
+      val optimized =
+        EnsureRequirements(requiredDistribution.isDefined, requiredDistribution)

Review comment:
   seems better, `OptimizeSkewedJoin` doesn't need to care about `requiredDistribution`







[GitHub] [spark] ulysses-you commented on a change in pull request #32816: [SPARK-33832][SQL] Support optimize skewed join even if introduce extra shuffle

2021-09-07 Thread GitBox


ulysses-you commented on a change in pull request #32816:
URL: https://github.com/apache/spark/pull/32816#discussion_r703278537



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
##
@@ -250,15 +253,28 @@ object OptimizeSkewedJoin extends AQEShuffleReadRule {
       // SHJ
       //   Shuffle
       //   Shuffle
-      optimizeSkewJoin(plan)
+      val optimized =
+        EnsureRequirements(requiredDistribution.isDefined, requiredDistribution)
+          .apply(optimizeSkewJoin(plan))
+      val originCost = costEvaluator.evaluateCost(plan)
+      val optimizedCost = costEvaluator.evaluateCost(optimized)
+      // two cases we will pick new plan:
+      //   1. optimize the skew join without extra shuffle
+      //   2. optimize the skew join with extra shuffle but the costEvaluator thinks it's better
+      if (optimizedCost < originCost || (originCost == optimizedCost && optimized != plan)) {

Review comment:
   yea, since we only optimize the physical plan here







[GitHub] [spark] ulysses-you commented on a change in pull request #32816: [SPARK-33832][SQL] Support optimize skewed join even if introduce extra shuffle

2021-09-05 Thread GitBox


ulysses-you commented on a change in pull request #32816:
URL: https://github.com/apache/spark/pull/32816#discussion_r702426889



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
##
@@ -250,15 +253,28 @@ object OptimizeSkewedJoin extends AQEShuffleReadRule {
       // SHJ
       //   Shuffle
       //   Shuffle
-      optimizeSkewJoin(plan)
+      val optimized =
+        EnsureRequirements(requiredDistribution.isDefined, requiredDistribution)
+          .apply(optimizeSkewJoin(plan))
+      val originCost = costEvaluator.evaluateCost(plan)
+      val optimizedCost = costEvaluator.evaluateCost(optimized)
+      // two cases we will pick new plan:
+      //   1. optimize the skew join without extra shuffle
+      //   2. optimize the skew join with extra shuffle but the costEvaluator thinks it's better
+      if (optimizedCost < originCost || (originCost == optimizedCost && optimized != plan)) {

Review comment:
   Moved the cost evaluation into `OptimizeSkewJoin`, so now we have two 
cases that will pick the new plan (see the comment).
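
   A hedged sketch of that two-case decision (`pickPlan` is an illustrative name, not from the PR; `CostEvaluator` and `SparkPlan` are the Spark types used in the diff above):

    import org.apache.spark.sql.execution.SparkPlan
    import org.apache.spark.sql.execution.adaptive.CostEvaluator

    // Accept the skew-join-optimized plan if it is strictly cheaper (case 1:
    // no extra shuffle), or if it costs the same but actually differs from
    // the original (case 2: extra shuffle, judged no worse by the evaluator).
    def pickPlan(plan: SparkPlan, optimized: SparkPlan, evaluator: CostEvaluator): SparkPlan = {
      val originCost = evaluator.evaluateCost(plan)
      val optimizedCost = evaluator.evaluateCost(optimized)
      if (optimizedCost < originCost || (originCost == optimizedCost && optimized != plan)) {
        optimized
      } else {
        plan
      }
    }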

##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
##
@@ -169,12 +174,6 @@ case class AdaptiveSparkPlanExec(
     optimized
   }
 
-  @transient private val costEvaluator =
-    conf.getConf(SQLConf.ADAPTIVE_CUSTOM_COST_EVALUATOR_CLASS) match {
-      case Some(className) => CostEvaluator.instantiate(className, session.sparkContext.getConf)
-      case _ => SimpleCostEvaluator
-    }

Review comment:
   just moved this code up

##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
##
@@ -38,26 +38,40 @@ import org.apache.spark.sql.execution.joins.{ShuffledHashJoinExec, SortMergeJoin
  *   but can be false in AQE when AQE optimization may change the plan
  *   output partitioning and need to retain the user-specified
  *   repartition shuffles in the plan.
+ * @param requiredDistribution The root required distribution we should ensure. This value is used
+ *   in AQE in case we change final stage output partitioning.
  */
-case class EnsureRequirements(optimizeOutRepartition: Boolean = true) extends Rule[SparkPlan] {
-
-  private def ensureDistributionAndOrdering(operator: SparkPlan): SparkPlan = {
-    val requiredChildDistributions: Seq[Distribution] = operator.requiredChildDistribution
-    val requiredChildOrderings: Seq[Seq[SortOrder]] = operator.requiredChildOrdering
-    var children: Seq[SparkPlan] = operator.children
-    assert(requiredChildDistributions.length == children.length)
-    assert(requiredChildOrderings.length == children.length)
+case class EnsureRequirements(
+    optimizeOutRepartition: Boolean = true,
+    requiredDistribution: Option[Distribution] = None)
+  extends Rule[SparkPlan] {
 
+  private def ensureDistributionAndOrdering(
+      originChildren: Seq[SparkPlan],
+      requiredChildDistributions: Seq[Distribution],
+      requiredChildOrderings: Seq[Seq[SortOrder]],
+      isRootDistribution: Boolean): Seq[SparkPlan] = {
+    assert(requiredChildDistributions.length == originChildren.length)
+    assert(requiredChildOrderings.length == originChildren.length)
     // Ensure that the operator's children satisfy their output distribution requirements.
-    children = children.zip(requiredChildDistributions).map {
+    var children = originChildren.zip(requiredChildDistributions).map {
      case (child, distribution) if child.outputPartitioning.satisfies(distribution) =>
        child
      case (child, BroadcastDistribution(mode)) =>
        BroadcastExchangeExec(mode, child)
      case (child, distribution) =>
        val numPartitions = distribution.requiredNumPartitions
          .getOrElse(conf.numShufflePartitions)
-        ShuffleExchangeExec(distribution.createPartitioning(numPartitions), child)
+        val shuffleOrigin = if (isRootDistribution) {

Review comment:
   @maryannxue this condition is what I said before. If the required distribution is the root one, we should not give it `ENSURE_REQUIREMENTS`. The reason is that `ENSURE_REQUIREMENTS` shuffles will be optimized by other rules (e.g. `CoalesceShufflePartitions`), which would break the required distribution again.
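
   The diff is cut off at the `shuffleOrigin` assignment; based on the `AQEUtils.ensureRequiredDistribution` version of the same logic quoted later in this thread (2021-08-24), the branch is presumably along these lines:

    import org.apache.spark.sql.catalyst.plans.physical.Distribution
    import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, REPARTITION_BY_COL, REPARTITION_BY_NUM, ShuffleOrigin}

    // For the root required distribution, use a repartition-style origin so
    // that stage optimizations which rewrite ENSURE_REQUIREMENTS shuffles
    // (e.g. CoalesceShufflePartitions) leave this shuffle alone.
    def chooseShuffleOrigin(isRootDistribution: Boolean, distribution: Distribution): ShuffleOrigin =
      if (isRootDistribution) {
        if (distribution.requiredNumPartitions.isDefined) REPARTITION_BY_NUM
        else REPARTITION_BY_COL
      } else {
        ENSURE_REQUIREMENTS
      }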







[GitHub] [spark] ulysses-you commented on a change in pull request #32816: [SPARK-33832][SQL] Support optimize skewed join even if introduce extra shuffle

2021-09-02 Thread GitBox


ulysses-you commented on a change in pull request #32816:
URL: https://github.com/apache/spark/pull/32816#discussion_r701542919



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
##
@@ -640,14 +668,12 @@ case class AdaptiveSparkPlanExec(
    * Re-optimize and run physical planning on the current logical plan based on the latest stats.
    */
   private def reOptimize(
-      logicalPlan: LogicalPlan): (SparkPlan, LogicalPlan) = context.qe.withCteMap {
+      logicalPlan: LogicalPlan): (Seq[SparkPlan], LogicalPlan) = context.qe.withCteMap {
     logicalPlan.invalidateStatsCache()
     val optimized = optimizer.execute(logicalPlan)
     val sparkPlan = context.session.sessionState.planner.plan(ReturnAnswer(optimized)).next()
-    val newPlan = applyPhysicalRules(
-      sparkPlan,
-      preprocessingRules ++ queryStagePreparationRules,
-      Some((planChangeLogger, "AQE Replanning")))
+    val optimizedPhysicalPlan = prepareQueryStages(sparkPlan, false)

Review comment:
   @maryannxue Do you mean the logic that decides whether we should apply `OptimizeSkewedJoin` + `EnsureRequirements` moves into `OptimizeSkewedJoin` itself? If so, we also need to add an extra cost evaluator check in `OptimizeSkewedJoin`. That seems no clearer than the current approach, where we only evaluate the cost in one place.
   
   Am I following your thought?
   







[GitHub] [spark] ulysses-you commented on a change in pull request #32816: [SPARK-33832][SQL] Support optimize skewed join even if introduce extra shuffle

2021-09-02 Thread GitBox


ulysses-you commented on a change in pull request #32816:
URL: https://github.com/apache/spark/pull/32816#discussion_r701214497



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
##
@@ -254,25 +259,40 @@ case class EnsureRequirements(optimizeOutRepartition: Boolean = true) extends Ru
       }
     }
 
-  def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
-    case operator @ ShuffleExchangeExec(upper: HashPartitioning, child, shuffleOrigin)
-        if optimizeOutRepartition &&
-          (shuffleOrigin == REPARTITION_BY_COL || shuffleOrigin == REPARTITION_BY_NUM) =>
-      def hasSemanticEqualPartitioning(partitioning: Partitioning): Boolean = {
-        partitioning match {
-          case lower: HashPartitioning if upper.semanticEquals(lower) => true
-          case lower: PartitioningCollection =>
-            lower.partitionings.exists(hasSemanticEqualPartitioning)
-          case _ => false
+  def apply(plan: SparkPlan): SparkPlan = {
+    val newPlan = plan.transformUp {
+      case operator @ ShuffleExchangeExec(upper: HashPartitioning, child, shuffleOrigin)
+          if optimizeOutRepartition &&
+            (shuffleOrigin == REPARTITION_BY_COL || shuffleOrigin == REPARTITION_BY_NUM) =>
+        def hasSemanticEqualPartitioning(partitioning: Partitioning): Boolean = {
+          partitioning match {
+            case lower: HashPartitioning if upper.semanticEquals(lower) => true
+            case lower: PartitioningCollection =>
+              lower.partitionings.exists(hasSemanticEqualPartitioning)
+            case _ => false
+          }
+        }
+        if (hasSemanticEqualPartitioning(child.outputPartitioning)) {
+          child
+        } else {
+          operator
         }
-      }
-      if (hasSemanticEqualPartitioning(child.outputPartitioning)) {
-        child
-      } else {
-        operator
-      }
 
-    case operator: SparkPlan =>
-      ensureDistributionAndOrdering(reorderJoinPredicates(operator))
+      case operator: SparkPlan =>
+        ensureDistributionAndOrdering(reorderJoinPredicates(operator))
+    }
+
+    requiredDistribution match {

Review comment:
   yeah, I considered this approach, but I'm not sure it's worth doing the integration. The requiredDistribution's shuffle origin is always different from the one in `ensureDistributionAndOrdering`, so if we want to merge them we would need one more condition. Or do you have some other thought?
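
   For context, a sketch of the tail of `apply` that the truncated `requiredDistribution match` presumably expands to (reconstructed from the `AQEUtils.ensureRequiredDistribution` diff later in this thread; `defaultNumPartitions` is a stand-in for `conf.numShufflePartitions`):

    import org.apache.spark.sql.catalyst.plans.physical.Distribution
    import org.apache.spark.sql.execution.SparkPlan
    import org.apache.spark.sql.execution.exchange.{REPARTITION_BY_COL, REPARTITION_BY_NUM, ShuffleExchangeExec}

    // Separately ensure the root required distribution: add one more shuffle,
    // with a repartition-style origin, if the transformed plan's output
    // partitioning does not already satisfy it.
    def ensureRootDistribution(
        newPlan: SparkPlan,
        requiredDistribution: Option[Distribution],
        defaultNumPartitions: Int): SparkPlan = requiredDistribution match {
      case Some(d) if !newPlan.outputPartitioning.satisfies(d) =>
        val numPartitions = d.requiredNumPartitions.getOrElse(defaultNumPartitions)
        val origin = if (d.requiredNumPartitions.isDefined) REPARTITION_BY_NUM else REPARTITION_BY_COL
        ShuffleExchangeExec(d.createPartitioning(numPartitions), newPlan, origin)
      case _ => newPlan
    }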







[GitHub] [spark] ulysses-you commented on a change in pull request #32816: [SPARK-33832][SQL] Support optimize skewed join even if introduce extra shuffle

2021-09-01 Thread GitBox


ulysses-you commented on a change in pull request #32816:
URL: https://github.com/apache/spark/pull/32816#discussion_r700751867



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEUtils.scala
##
@@ -57,4 +58,22 @@ object AQEUtils {
       }
     case _ => Some(UnspecifiedDistribution)
   }
+
+  // Add an extra shuffle if input plan does not satisfy the required distribution.
+  // This method is invoked after optimizing skewed join in case we change final stage
+  // output partitioning.
+  def ensureRequiredDistribution(

Review comment:
   good point, merged this code into `EnsureRequirements`







[GitHub] [spark] ulysses-you commented on a change in pull request #32816: [SPARK-33832][SQL] Support optimize skewed join even if introduce extra shuffle

2021-08-26 Thread GitBox


ulysses-you commented on a change in pull request #32816:
URL: https://github.com/apache/spark/pull/32816#discussion_r696418503



##
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
##
@@ -1908,6 +1914,70 @@ class AdaptiveQueryExecSuite
     }
   }
 
+  test("SPARK-33832: Support optimize skew join even if introduce extra shuffle") {
+    withSQLConf(
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+      SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED.key -> "false",
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+      SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "100",
+      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "100",
+      SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
+      SQLConf.SHUFFLE_PARTITIONS.key -> "10",
+      SQLConf.ADAPTIVE_FORCE_OPTIMIZE_SKEWED_JOIN.key -> "true") {
+      withTempView("skewData1", "skewData2") {
+        spark
+          .range(0, 1000, 1, 10)
+          .selectExpr("id % 3 as key1", "id as value1")
+          .createOrReplaceTempView("skewData1")
+        spark
+          .range(0, 1000, 1, 10)
+          .selectExpr("id % 1 as key2", "id as value2")
+          .createOrReplaceTempView("skewData2")
+
+        // check if optimized skewed join does not satisfy the required distribution
+        Seq(true, false).foreach { hasRequiredDistribution =>
+          Seq(true, false).foreach { hasPartitionNumber =>
+            val repartition = if (hasRequiredDistribution) {
+              s"/*+ repartition(${ if (hasPartitionNumber) "10," else ""}key1) */"
+            } else {
+              ""
+            }
+
+            // check required distribution and extra shuffle
+            val (_, adaptive1) =
+              runAdaptiveAndVerifyResult(s"SELECT $repartition key1 FROM skewData1 " +
+                s"JOIN skewData2 ON key1 = key2 GROUP BY key1")
+            val shuffles1 = findTopLevelShuffle(adaptive1)

Review comment:
   inlined this







[GitHub] [spark] ulysses-you commented on a change in pull request #32816: [SPARK-33832][SQL] Support optimize skewed join even if introduce extra shuffle

2021-08-26 Thread GitBox


ulysses-you commented on a change in pull request #32816:
URL: https://github.com/apache/spark/pull/32816#discussion_r696415767



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
##
@@ -169,15 +179,34 @@ case class AdaptiveSparkPlanExec(
     optimized
   }
 
+  def prepareQueryStages(
+      plan: SparkPlan,
+      optimizeSkewedJoin: Boolean): SparkPlan = {
+    if (optimizeSkewedJoin) {
+      AQEUtils.ensureRequiredDistribution(

Review comment:
   yeah we can always run it







[GitHub] [spark] ulysses-you commented on a change in pull request #32816: [SPARK-33832][SQL] Support optimize skewed join even if introduce extra shuffle

2021-08-26 Thread GitBox


ulysses-you commented on a change in pull request #32816:
URL: https://github.com/apache/spark/pull/32816#discussion_r696411043



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
##
@@ -100,24 +100,34 @@ case class AdaptiveSparkPlanExec(
   // A list of physical plan rules to be applied before creation of query stages. The physical
   // plan should reach a final status of query stages (i.e., no more addition or removal of
   // Exchange nodes) after running these rules.
-  @transient private val queryStagePreparationRules: Seq[Rule[SparkPlan]] = Seq(
-    RemoveRedundantProjects,
-    // For cases like `df.repartition(a, b).select(c)`, there is no distribution requirement for
-    // the final plan, but we do need to respect the user-specified repartition. Here we ask
-    // `EnsureRequirements` to not optimize out the user-specified repartition-by-col to work
-    // around this case.
-    EnsureRequirements(optimizeOutRepartition = requiredDistribution.isDefined),
-    RemoveRedundantSorts,
-    DisableUnnecessaryBucketedScan
-  ) ++ context.session.sessionState.queryStagePrepRules
+  private def queryStagePreparationRules(
+      optimizeSkewedJoin: Boolean = false): Seq[Rule[SparkPlan]] = {

Review comment:
   I'm OK with removing it







[GitHub] [spark] ulysses-you commented on a change in pull request #32816: [SPARK-33832][SQL] Support optimize skewed join even if introduce extra shuffle

2021-08-26 Thread GitBox


ulysses-you commented on a change in pull request #32816:
URL: https://github.com/apache/spark/pull/32816#discussion_r696410142



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
##
@@ -100,24 +100,34 @@ case class AdaptiveSparkPlanExec(
   // A list of physical plan rules to be applied before creation of query stages. The physical
   // plan should reach a final status of query stages (i.e., no more addition or removal of
   // Exchange nodes) after running these rules.
-  @transient private val queryStagePreparationRules: Seq[Rule[SparkPlan]] = Seq(
-    RemoveRedundantProjects,
-    // For cases like `df.repartition(a, b).select(c)`, there is no distribution requirement for
-    // the final plan, but we do need to respect the user-specified repartition. Here we ask
-    // `EnsureRequirements` to not optimize out the user-specified repartition-by-col to work
-    // around this case.
-    EnsureRequirements(optimizeOutRepartition = requiredDistribution.isDefined),
-    RemoveRedundantSorts,
-    DisableUnnecessaryBucketedScan
-  ) ++ context.session.sessionState.queryStagePrepRules
+  private def queryStagePreparationRules(
+      optimizeSkewedJoin: Boolean = false): Seq[Rule[SparkPlan]] = {

Review comment:
   this is mainly for `initialPlan`, which does not need skewed join optimization







[GitHub] [spark] ulysses-you commented on a change in pull request #32816: [SPARK-33832][SQL] Support optimize skewed join even if introduce extra shuffle

2021-08-26 Thread GitBox


ulysses-you commented on a change in pull request #32816:
URL: https://github.com/apache/spark/pull/32816#discussion_r696399833



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
##
@@ -258,7 +258,8 @@ object OptimizeSkewedJoin extends AQEShuffleReadRule {
 
   object ShuffleStage {
     def unapply(plan: SparkPlan): Option[ShuffleQueryStageExec] = plan match {
-      case s: ShuffleQueryStageExec if s.mapStats.isDefined && isSupported(s.shuffle) =>
+      case s: ShuffleQueryStageExec if s.isMaterialized && s.mapStats.isDefined &&
+        isSupported(s.shuffle) =>

Review comment:
   removed the `AQEShuffleReadRule` and simplified the condition inline







[GitHub] [spark] ulysses-you commented on a change in pull request #32816: [SPARK-33832][SQL] Support optimize skewed join even if introduce extra shuffle

2021-08-26 Thread GitBox


ulysses-you commented on a change in pull request #32816:
URL: https://github.com/apache/spark/pull/32816#discussion_r696398887



##
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
##
@@ -1908,6 +1914,69 @@ class AdaptiveQueryExecSuite
     }
   }
 
+  test("SPARK-33832: Support optimize skew join even if introduce extra shuffle") {
+    withSQLConf(
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+      SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED.key -> "false",
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+      SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "100",
+      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "100",
+      SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
+      SQLConf.SHUFFLE_PARTITIONS.key -> "10",
+      SQLConf.ADAPTIVE_FORCE_OPTIMIZE_SKEWED_JOIN.key -> "true") {
+      withTempView("skewData1", "skewData2") {
+        spark
+          .range(0, 1000, 1, 10)
+          .selectExpr("id % 3 as key1", "id as value1")
+          .createOrReplaceTempView("skewData1")
+        spark
+          .range(0, 1000, 1, 10)
+          .selectExpr("id % 1 as key2", "id as value2")
+          .createOrReplaceTempView("skewData2")
+
+        // check if optimized skewed join does not satisfy the required distribution
+        Seq(true, false).foreach { hasRequiredDistribution =>
+          Seq(true, false).foreach { hasPartitionNumber =>
+            val repartition = if (hasRequiredDistribution) {
+              s"/*+ repartition(${ if (hasPartitionNumber) "10," else ""}key1) */"
+            } else {
+              ""
+            }
+
+            // check required distribution and extra shuffle
+            val (_, adaptive1) =
+              runAdaptiveAndVerifyResult(s"SELECT $repartition key1 FROM skewData1 " +
+                s"JOIN skewData2 ON key1 = key2 GROUP BY key1")
+            val shuffles1 = findTopLevelShuffle(adaptive1)
+            assert(shuffles1.size == 3)
+            assert(shuffles1.head.shuffleOrigin == ENSURE_REQUIREMENTS)

Review comment:
   updated the comment; the head shuffle is from the second `EnsureRequirements` in `queryStagePreparationRules`.







[GitHub] [spark] ulysses-you commented on a change in pull request #32816: [SPARK-33832][SQL] Support optimize skewed join even if introduce extra shuffle

2021-08-24 Thread GitBox


ulysses-you commented on a change in pull request #32816:
URL: https://github.com/apache/spark/pull/32816#discussion_r695402449



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEUtils.scala
##
@@ -57,4 +58,20 @@ object AQEUtils {
       }
     case _ => Some(UnspecifiedDistribution)
   }
+
+  // Add an extra shuffle if input plan does not satisfy the required distribution.
+  // This method is invoked after optimizing skewed join in case we change final stage
+  // output partitioning.
+  def ensureRequiredDistribution(
+      plan: SparkPlan, distribution: Option[Distribution]): SparkPlan = distribution match {
+    case Some(d) if !plan.outputPartitioning.satisfies(d) =>
+      val numPartitions = d.requiredNumPartitions.getOrElse(conf.numShufflePartitions)
+      val shuffleOrigin = if (d.requiredNumPartitions.isDefined) {
+        REPARTITION_BY_NUM
+      } else {
+        REPARTITION_BY_COL
+      }
+      ShuffleExchangeExec(d.createPartitioning(numPartitions), plan, shuffleOrigin)

Review comment:
   We should retain the repartition shuffle origin here; otherwise some stage optimization rules can break an `ENSURE_REQUIREMENTS` shuffle's output partitioning in the next stage, e.g. `CoalesceShufflePartitions` when the input plan's origin is `REPARTITION_BY_NUM`.







[GitHub] [spark] ulysses-you commented on a change in pull request #32816: [SPARK-33832][SQL] Support optimize skewed join even if introduce extra shuffle

2021-08-19 Thread GitBox


ulysses-you commented on a change in pull request #32816:
URL: https://github.com/apache/spark/pull/32816#discussion_r692656713



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
##
@@ -100,24 +100,34 @@ case class AdaptiveSparkPlanExec(
   // A list of physical plan rules to be applied before creation of query stages. The physical
   // plan should reach a final status of query stages (i.e., no more addition or removal of
   // Exchange nodes) after running these rules.
-  @transient private val queryStagePreparationRules: Seq[Rule[SparkPlan]] = Seq(
-    RemoveRedundantProjects,
-    // For cases like `df.repartition(a, b).select(c)`, there is no distribution requirement for
-    // the final plan, but we do need to respect the user-specified repartition. Here we ask
-    // `EnsureRequirements` to not optimize out the user-specified repartition-by-col to work
-    // around this case.
-    EnsureRequirements(optimizeOutRepartition = requiredDistribution.isDefined),
-    RemoveRedundantSorts,
-    DisableUnnecessaryBucketedScan
-  ) ++ context.session.sessionState.queryStagePrepRules
+  private def queryStagePreparationRules(
+      optimizeSkewedJoin: Boolean = false): Seq[Rule[SparkPlan]] = {
+    val optimizeSkewedJoinRules = if (optimizeSkewedJoin) {
+      Seq(OptimizeSkewedJoin,
+        // Add the EnsureRequirements rule here since OptimizeSkewedJoin will change
+        // output partitioning, make sure we have right distribution.
+        EnsureRequirements(optimizeOutRepartition = requiredDistribution.isDefined))
+    } else {
+      Nil
+    }
+
+    Seq(
+      RemoveRedundantProjects,
+      // For cases like `df.repartition(a, b).select(c)`, there is no distribution requirement for
+      // the final plan, but we do need to respect the user-specified repartition. Here we ask
+      // `EnsureRequirements` to not optimize out the user-specified repartition-by-col to work
+      // around this case.
+      EnsureRequirements(optimizeOutRepartition = requiredDistribution.isDefined),
+      RemoveRedundantSorts,
+      DisableUnnecessaryBucketedScan
+    ) ++ optimizeSkewedJoinRules ++ context.session.sessionState.queryStagePrepRules

Review comment:
   We need two `EnsureRequirements` for `OptimizeSkewedJoin`.
   
   The first `EnsureRequirements` adds enough exchanges for the join so that the physical plan can be executed. The second `EnsureRequirements` makes sure we still have the right distribution after `OptimizeSkewedJoin` is applied, since `OptimizeSkewedJoin` can change the output partitioning.







[GitHub] [spark] ulysses-you commented on a change in pull request #32816: [SPARK-33832][SQL] Support optimize skewed join even if introduce extra shuffle

2021-08-19 Thread GitBox


ulysses-you commented on a change in pull request #32816:
URL: https://github.com/apache/spark/pull/32816#discussion_r692653369



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
##
@@ -656,13 +687,54 @@ case class AdaptiveSparkPlanExec(
     // node to prevent the loss of the `BroadcastExchangeExec` node in DPP subquery.
     // Here, we also need to avoid to insert the `BroadcastExchangeExec` node when the newPlan
     // is already the `BroadcastExchangeExec` plan after apply the `LogicalQueryStageStrategy` rule.
-    val finalPlan = currentPhysicalPlan match {
+    def updateBroadcastExchange(plan: SparkPlan): SparkPlan = currentPhysicalPlan match {
       case b: BroadcastExchangeLike
-          if (!newPlan.isInstanceOf[BroadcastExchangeLike]) => b.withNewChildren(Seq(newPlan))
-      case _ => newPlan
+          if (!plan.isInstanceOf[BroadcastExchangeLike]) => b.withNewChildren(Seq(plan))
+      case _ => plan
     }
 
-    (finalPlan, optimized)
+    val optimizedWithSkewedJoin = applyPhysicalRules(
+      optimizedPhysicalPlan,
+      optimizeSkewedJoinWithExtraShuffleRules,
+      Some((planChangeLogger, "AQE Optimize Skewed Join With Extra Shuffle"))
+    )
+    val validatedWithSkewedJoin =
+      checkDistribution(
+        optimizedWithSkewedJoin,
+        optimizedPhysicalPlan,
+        isFinalStage(optimizedWithSkewedJoin),

Review comment:
   Since `OptimizeSkewedJoin` moved to the preparation phase, we need to validate whether it changed the output partitioning in the final stage. Otherwise, we may not satisfy `requiredDistribution` after `OptimizeSkewedJoin` is applied.







[GitHub] [spark] ulysses-you commented on a change in pull request #32816: [SPARK-33832][SQL] Support optimize skewed join even if introduce extra shuffle

2021-08-19 Thread GitBox


ulysses-you commented on a change in pull request #32816:
URL: https://github.com/apache/spark/pull/32816#discussion_r692092985



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/SkewJoinAwareCost.scala
##
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
+import org.apache.spark.sql.execution.joins.ShuffledJoin
+
+/**
+ * A skew join aware implementation of [[Cost]], which consider shuffle number 
and skew join number
+ */
+case class SkewJoinAwareCost(
+numShuffles: Int,
+numSkewJoins: Int) extends Cost {
+  override def compare(that: Cost): Int = that match {
+case other: SkewJoinAwareCost =>
+  if (numSkewJoins > other.numSkewJoins || numShuffles < 
other.numShuffles) {

Review comment:
   Good point. I think we should always pick the bigger skew join number first.







[GitHub] [spark] ulysses-you commented on a change in pull request #32816: [SPARK-33832][SQL] Support optimize skewed join even if introduce extra shuffle

2021-08-19 Thread GitBox


ulysses-you commented on a change in pull request #32816:
URL: https://github.com/apache/spark/pull/32816#discussion_r692075920



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/SkewJoinAwareCost.scala
##
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
+import org.apache.spark.sql.execution.joins.ShuffledJoin
+
+/**
+ * A skew join aware implementation of [[Cost]], which consider shuffle number 
and skew join number
+ */
+case class SkewJoinAwareCost(
+numShuffles: Int,
+numSkewJoins: Int) extends Cost {
+  override def compare(that: Cost): Int = that match {
+case other: SkewJoinAwareCost =>
+  if (numSkewJoins > other.numSkewJoins || numShuffles < 
other.numShuffles) {

Review comment:
   yes, if plan a has more skew joins or fewer shuffle nodes than plan b, we will pick plan a.







[GitHub] [spark] ulysses-you commented on a change in pull request #32816: [SPARK-33832][SQL] Support optimize skewed join even if introduce extra shuffle

2021-08-19 Thread GitBox


ulysses-you commented on a change in pull request #32816:
URL: https://github.com/apache/spark/pull/32816#discussion_r692077822



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
##
@@ -656,13 +687,54 @@ case class AdaptiveSparkPlanExec(
     // node to prevent the loss of the `BroadcastExchangeExec` node in DPP subquery.
     // Here, we also need to avoid to insert the `BroadcastExchangeExec` node when the newPlan
     // is already the `BroadcastExchangeExec` plan after apply the `LogicalQueryStageStrategy` rule.
-    val finalPlan = currentPhysicalPlan match {
+    def updateBroadcastExchange(plan: SparkPlan): SparkPlan = currentPhysicalPlan match {
       case b: BroadcastExchangeLike
-          if (!newPlan.isInstanceOf[BroadcastExchangeLike]) => b.withNewChildren(Seq(newPlan))
-      case _ => newPlan
+          if (!plan.isInstanceOf[BroadcastExchangeLike]) => b.withNewChildren(Seq(plan))
+      case _ => plan
     }
 
-    (finalPlan, optimized)
+    val optimizedWithSkewedJoin = applyPhysicalRules(
+      optimizedPhysicalPlan,
+      optimizeSkewedJoinWithExtraShuffleRules,
+      Some((planChangeLogger, "AQE Optimize Skewed Join With Extra Shuffle"))
+    )
+    val validatedWithSkewedJoin =
+      checkDistribution(
+        optimizedWithSkewedJoin,
+        optimizedPhysicalPlan,
+        isFinalStage(optimizedWithSkewedJoin),
+        OptimizeSkewedJoin.ruleName)
+
+    // here are three reasons if validatedWithSkewedJoin is equal to optimizedPhysicalPlan:
+    // 1. no skewed join optimized
+    // 2. optimize skewed join introduce extra shuffle and force optimize is disabled
+    // 3. optimize skewed join change final stage output partitioning
+    val newPhysicalPlans = if (validatedWithSkewedJoin.fastEquals(optimizedPhysicalPlan)) {
+      updateBroadcastExchange(optimizedPhysicalPlan) :: Nil

Review comment:
   @cloud-fan in these three cases, we will not introduce extra overhead







[GitHub] [spark] ulysses-you commented on a change in pull request #32816: [SPARK-33832][SQL] Support optimize skewed join even if introduce extra shuffle

2021-08-13 Thread GitBox


ulysses-you commented on a change in pull request #32816:
URL: https://github.com/apache/spark/pull/32816#discussion_r688276500



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
##
@@ -169,10 +162,33 @@ case class AdaptiveSparkPlanExec(
     optimized
   }
 
+  private def checkDistribution(
+      newPlan: SparkPlan,
+      originPlan: SparkPlan,
+      isFinalStage: Boolean,
+      ruleName: String): SparkPlan = {

Review comment:
   this method is used by `optimizeQueryStage` and `optimizeSkewedJoin` to check the final stage distribution
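
   A hedged sketch of the helper's contract (parameter list from the diff above; `ruleName` is dropped and `requiredDistribution` passed explicitly to keep the sketch self-contained, hence the hypothetical `Sketch` name):

    import org.apache.spark.sql.catalyst.plans.physical.Distribution
    import org.apache.spark.sql.execution.SparkPlan

    // Keep newPlan unless we are in the final stage and the rule broke the
    // required output distribution, in which case fall back to originPlan.
    def checkDistributionSketch(
        newPlan: SparkPlan,
        originPlan: SparkPlan,
        isFinalStage: Boolean,
        requiredDistribution: Option[Distribution]): SparkPlan = {
      val stillSatisfied = !isFinalStage ||
        requiredDistribution.forall(newPlan.outputPartitioning.satisfies)
      if (stillSatisfied) newPlan else originPlan
    }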







[GitHub] [spark] ulysses-you commented on a change in pull request #32816: [SPARK-33832][SQL] Support optimize skewed join even if introduce extra shuffle

2021-07-01 Thread GitBox


ulysses-you commented on a change in pull request #32816:
URL: https://github.com/apache/spark/pull/32816#discussion_r662725098



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
##
@@ -252,17 +275,26 @@ case class AdaptiveSparkPlanExec(
         // plans are updated, we can clear the query stage list because at this point the two plans
         // are semantically and physically in sync again.
         val logicalPlan = replaceWithQueryStagesInLogicalPlan(currentLogicalPlan, stagesToReplace)
-        val (newPhysicalPlan, newLogicalPlan) = reOptimize(logicalPlan)
+        val (reOptimizePhysicalPlan, newLogicalPlan) = reOptimize(logicalPlan)
+        val planWithExtraShuffle = rePlanWithExtraShuffle(reOptimizePhysicalPlan)
         val origCost = costEvaluator.evaluateCost(currentPhysicalPlan)
-        val newCost = costEvaluator.evaluateCost(newPhysicalPlan)
-        if (newCost < origCost ||
-            (newCost == origCost && currentPhysicalPlan != newPhysicalPlan)) {
+        val newCost = costEvaluator.evaluateCost(reOptimizePhysicalPlan)
+        val extraShuffleCost = costEvaluator.evaluateCost(planWithExtraShuffle)
+        def updateCurrentPlan(newPhysicalPlan: SparkPlan): Unit = {
           logOnLevel(s"Plan changed from $currentPhysicalPlan to $newPhysicalPlan")
           cleanUpTempTags(newPhysicalPlan)
           currentPhysicalPlan = newPhysicalPlan
           currentLogicalPlan = newLogicalPlan
           stagesToReplace = Seq.empty[QueryStageExec]
         }
 
+        if (extraShuffleCost < newCost ||
+            (extraShuffleCost == newCost && planWithExtraShuffle != reOptimizePhysicalPlan)) {
+          updateCurrentPlan(planWithExtraShuffle)
+        } else if (newCost < origCost ||
+            (newCost == origCost && currentPhysicalPlan != reOptimizePhysicalPlan)) {
+          updateCurrentPlan(reOptimizePhysicalPlan)
+        }

Review comment:
   @cloud-fan here we use 3 costs to find the better plan:
   
   1. the plan with the skew join optimized, if we force optimizing skew joins
   2. the re-optimized plan, if we do not force optimizing skew joins and there is no extra shuffle
   3. the origin plan, if re-optimizing introduces an extra shuffle







[GitHub] [spark] ulysses-you commented on a change in pull request #32816: [SPARK-33832][SQL] Support optimize skewed join even if introduce extra shuffle

2021-06-08 Thread GitBox


ulysses-you commented on a change in pull request #32816:
URL: https://github.com/apache/spark/pull/32816#discussion_r647489569



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
##
@@ -111,6 +115,19 @@ case class AdaptiveSparkPlanExec(
     CollapseCodegenStages()
   )
 
+  // OptimizeSkewedJoin has moved into preparation rules, so we should make
+  // finalPreparationStageRules same as finalStageOptimizerRules
+  private def finalPreparationStageRules: Seq[Rule[SparkPlan]] = {

Review comment:
   yea, currently it's only for `OptimizeSkewedJoin`



