The reduce phase of a large shuffle is often more network- and I/O-bound
than the map phase, so some underutilization there is not unusual.
A couple of suggestions you may want to consider:
1. Setting the number of shuffle partitions to 18K may be way too high
(the default is only 200). You may want to start with the default and
enable adaptive query execution (AQE), which will coalesce shuffle
partitions automatically as needed.
2. Turn on dynamic resource allocation (DRA)
(https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation).
It allows executors that have finished their map tasks to return their
resources (e.g. RAM, CPU cores) to the cluster, so they can be
reallocated to the reduce tasks. Since Spark 3.0 this feature is also
available on Kubernetes, but it is turned off by default.
3. With DRA turned on, you may also want to try a smaller number of
executors/nodes, which reduces cross-node shuffling, given that each
node only has 128GB RAM.
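As a rough sketch, the three suggestions above map to configuration
flags along these lines (the min/max executor counts are illustrative
placeholders, not recommendations for your workload):

```shell
spark-submit \
  # 1. Let AQE pick/coalesce shuffle partitions instead of a fixed 18K
  --conf spark.sql.adaptive.enabled=true \
  --conf spark.sql.adaptive.coalescePartitions.enabled=true \
  # 2. Dynamic resource allocation; shuffle tracking is needed on K8s,
  #    which has no external shuffle service
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.shuffleTracking.enabled=true \
  # 3. Cap the executor count lower to reduce cross-node shuffle traffic
  #    (example values only)
  --conf spark.dynamicAllocation.minExecutors=2 \
  --conf spark.dynamicAllocation.maxExecutors=15 \
  ...
```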
Hope this helps...
On 9/29/22 2:12 PM, Igor Calabria wrote:
Hi Everyone,
I'm running Spark 3.2 on Kubernetes and have a job with a decently
sized shuffle of almost 4TB. The relevant cluster config is as follows:
- 30 executors, 16 physical cores each, configured with 32 cores for Spark
- 128 GB RAM
- shuffle.partitions is 18k, which gives me tasks of around 150~180MB
The job runs fine, but I'm bothered by how underutilized the cluster
gets during the reduce phase. During the map phase (reading data from
S3 and writing the shuffle data), CPU usage, disk throughput, and
network usage are as expected, but during the reduce phase they get
really low. The main bottleneck seems to be reading shuffle data from
other nodes: task statistics report values ranging from 25s to several
minutes (the task sizes are really close; they aren't skewed). I've
tried increasing "spark.reducer.maxSizeInFlight" and
"spark.shuffle.io.numConnectionsPerPeer", which improved performance a
little, but not enough to saturate the cluster resources.
Did I miss some more tuning parameters that could help?
One obvious option would be to scale the machines up vertically and use
fewer nodes to minimize traffic, but 30 nodes doesn't seem like much,
even considering 30x30 connections.
Thanks in advance!