[GitHub] [spark] ulysses-you commented on a change in pull request #33188: [SPARK-35989][SQL] Only remove redundant shuffle if shuffle origin is REPARTITION_BY_COL in AQE

2021-07-21 Thread GitBox


ulysses-you commented on a change in pull request #33188:
URL: https://github.com/apache/spark/pull/33188#discussion_r674500254



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
##
@@ -250,7 +250,12 @@ object EnsureRequirements extends Rule[SparkPlan] {
 
   def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
     // TODO: remove this after we create a physical operator for `RepartitionByExpression`.
-    case operator @ ShuffleExchangeExec(upper: HashPartitioning, child, _) =>
+    // SPARK-35989: AQE will change the partition number so we should retain the REPARTITION_BY_NUM
+    // shuffle which is specified by user. And also we can not remove REBALANCE_PARTITIONS_BY_COL,
+    // it is a special shuffle used to rebalance partitions.
+    // So, here we only remove REPARTITION_BY_COL in AQE.
+    case operator @ ShuffleExchangeExec(upper: HashPartitioning, child, shuffleOrigin)
+        if shuffleOrigin == REPARTITION_BY_COL || !conf.adaptiveExecutionEnabled =>

Review comment:
   Given this, would it be better to add a new physical plan node for repartition? It could also hold the `"root distribution (and potentially ordering) requirement"`. I remember an earlier PR discussed this, and it should have other benefits as well, e.g. for `AliasAwareOutputPartitioning`.
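The guard added in the hunk above can be modelled in isolation. The following Scala sketch is a simplified, hypothetical stand-in: `ShuffleOrigin` and its case objects only mirror the names used in the diff and are not the Spark classes, and `childAlreadyMatches` is an assumed flag standing in for the surrounding rule's check that the child already provides the same hash partitioning (that check is not part of the hunk shown).

```scala
// Hypothetical, simplified mirror of the shuffle origins named in the diff.
sealed trait ShuffleOrigin
case object ENSURE_REQUIREMENTS extends ShuffleOrigin
case object REPARTITION_BY_COL extends ShuffleOrigin
case object REPARTITION_BY_NUM extends ShuffleOrigin
case object REBALANCE_PARTITIONS_BY_COL extends ShuffleOrigin

// The pattern guard from the hunk: with AQE enabled only REPARTITION_BY_COL
// qualifies; with AQE disabled every origin does.
def guardMatches(origin: ShuffleOrigin, aqeEnabled: Boolean): Boolean =
  origin == REPARTITION_BY_COL || !aqeEnabled

// Assumed extra condition: the shuffle is dropped only when the child already
// provides the same hash partitioning (hypothetical flag, not a Spark API).
def shuffleRemoved(
    origin: ShuffleOrigin,
    aqeEnabled: Boolean,
    childAlreadyMatches: Boolean): Boolean =
  guardMatches(origin, aqeEnabled) && childAlreadyMatches

val origins = Seq(ENSURE_REQUIREMENTS, REPARTITION_BY_COL, REPARTITION_BY_NUM, REBALANCE_PARTITIONS_BY_COL)
for (aqe <- Seq(true, false); o <- origins)
  println(s"aqe=$aqe origin=$o removable=${shuffleRemoved(o, aqe, childAlreadyMatches = true)}")
```

With AQE enabled, only the `REPARTITION_BY_COL` line reports `removable=true`; with AQE disabled, every origin does, which is exactly the asymmetry the review thread is debating.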
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] ulysses-you commented on a change in pull request #33188: [SPARK-35989][SQL] Only remove redundant shuffle if shuffle origin is REPARTITION_BY_COL in AQE

2021-07-20 Thread GitBox


ulysses-you commented on a change in pull request #33188:
URL: https://github.com/apache/spark/pull/33188#discussion_r672779057



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
##
@@ -250,7 +250,12 @@ object EnsureRequirements extends Rule[SparkPlan] {
 
   def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
     // TODO: remove this after we create a physical operator for `RepartitionByExpression`.
-    case operator @ ShuffleExchangeExec(upper: HashPartitioning, child, _) =>
+    // SPARK-35989: AQE will change the partition number so we should retain the REPARTITION_BY_NUM
+    // shuffle which is specified by user. And also we can not remove REBALANCE_PARTITIONS_BY_COL,
+    // it is a special shuffle used to rebalance partitions.
+    // So, here we only remove REPARTITION_BY_COL in AQE.
+    case operator @ ShuffleExchangeExec(upper: HashPartitioning, child, shuffleOrigin)
+        if shuffleOrigin == REPARTITION_BY_COL || !conf.adaptiveExecutionEnabled =>

Review comment:
   Yeah, because we only skip applying `CoalesceShufflePartitions` and the other custom shuffle readers at the final stage; for the stages that are still in progress, we do nothing. That's why I think it is a bit of a hack.
   
   Another hacky idea: we could re-mark the shuffle that runs before the removed shuffle, changing its origin from `ENSURE_REQUIREMENTS` to `REPARTITION_BY_COL`. Then AQE can optimize it safely.
   
   IMO, I prefer the idea of `skip removing shuffle with all shuffle origin in AQE`. It's simple, and it can be seen as a behavior change because AQE is enabled by default. If users really hit this issue, they can just disable AQE.
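The "re-mark" idea in the second paragraph can be sketched on a toy plan tree. Everything here is hypothetical: `Plan`, `Scan`, `Aggregate` and `Shuffle` are illustrative stand-ins for Spark physical operators, `Aggregate` is assumed to preserve its child's partitioning, and only the two origins involved are modelled. When a `REPARTITION_BY_COL` shuffle is dropped, the nearest `ENSURE_REQUIREMENTS` shuffle on the same keys below it is re-marked as `REPARTITION_BY_COL`, so a later AQE rule that inspects the origin would still see the user's intent.

```scala
// Hypothetical toy model; not Spark's SparkPlan or ShuffleExchangeExec.
sealed trait ShuffleOrigin
case object ENSURE_REQUIREMENTS extends ShuffleOrigin
case object REPARTITION_BY_COL extends ShuffleOrigin

sealed trait Plan
case class Scan(table: String) extends Plan
// Assumed to keep its child's output partitioning unchanged.
case class Aggregate(keys: Seq[String], child: Plan) extends Plan
case class Shuffle(keys: Seq[String], origin: ShuffleOrigin, child: Plan) extends Plan

// Walk down through partitioning-preserving nodes and re-mark the first
// ENSURE_REQUIREMENTS shuffle on the same keys; None if there is none.
def remarkNearest(plan: Plan, keys: Seq[String]): Option[Plan] = plan match {
  case Shuffle(k, ENSURE_REQUIREMENTS, c) if k == keys =>
    Some(Shuffle(k, REPARTITION_BY_COL, c))
  case Aggregate(k, c) => remarkNearest(c, keys).map(Aggregate(k, _))
  case _ => None
}

// Drop a top-level user repartition only if its partitioning can be handed
// over to a re-marked shuffle below; otherwise keep the plan unchanged.
def removeWithRemark(plan: Plan): Plan = plan match {
  case Shuffle(keys, REPARTITION_BY_COL, child) =>
    remarkNearest(child, keys).getOrElse(plan)
  case other => other
}

val before = Shuffle(Seq("a"), REPARTITION_BY_COL,
  Aggregate(Seq("a"), Shuffle(Seq("a"), ENSURE_REQUIREMENTS, Scan("t"))))
println(removeWithRemark(before))
```

Falling back to the unchanged plan keeps the sketch conservative: if no shuffle below can take over the user's partitioning, the user's shuffle stays.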







[GitHub] [spark] ulysses-you commented on a change in pull request #33188: [SPARK-35989][SQL] Only remove redundant shuffle if shuffle origin is REPARTITION_BY_COL in AQE

2021-07-19 Thread GitBox


ulysses-you commented on a change in pull request #33188:
URL: https://github.com/apache/spark/pull/33188#discussion_r672779057



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
##
@@ -250,7 +250,12 @@ object EnsureRequirements extends Rule[SparkPlan] {
 
   def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
     // TODO: remove this after we create a physical operator for `RepartitionByExpression`.
-    case operator @ ShuffleExchangeExec(upper: HashPartitioning, child, _) =>
+    // SPARK-35989: AQE will change the partition number so we should retain the REPARTITION_BY_NUM
+    // shuffle which is specified by user. And also we can not remove REBALANCE_PARTITIONS_BY_COL,
+    // it is a special shuffle used to rebalance partitions.
+    // So, here we only remove REPARTITION_BY_COL in AQE.
+    case operator @ ShuffleExchangeExec(upper: HashPartitioning, child, shuffleOrigin)
+        if shuffleOrigin == REPARTITION_BY_COL || !conf.adaptiveExecutionEnabled =>

Review comment:
   Yeah, because we only skip applying `CoalesceShufflePartitions` and the other custom shuffle readers at the final stage; for the stages that are still in progress, we do nothing. That's why I think it is a bit of a hack.
   
   Another hacky idea: we could re-mark the shuffle that runs before the removed shuffle, changing its origin from `ENSURE_REQUIREMENTS` to `REPARTITION_BY_COL`. Then AQE can optimize it safely.
   
   IMO, I prefer the idea of `skip removing shuffle with all shuffle origin in AQE`. It's simple, and it can be seen as a behavior change because AQE is enabled by default. If users really hit this issue, they can just disable AQE.







[GitHub] [spark] ulysses-you commented on a change in pull request #33188: [SPARK-35989][SQL] Only remove redundant shuffle if shuffle origin is REPARTITION_BY_COL in AQE

2021-07-14 Thread GitBox


ulysses-you commented on a change in pull request #33188:
URL: https://github.com/apache/spark/pull/33188#discussion_r670066151



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
##
@@ -250,7 +250,12 @@ object EnsureRequirements extends Rule[SparkPlan] {
 
   def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
     // TODO: remove this after we create a physical operator for `RepartitionByExpression`.
-    case operator @ ShuffleExchangeExec(upper: HashPartitioning, child, _) =>
+    // SPARK-35989: AQE will change the partition number so we should retain the REPARTITION_BY_NUM
+    // shuffle which is specified by user. And also we can not remove REBALANCE_PARTITIONS_BY_COL,
+    // it is a special shuffle used to rebalance partitions.
+    // So, here we only remove REPARTITION_BY_COL in AQE.
+    case operator @ ShuffleExchangeExec(upper: HashPartitioning, child, shuffleOrigin)
+        if shuffleOrigin == REPARTITION_BY_COL || !conf.adaptiveExecutionEnabled =>

Review comment:
   Sorry for coming back to this. After some more thought, it would be better to skip removing shuffles of every shuffle origin in AQE, including `REPARTITION_BY_COL`. Then the condition can be changed to `if !conf.adaptiveExecutionEnabled`. Given this, we can safely remove the final-stage rules in `AdaptiveSparkPlanExec`, which seem a little hacky.
   
   For `REPARTITION_BY_COL`, I want to explain why it's also unsafe to remove: we may apply `OptimizeLocalShuffleReader`, which changes the output partitioning so that it no longer follows the semantics of `RepartitionByExpression`.
   
   What do you think? @HyukjinKwon @cloud-fan 
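The argument about `OptimizeLocalShuffleReader` can be illustrated with a simplified model of shuffle reads over plain Scala collections. This is an assumption-laden sketch, not Spark code: a regular read gives reducer `r` every row whose key hashes to `r` across all mappers, while the local read is modelled as one output partition per mapper holding only that mapper's own rows. `RepartitionByExpression` semantics are approximated as "each key lands in exactly one partition", which the local read can violate as soon as one key is produced by more than one mapper.

```scala
type Row = (String, Int)

// Regular shuffle read: reducer r gathers, from every mapper, the rows whose
// key hashes to r (floorMod keeps the partition id non-negative).
def hashRead(mappers: Seq[Seq[Row]], n: Int): Seq[Seq[Row]] =
  (0 until n).map { r =>
    mappers.flatten.filter { case (k, _) => Math.floorMod(k.hashCode, n) == r }
  }

// Simplified local read: one partition per mapper containing that mapper's
// blocks for all reducers, ordered by reducer id but never moved across mappers.
def localRead(mappers: Seq[Seq[Row]], n: Int): Seq[Seq[Row]] =
  mappers.map(rows => rows.sortBy(row => Math.floorMod(row._1.hashCode, n)))

// Clustering check: every key must appear in exactly one output partition.
def keysClustered(parts: Seq[Seq[Row]]): Boolean =
  parts.zipWithIndex
    .flatMap { case (rows, i) => rows.map { case (k, _) => (k, i) } }
    .groupBy(_._1)
    .values
    .forall(_.map(_._2).distinct.size == 1)

val mappers = Seq(Seq("x" -> 1, "y" -> 2), Seq("x" -> 3, "z" -> 4))
println(s"hash read clustered:  ${keysClustered(hashRead(mappers, 3))}")
println(s"local read clustered: ${keysClustered(localRead(mappers, 3))}")
```

The first line reports `true` and the second `false`, because key `x` is produced by both mappers and the modelled local read keeps it in two partitions.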







[GitHub] [spark] ulysses-you commented on a change in pull request #33188: [SPARK-35989][SQL] Only remove redundant shuffle if shuffle origin is REPARTITION_BY_COL in AQE

2021-07-04 Thread GitBox


ulysses-you commented on a change in pull request #33188:
URL: https://github.com/apache/spark/pull/33188#discussion_r663587709



##
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala
##
@@ -133,4 +134,27 @@ class EnsureRequirementsSuite extends SharedSparkSession {
   }.size == 2)
 }
   }
+
+  test("SPARK-35989: Do not remove REPARTITION_BY_NUM shuffle if AQE is enabled") {
+import testImplicits._
+Seq(true, false).foreach { enableAqe =>
+  withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> enableAqe.toString,
+SQLConf.SHUFFLE_PARTITIONS.key -> "3",
+SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {

Review comment:
   I see the reason, makes sense.







[GitHub] [spark] ulysses-you commented on a change in pull request #33188: [SPARK-35989][SQL] Only remove redundant shuffle if shuffle origin is REPARTITION_BY_COL in AQE

2021-07-04 Thread GitBox


ulysses-you commented on a change in pull request #33188:
URL: https://github.com/apache/spark/pull/33188#discussion_r663482012



##
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala
##
@@ -133,4 +134,27 @@ class EnsureRequirementsSuite extends SharedSparkSession {
   }.size == 2)
 }
   }
+
+  test("SPARK-35989: Do not remove REPARTITION_BY_NUM shuffle if AQE is enabled") {
+import testImplicits._
+Seq(true, false).foreach { enableAqe =>
+  withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> enableAqe.toString,
+SQLConf.SHUFFLE_PARTITIONS.key -> "3",
+SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {

Review comment:
   Seems like two indentations would be right here?



