Hi Shay,

Maybe this is related to the small number of output rows (1,250) of the
last exchange step that consumes those 60GB of shuffle data.
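
If that is the case, it should show up in the key cardinality. A quick
check along these lines (guessing the column name, as in the snippet
below) would confirm it:

// assumes a running SparkSession `spark`, import spark.implicits._,
// and the df from your job
val numKeys = df.select($"id").distinct().count()
println(s"distinct ids: $numKeys")  // ~1,250 would match the output rows you see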

Looks like your outer transformation is something like

df.groupBy($"id").agg(collect_list($"prop_name"))

Have you tried adding a repartition to convince AQE to exchange into a
specific number of partitions?

df.groupBy($"id").agg(collect_list($"prop_name")).repartition(100, $"id")
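
If I remember correctly, AQE does not coalesce an exchange whose
partition count was requested explicitly, so this should at least tell
you whether the single partition really comes from the coalescing step.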

Can you provide some Spark code that reproduces the issue with
synthetic data and cleansed Spark logic?
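
Something along these lines (made-up sizes, column names and paths,
just to sketch the shape) would already help:

// assumes a running SparkSession `spark`, e.g. in spark-shell
import org.apache.spark.sql.functions._
import spark.implicits._

// ~100M rows but only 1,250 distinct keys (numbers are made up)
val props = spark.range(0, 100000000L)
  .select(($"id" % 1250).as("id"),
    concat(lit("prop_"), ($"id" % 1000000).cast("string")).as("prop_name"))

// the write triggers the shuffle and the aggregation
props.groupBy($"id").agg(collect_list($"prop_name"))
  .write.mode("overwrite").parquet("/tmp/aqe-coalesce-repro")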

Cheers,
Enrico


On 22.02.24 at 15:14, Shay Elbaz wrote:
Dear community,

We have noticed that AQE is coalescing a substantial amount of data
(approximately 60GB) into a single partition during query execution.
This behavior is unexpected given that there is no data skew or
broadcast involved, and given the significant size of the shuffle.

*Environment Details:*

 * Apache Spark Version: 3.1.3
 * Platform: Dataproc 2.0
 * Executors Configuration: 90GB memory, 15 cores

*Configuration Parameters:* We have examined the relevant
configuration parameters and tried many different variations, but the
behavior persists. For example:
spark.sql.adaptive.advisoryPartitionSizeInBytes=104857600 //100MB
spark.sql.adaptive.coalescePartitions.initialPartitionNum=1000
spark.sql.adaptive.coalescePartitions.minPartitionNum=500
spark.sql.optimizer.dynamicPartitionPruning.enabled=false
spark.sql.autoBroadcastJoinThreshold=-1 // off
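
All of the above are plain runtime SQL configs; for reference, the same
settings expressed directly on the session (a sketch, assuming an
existing SparkSession named spark):

spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "104857600") // 100MB
spark.conf.set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "1000")
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionNum", "500")
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "false")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1") // off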

*The full plan and diagram from the SQL tab are shown below*

Please advise:

 1. Are there additional configuration parameters or best practices we
    should be aware of in such scenarios?
 2. Are there known related issues in 3.1.3? (We didn't find any on
    Jira.)


Thanks in advance,
Shay

...
