Hi, Spark experts:
I did rdd.coalesce(numPartitions).saveAsSequenceFile(dir) in my code, where
the RDDs are generated in streamed batches. It generates numPartitions files as
expected, with names dir/part-x. However, the first couple of files (e.g.,
part-0, part-1) have many times more records than the other files. This
highly skewed distribution causes stragglers (and hence unpredictable execution
times) and also forces us to allocate memory for the worst case (because some
of the records can be much larger than average).
To solve this problem, I replaced coalesce(numPartitions) with
repartition(numPartitions) or coalesce(numPartitions, shuffle = true), which
are equivalent. As a result, the records are more evenly distributed over the
output files and the execution time became more predictable. It of course
incurs a lot of shuffle traffic. However, the GC time became prohibitively
high, which crashed my app in just a few hours. Adding more memory to the
executors didn't seem to help.
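For what it's worth, the difference you observed can be illustrated without
Spark. Below is a simplified pure-Python model (my own sketch, not Spark's
actual implementation): plain coalesce() is modeled as merging contiguous
parent partitions, so skew in the parents survives, while a shuffle-based
repartition is modeled as round-robin over individual records.

```python
def coalesce_no_shuffle(parents, num_out):
    """Model of coalesce(): group contiguous parent partitions together.
    Records never move individually, so skew in the parents survives."""
    out = [[] for _ in range(num_out)]
    for i, part in enumerate(parents):
        out[i * num_out // len(parents)].extend(part)
    return out

def repartition_shuffle(parents, num_out):
    """Model of repartition(): deal out individual records round-robin."""
    out = [[] for _ in range(num_out)]
    pos = 0
    for part in parents:
        for rec in part:
            out[pos % num_out].append(rec)
            pos += 1
    return out

# Skewed input: the first parent partitions are much bigger, as can
# happen when a few receivers produce most of the data.
parents = [list(range(1000))] * 2 + [list(range(10))] * 8
merged = coalesce_no_shuffle(parents, 5)
shuffled = repartition_shuffle(parents, 5)
print([len(p) for p in merged])    # → [2000, 20, 20, 20, 20]
print([len(p) for p in shuffled])  # → [416, 416, 416, 416, 416]
```

The record counts come out uniform after the shuffle, but note this balances
counts, not bytes, which is why worst-case memory can still be an issue.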
Do you have any suggestions on how to spread the data evenly without the GC
cost? Does repartition() redistribute/shuffle every record through a hash
partitioner? Why does it drive the GC time so high?
Thanks, Du
On Wednesday, March 4, 2015 5:39 PM, Zhan Zhang zzh...@hortonworks.com
wrote:
It uses a HashPartitioner to distribute the records to different partitions,
but the key is just an integer that cycles evenly across the output
partitions. From the code, each resulting partition should get a very similar
number of records.
Thanks.
Zhan Zhang
On Mar 4, 2015, at 3:47 PM, Du Li l...@yahoo-inc.com.INVALID wrote:
Hi,
My RDDs are created from a Kafka stream. After receiving an RDD, I want to
coalesce/repartition it so that the data will be processed by a set of
machines in parallel, as evenly as possible. The number of processing nodes is
larger than the number of receiving nodes.
My question is how coalesce/repartition works. Does it distribute by number of
records or by number of bytes? In my app, my observation is that the
distribution is by number of records. The consequence, however, is that some
executors have to process up to 1000x as much data when the record sizes are
very skewed. Then we have to allocate memory for the worst case.
Is there a way to programmatically affect the coalesce/repartition scheme?
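One direction I have been wondering about (this is speculation on my part, not
something the docs promise): if records were keyed and routed with partitionBy
and a custom Partitioner, the assignment could balance bytes instead of record
counts. Spark's Partitioner must be a deterministic function of the key, so a
scheme like this would need the sizes computed up front and baked into the
keys; the sketch below just shows the greedy size-aware assignment itself, in
plain Python outside Spark.

```python
import heapq

def assign_by_bytes(record_sizes, num_partitions):
    """Greedy size-aware assignment: always give the next record to the
    currently lightest partition, balancing bytes rather than counts."""
    heap = [(0, p) for p in range(num_partitions)]  # (bytes_so_far, partition)
    heapq.heapify(heap)
    assignment = []
    for size in record_sizes:
        load, p = heapq.heappop(heap)
        assignment.append(p)
        heapq.heappush(heap, (load + size, p))
    return assignment

sizes = [1000, 1000, 1, 1, 1, 1, 500, 500]  # skewed record sizes in bytes
parts = assign_by_bytes(sizes, 2)
loads = [sum(s for s, p in zip(sizes, parts) if p == q) for q in range(2)]
print(loads)  # → [1502, 1502]
```

With skewed record sizes, the byte loads come out equal even though the record
counts per partition differ, which is the opposite trade-off from what
repartition() does today.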
Thanks, Du