[ 
https://issues.apache.org/jira/browse/SPARK-32056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

koert kuipers updated SPARK-32056:
----------------------------------
    Description: 
when adaptive query execution is enabled the following expression should 
support coalescing of partitions:
{code:java}
dataframe.repartition(col("somecolumn")) {code}
currently it does not because it simply calls the repartition implementation 
where number of partitions is specified:
{code:java}
  def repartition(partitionExprs: Column*): Dataset[T] = {
    repartition(sparkSession.sessionState.conf.numShufflePartitions, 
partitionExprs: _*)
  }{code}
and repartition with the number of partitions specified does now allow for 
coalescing of partitions (since this breaks the user's expectation that it will 
have the number of partitions specified).

for more context see the discussion here:

[https://github.com/apache/spark/pull/27986]

a simple test to confirm that repartition by key does not support coalescing of 
partitions can be added in AdaptiveQueryExecSuite like this (it currently 
fails):
{code:java}
  test("SPARK-32056 repartition has less partitions for small data when 
adaptiveExecutionEnabled") {
    Seq(true, false).foreach { enableAQE =>
      withSQLConf(
        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> enableAQE.toString,
        SQLConf.SHUFFLE_PARTITIONS.key -> "50",
        SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "50",
        SQLConf.SHUFFLE_PARTITIONS.key -> "50") {
        val partitionsNum = (1 to 10).toDF.repartition($"value")
          .rdd.collectPartitions().length
        if (enableAQE) {
          assert(partitionsNum < 50)
        } else {
          assert(partitionsNum === 50)
        }
      }
    }
  }
{code}
 

  was:
when adaptive query execution is enabled the following expression should 
support coalescing of partitions:
{code:java}
dataframe.repartition(col("somecolumn")) {code}
currently it does not because it simply calls the repartition implementation 
where number of partitions is specified:
{code:java}
  def repartition(partitionExprs: Column*): Dataset[T] = {
    repartition(sparkSession.sessionState.conf.numShufflePartitions, 
partitionExprs: _*)
  }{code}
and repartition with the number of partitions specified does now allow for 
coalescing of partitions (since this breaks the user's expectation that it will 
have the number of partitions specified).

for more context see the discussion here:

[https://github.com/apache/spark/pull/27986]

a simple test to confirm that repartition by key does not support coalescing of 
partitions can be added in AdaptiveQueryExecSuite like this (it currently 
fails):
{code:java}
  test("SPARK-????? repartition has less partitions for small data when 
adaptiveExecutionEnabled") {
    Seq(true, false).foreach { enableAQE =>
      withSQLConf(
        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> enableAQE.toString,
        SQLConf.SHUFFLE_PARTITIONS.key -> "50",
        SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "50",
        SQLConf.SHUFFLE_PARTITIONS.key -> "50") {
        val partitionsNum = (1 to 10).toDF.repartition($"value")
          .rdd.collectPartitions().length
        if (enableAQE) {
          assert(partitionsNum < 50)
        } else {
          assert(partitionsNum === 50)
        }
      }
    }
  }
{code}
 


> Repartition by key should support partition coalesce for AQE
> ------------------------------------------------------------
>
>                 Key: SPARK-32056
>                 URL: https://issues.apache.org/jira/browse/SPARK-32056
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.0.0
>         Environment: spark release 3.0.0
>            Reporter: koert kuipers
>            Priority: Major
>
> when adaptive query execution is enabled the following expression should 
> support coalescing of partitions:
> {code:java}
> dataframe.repartition(col("somecolumn")) {code}
> currently it does not because it simply calls the repartition implementation 
> where number of partitions is specified:
> {code:java}
>   def repartition(partitionExprs: Column*): Dataset[T] = {
>     repartition(sparkSession.sessionState.conf.numShufflePartitions, 
> partitionExprs: _*)
>   }{code}
> and repartition with the number of partitions specified does now allow for 
> coalescing of partitions (since this breaks the user's expectation that it 
> will have the number of partitions specified).
> for more context see the discussion here:
> [https://github.com/apache/spark/pull/27986]
> a simple test to confirm that repartition by key does not support coalescing 
> of partitions can be added in AdaptiveQueryExecSuite like this (it currently 
> fails):
> {code:java}
>   test("SPARK-32056 repartition has less partitions for small data when 
> adaptiveExecutionEnabled") {
>     Seq(true, false).foreach { enableAQE =>
>       withSQLConf(
>         SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> enableAQE.toString,
>         SQLConf.SHUFFLE_PARTITIONS.key -> "50",
>         SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "50",
>         SQLConf.SHUFFLE_PARTITIONS.key -> "50") {
>         val partitionsNum = (1 to 10).toDF.repartition($"value")
>           .rdd.collectPartitions().length
>         if (enableAQE) {
>           assert(partitionsNum < 50)
>         } else {
>           assert(partitionsNum === 50)
>         }
>       }
>     }
>   }
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to