Hello,

I’m benchmarking our hardware and Spark deployment by repartitioning large
datasets. We recently corrected a misconfigured bonded (aggregate) network link
that was causing fatal network timeouts in long-running jobs. Now that it’s
fixed, we still observe worse-than-expected performance when simply
repartitioning a randomly generated dataset.

My primary question is: what is the low-level behavior of the “Exchange
hashpartitioning(…)” operation with respect to how data moves between disks and
hosts?
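
For reference, here is a minimal sketch (PySpark; the path and partition count
are only illustrative) of how the Exchange shows up when I inspect the physical
plan:

    # Illustrative only: column names match my dataset, the partition count does not.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("repartition-benchmark").getOrCreate()

    df = spark.read.parquet("input.dat")
    df.repartition(200, "a", "b", "c", "d").explain()
    # The physical plan contains a node along the lines of:
    #   Exchange hashpartitioning(a#0L, b#1L, c#2L, d#3L, 200)
    # i.e. the map-side shuffle write plus reduce-side shuffle read I am asking about.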

I used AWS EMR to set up a similar Spark cluster and ran the exact same job
under nearly identical conditions. Our cluster completes the first stage nearly
2x faster than the AWS cluster, but takes over 2x as long to complete the
second stage (explicit repartitioning appears to always require two stages). I
want to understand the Exchange operation well enough to explain this
performance discrepancy and, ideally, correlate it with our cluster’s resource
limitations (e.g. disk I/O or IOPS capability).

I’ve already benchmarked our system resources and compared them to AWS, so I
can make some assumptions. I have investigated the Exchange (and some related)
source code, but it’s not clear to me what actually occurs with respect to I/O.
It seems to me that the first stage is basically a scan and is very fast
because it’s limited only by sequential disk I/O speed. The second stage does
not appear to stress any resource on the cluster, yet it can take 10x as long
to complete as the first stage. Finally, the only hint that something might be
“wrong” is a proportionally high “Shuffle Read Blocked Time” for each task
during the second stage (roughly 90% of task duration).
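
In case it matters, these are the shuffle-fetch knobs I plan to experiment with
while chasing the blocked time; the values below are only illustrative, not
recommendations (the defaults are 48m, 1, and 32k respectively):

    # Sketch of shuffle-read tuning settings I intend to try; values are illustrative.
    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        .appName("repartition-benchmark")
        # Max map output fetched concurrently per reduce task (default 48m).
        .config("spark.reducer.maxSizeInFlight", "96m")
        # Concurrent fetch connections per remote executor host (default 1).
        .config("spark.shuffle.io.numConnectionsPerPeer", "2")
        # Map-side shuffle file write buffer (default 32k).
        .config("spark.shuffle.file.buffer", "64k")
        .getOrCreate()
    )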

If I’m not mistaken, my assessment so far applies to shuffles in general, since
repartitioning is performed via a shuffle.

Our current configuration (note that the AWS EMR comparison used a
significantly smaller set of executors):


  *   5 hosts, 5x 10 TB disks each
  *   54 executors, 5 vcores and 23 GB each
  *   50+ billion records of the form Record(a: Long, b: Long, c: Long, d: Long),
where ‘a’ through ‘d’ are randomly generated values; 1.5+ TB total size,
Parquet format
  *   The job is as simple as `spark.read.parquet("input.dat").repartition(N,
"a", "b", "c", "d").write.parquet("output.dat")`, where N is roughly
(input_data_size / 128 MB); a fuller sketch follows this list.
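
For completeness, a runnable sketch of the benchmark job as I run it (PySpark;
the 1.5 TB figure and the paths come from my dataset, everything else is plain
defaults):

    # Minimal version of the benchmark job described above.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("repartition-benchmark").getOrCreate()

    input_size_bytes = int(1.5 * 1024**4)           # ~1.5 TB of Parquet input
    target_partition_bytes = 128 * 1024**2          # aim for ~128 MB per partition
    n = input_size_bytes // target_partition_bytes  # ~12k partitions in my case

    (spark.read.parquet("input.dat")                # stage 1: scan + shuffle write
          .repartition(n, "a", "b", "c", "d")       # the Exchange hashpartitioning
          .write.parquet("output.dat"))             # stage 2: shuffle read + write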

Thanks!
---
Joe Naegele
Grier Forensics
410.220.0968
