[GitHub] [spark] cloud-fan commented on a change in pull request #33172: [SPARK-35968][SQL] Make sure partitions are not too small in AQE partition coalescing

2021-07-01 Thread GitBox


cloud-fan commented on a change in pull request #33172:
URL: https://github.com/apache/spark/pull/33172#discussion_r662102426



##
File path: 
sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala
##
@@ -459,9 +466,36 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite {
 Seq(
   Some(specs1),
   Some(specs2)),
-targetSize, 1)
+targetSize)
   assert(coalesced == Seq(expected1, expected2))
 }
+
+{
+  // No actual coalescing happened, return empty result.

Review comment:
   Not related to this PR. I just noticed, while reading the code, that we were missing a test for this case.
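For reference, the "no actual coalescing" case in the test above can be sketched as follows. This is a simplified model, not Spark's ShufflePartitionsUtil. It assumes a greedy merge of contiguous partitions up to a target size, and the object CoalesceSketch and its signature are hypothetical.

```scala
// Simplified sketch (NOT Spark's ShufflePartitionsUtil): greedily merge contiguous
// partitions until adding the next one would exceed targetSize.
object CoalesceSketch {
  // Returns the start index of each coalesced partition, or an empty Seq when
  // every input partition stays on its own (no actual coalescing happened).
  def coalesce(sizes: Seq[Long], targetSize: Long): Seq[Int] = {
    val starts = scala.collection.mutable.ArrayBuffer.empty[Int]
    var currentSize = 0L
    sizes.zipWithIndex.foreach { case (size, i) =>
      if (i == 0 || currentSize + size > targetSize) {
        // Start a new coalesced partition at index i.
        starts += i
        currentSize = size
      } else {
        // Merge partition i into the current coalesced partition.
        currentSize += size
      }
    }
    if (starts.length == sizes.length) Seq.empty else starts.toSeq
  }
}
```

With sizes 10, 10, 10, 50 and a target of 30, the first three partitions merge and the result is Seq(0, 3); with sizes 40, 40, 40 nothing can merge, so the result is empty.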




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #33172: [SPARK-35968][SQL] Make sure partitions are not too small in AQE partition coalescing

2021-07-01 Thread GitBox


cloud-fan commented on a change in pull request #33172:
URL: https://github.com/apache/spark/pull/33172#discussion_r662152677



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
##
@@ -61,7 +63,9 @@ object ShufflePartitionsUtil extends Logging {
 // `maxTargetSize` from being set to 0.
 val maxTargetSize = math.max(
   math.ceil(totalPostShuffleInputSize / minNumPartitions.toDouble).toLong, 16)
-val targetSize = math.min(maxTargetSize, advisoryTargetSize)
+// The target size can be very small if minNumPartitions is super big. Here we use
+// minPartitionSize as the lower bound to avoid too small partitions after coalescing.
+val targetSize = math.max(math.min(maxTargetSize, advisoryTargetSize), minPartitionSize)
 
 val shuffleIds = mapOutputStatistics.flatMap(_.map(_.shuffleId)).mkString(", ")
 logInfo(s"For shuffle($shuffleIds), advisory target size: $advisoryTargetSize, " +

Review comment:
   sure
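A minimal sketch of the target-size computation in the diff above, as a standalone function. The parameter names mirror the diff; the wrapper object TargetSizeSketch is hypothetical.

```scala
// Standalone sketch of the target-size formula from the diff above.
object TargetSizeSketch {
  def targetSize(
      totalPostShuffleInputSize: Long,
      minNumPartitions: Int,
      advisoryTargetSize: Long,
      minPartitionSize: Long): Long = {
    // Largest target that still yields at least minNumPartitions partitions;
    // the floor of 16 bytes keeps it from being 0.
    val maxTargetSize = math.max(
      math.ceil(totalPostShuffleInputSize / minNumPartitions.toDouble).toLong, 16)
    // Cap at the advisory size, then use minPartitionSize as the lower bound.
    math.max(math.min(maxTargetSize, advisoryTargetSize), minPartitionSize)
  }
}
```

For example, with 100MB of shuffle data and minNumPartitions = 1000, the parallelism-based bound is about 100KB; the 1MB minPartitionSize floor raises the target to 1MB, so roughly 100 partitions are produced instead of 1000.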







[GitHub] [spark] cloud-fan commented on a change in pull request #33172: [SPARK-35968][SQL] Make sure partitions are not too small in AQE partition coalescing

2021-07-01 Thread GitBox


cloud-fan commented on a change in pull request #33172:
URL: https://github.com/apache/spark/pull/33172#discussion_r662151008



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##
@@ -525,18 +525,34 @@ object SQLConf {
   .booleanConf
   .createWithDefault(true)
 
+  private val MIN_PARTITION_NUM_KEY = "spark.sql.adaptive.coalescePartitions.minPartitionNum"
+  private val MIN_PARTITION_SIZE_KEY = "spark.sql.adaptive.coalescePartitions.minPartitionSize"
+
   val COALESCE_PARTITIONS_MIN_PARTITION_NUM =
-buildConf("spark.sql.adaptive.coalescePartitions.minPartitionNum")
+buildConf(MIN_PARTITION_NUM_KEY)
   .doc("The suggested (not guaranteed) minimum number of shuffle partitions after " +
 "coalescing. If not set, the default value is the default parallelism of the " +
-"Spark cluster. This configuration only has an effect when " +
-s"'${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
+s"Spark cluster, w.r.t. the minimum partition size set by '$MIN_PARTITION_SIZE_KEY'. " +
+s"This configuration only has an effect when '${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
 s"'${COALESCE_PARTITIONS_ENABLED.key}' are both true.")
   .version("3.0.0")
   .intConf
   .checkValue(_ > 0, "The minimum number of partitions must be positive.")
   .createOptional
 
+  val COALESCE_PARTITIONS_MIN_PARTITION_SIZE =
+buildConf(MIN_PARTITION_SIZE_KEY)
+  .doc(s"The minimum size of shuffle partitions after coalescing if '$MIN_PARTITION_NUM_KEY' " +
+s"is not set. When '$MIN_PARTITION_NUM_KEY' is not set, the minimum number of " +
+"shuffle partitions after coalescing will fall back to the default parallelism " +
+"of the Spark cluster, sometimes causing partition sizes to be too small. This config " +
+"aims to prevent this situation. Note that this config does not take effect if " +
+s"'${COALESCE_PARTITIONS_MIN_PARTITION_NUM.key}' is set by the user.")
+  .version("3.2.0")
+  .bytesConf(ByteUnit.BYTE)
+  .checkValue(_ > 0, "The minimum partition size must be positive.")
+  .createWithDefaultString("1MB")

Review comment:
   We tried several values internally, and 1MB gave the best perf on TPC-DS. Of course, the best value depends on cluster resources, but 1MB is good enough to avoid the worst case: many partitions of only a few KB each.







[GitHub] [spark] cloud-fan commented on a change in pull request #33172: [SPARK-35968][SQL] Make sure partitions are not too small in AQE partition coalescing

2021-07-01 Thread GitBox


cloud-fan commented on a change in pull request #33172:
URL: https://github.com/apache/spark/pull/33172#discussion_r662152464



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
##
@@ -61,7 +63,9 @@ object ShufflePartitionsUtil extends Logging {
 // `maxTargetSize` from being set to 0.
 val maxTargetSize = math.max(
   math.ceil(totalPostShuffleInputSize / minNumPartitions.toDouble).toLong, 16)
-val targetSize = math.min(maxTargetSize, advisoryTargetSize)
+// The target size can be very small if minNumPartitions is super big. Here we use
+// minPartitionSize as the lower bound to avoid too small partitions after coalescing.
+val targetSize = math.max(math.min(maxTargetSize, advisoryTargetSize), minPartitionSize)

Review comment:
   I don't have a strong opinion here. We could say that a `minPartitionSize` larger than `advisoryTargetSize` is a bad configuration and forbid it, or we could just respect whatever the user sets for `minPartitionSize`.
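The two options can be sketched side by side. Both functions are hypothetical illustrations, not Spark code: validate rejects the combination with require, while respect applies the formula from the diff, in which minPartitionSize wins whenever it is larger than the capped target.

```scala
// Hypothetical sketch of the two policies for minPartitionSize > advisoryTargetSize.
object MinSizePolicySketch {
  // Option 1: forbid the combination up front (throws IllegalArgumentException).
  def validate(advisoryTargetSize: Long, minPartitionSize: Long): Unit =
    require(minPartitionSize <= advisoryTargetSize,
      s"minPartitionSize ($minPartitionSize) must not exceed " +
        s"advisoryTargetSize ($advisoryTargetSize)")

  // Option 2: respect the user's value, as the formula in the diff does.
  def respect(maxTargetSize: Long, advisoryTargetSize: Long, minPartitionSize: Long): Long =
    math.max(math.min(maxTargetSize, advisoryTargetSize), minPartitionSize)
}
```

Under option 2, an advisory size of 64MB combined with a minPartitionSize of 100MB simply yields a 100MB target; under option 1 the same settings are rejected.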







[GitHub] [spark] cloud-fan commented on a change in pull request #33172: [SPARK-35968][SQL] Make sure partitions are not too small in AQE partition coalescing

2021-07-01 Thread GitBox


cloud-fan commented on a change in pull request #33172:
URL: https://github.com/apache/spark/pull/33172#discussion_r662328867



##
File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
##
@@ -1813,7 +1813,7 @@ class DataFrameSuite extends QueryTest
   (1 to 100).map(i => TestData2(i % 10, i))).toDF()
 
 // Distribute and order by.
-val df4 = data.repartition($"a").sortWithinPartitions($"b".desc)
+val df4 = data.repartition(5, $"a").sortWithinPartitions($"b".desc)

Review comment:
   Partition coalescing now applies here, so we end up sorting a single partition, which fails this test. I added an explicit partition number to skip partition coalescing.







[GitHub] [spark] cloud-fan commented on a change in pull request #33172: [SPARK-35968][SQL] Make sure partitions are not too small in AQE partition coalescing

2021-07-01 Thread GitBox


cloud-fan commented on a change in pull request #33172:
URL: https://github.com/apache/spark/pull/33172#discussion_r662362583



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##
@@ -525,8 +525,20 @@ object SQLConf {
   .booleanConf
   .createWithDefault(true)
 
+  val COALESCE_PARTITIONS_PARALLELISM_FIRST =
+buildConf("spark.sql.adaptive.coalescePartitions.parallelismFirst")

Review comment:
   `ADVISORY_PARTITION_SIZE_IN_BYTES` doesn't mention `COALESCE_PARTITIONS_MIN_PARTITION_NUM` either. I think it's fine for the "small" confs to refer to the "big" confs, but not the other way around.







[GitHub] [spark] cloud-fan commented on a change in pull request #33172: [SPARK-35968][SQL] Make sure partitions are not too small in AQE partition coalescing

2021-07-01 Thread GitBox


cloud-fan commented on a change in pull request #33172:
URL: https://github.com/apache/spark/pull/33172#discussion_r662468622



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##
@@ -525,10 +525,35 @@ object SQLConf {
   .booleanConf
   .createWithDefault(true)
 
+  private val MIN_PARTITION_SIZE_KEY = "spark.sql.adaptive.coalescePartitions.minPartitionSize"
+
+  val COALESCE_PARTITIONS_PARALLELISM_FIRST =
+buildConf("spark.sql.adaptive.coalescePartitions.parallelismFirst")
+  .doc("When true, Spark ignores the target size specified by " +
+s"'${ADVISORY_PARTITION_SIZE_IN_BYTES.key}' (default 64MB) when coalescing contiguous " +
+"shuffle partitions, and only respect the minimum partition size specified by " +
+s"'$MIN_PARTITION_SIZE_KEY' (default 1MB), to maximize the parallelism. " +
+"This is to avoid performance regression when enabling adaptive query execution. " +
+"It's recommended to set this config to false and respect the target size specified by " +
+s"'${ADVISORY_PARTITION_SIZE_IN_BYTES.key}'.")

Review comment:
   @viirya, is it clearer to you now?
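A sketch of how the parallelismFirst flag could feed into the target-size formula from this PR. The assumption here, which the doc string above does not state, is that the flag is realized by choosing minNumPartitions: the default parallelism when true, and 1 when false, so that the advisory size becomes the effective cap. The object and parameter names are hypothetical.

```scala
// Hypothetical sketch: parallelismFirst modeled as the choice of minNumPartitions,
// which then goes through the target-size formula from this PR.
object ParallelismFirstSketch {
  def targetSize(
      parallelismFirst: Boolean,
      defaultParallelism: Int,
      totalPostShuffleInputSize: Long,
      advisoryTargetSize: Long,
      minPartitionSize: Long): Long = {
    // Assumption: true -> aim for defaultParallelism partitions;
    // false -> 1, so the advisory size is what caps the target.
    val minNumPartitions = if (parallelismFirst) defaultParallelism else 1
    val maxTargetSize = math.max(
      math.ceil(totalPostShuffleInputSize / minNumPartitions.toDouble).toLong, 16)
    // minPartitionSize remains the lower bound in both modes.
    math.max(math.min(maxTargetSize, advisoryTargetSize), minPartitionSize)
  }
}
```

With 10GB of shuffle data and a default parallelism of 200, this sketch yields a target of about 51.2MB (200 partitions) when parallelismFirst is true and 64MB (160 partitions) when it is false.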




