Hi Shay,

Maybe this is related to the small number of output rows (1,250) of the last exchange step, which consumes those 60 GB of shuffle data.

It looks like your outer transformation is something like:

    df.groupBy($"id").agg(collect_list($"prop_name"))

Have you tried adding a repartition as an attempt to convince AQE to exchange into a specific number of partitions?

    df.groupBy($"id").agg(collect_list($"prop_name")).repartition(100, $"id")

Can you provide some Spark code that reproduces the issue with synthetic data and cleansed Spark logic?

Cheers,
Enrico

On 22.02.24 at 15:14, Shay Elbaz wrote:
Dear community,

We have noticed that AQE is coalescing a substantial amount of data (approximately 60 GB) into a single partition during query execution. This behavior is unexpected given the absence of data skew, the absence of a broadcast, and the significant size of the shuffle operation.

*Environment Details:*
* Apache Spark version: 3.1.3
* Platform: Dataproc 2.0
* Executor configuration: 90 GB memory, 15 cores

*Configuration Parameters:*
We have examined the relevant configuration parameters and tried many different variations, but the behavior persists. For example:

    spark.sql.adaptive.advisoryPartitionSizeInBytes=104857600  // 100 MB
    spark.sql.adaptive.coalescePartitions.initialPartitionNum=1000
    spark.sql.adaptive.coalescePartitions.minPartitionNum=500
    spark.sql.optimizer.dynamicPartitionPruning.enabled=false
    spark.sql.autoBroadcastJoinThreshold=-1  // off

*The full plan and diagram from the SQL tab are shown below.*

Please advise:
1. Are there additional configuration parameters or best practices we should be aware of in such scenarios?
2. Are there known related issues in 3.1.3? (We didn't find any on Jira.)

Thanks in advance,
Shay
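For illustration, the aggregation described in the thread, together with the suggested explicit repartition, could be sketched against synthetic data roughly as follows. This is a minimal, hypothetical sketch, not the actual job: the column names ($"id", $"prop_name") come from the thread, but the data sizes, key count, and local master are assumptions.

    // Hypothetical minimal repro sketch with synthetic, unskewed data.
    // Assumes a local Spark 3.1.x environment; not the original job.
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{collect_list, concat, lit}

    object AqeCoalesceSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[*]")
          .appName("aqe-coalesce-sketch")
          .config("spark.sql.adaptive.enabled", "true")
          .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "104857600")
          .getOrCreate()
        import spark.implicits._

        // Synthetic data: many properties spread evenly over 1,250 ids (no skew).
        val df = spark.range(0, 1000000)
          .select(
            ($"id" % 1250).as("id"),
            concat(lit("prop_"), $"id").as("prop_name"))

        // The aggregation from the thread, followed by the suggested explicit
        // repartition to pin the partition count after the exchange.
        val result = df
          .groupBy($"id")
          .agg(collect_list($"prop_name"))
          .repartition(100, $"id")

        println(result.rdd.getNumPartitions)  // prints 100
        spark.stop()
      }
    }

An explicit `repartition(n, cols)` with a fixed partition count is one way to override AQE's coalescing decision at that point in the plan, at the cost of an extra shuffle.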
...