Github user mateiz commented on the pull request:

    https://github.com/apache/spark/pull/1460#issuecomment-50072248
  
    Hey Davies, I tried this out a bit and saw two issues / areas for 
improvement:
    
    1) Since the ExternalMerger is used in both map tasks and reduce tasks, one 
problem that can happen is that a reduce task's data is already hashed modulo 
the # of reduce tasks, so you get many empty buckets. For example, if you have 
2 reduce tasks, task 0 gets all the values whose hash code is even, so it can 
only use half its buckets. If the number of reduce tasks matches the number of 
buckets, say 64, each task ends up using only one bucket.
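
    To make that concrete, here is a tiny standalone illustration (made-up 
names, not the actual ExternalMerger code) of why the buckets collapse when 
the merger reuses plain hash(key) % partitions:

    ```python
    # Suppose the shuffle has 64 reduce tasks and the merger also uses 64 buckets.
    NUM_REDUCERS = 64
    PARTITIONS = 64

    # Reduce task 0 only ever sees keys whose hash is 0 mod 64...
    keys_for_task0 = [k for k in range(100000) if hash(k) % NUM_REDUCERS == 0]

    # ...so bucketing those keys by hash(key) % PARTITIONS again touches one bucket.
    buckets_used = {hash(k) % PARTITIONS for k in keys_for_task0}
    print(len(buckets_used))  # prints 1
    ```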
    
    The best way to fix this would be to hash the keys with a random hash 
function when choosing the bucket. One simple way might be to generate a random 
integer X for each ExternalMerger and then take hash((key, X)) instead of 
hash(key); this is equivalent to salting your hash function. Maybe you have 
other ideas, but I'd suggest trying this first.
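
    Here is a rough sketch of the idea with a hypothetical helper class (not 
the real merger code); the only real requirement is mixing a per-instance 
random X into the hash:

    ```python
    import random

    class SaltedPartitioner(object):
        """Hypothetical helper: buckets keys by a salted hash so the choice
        is independent of how the upstream shuffle partitioned them."""

        def __init__(self, partitions):
            self.partitions = partitions
            self.salt = random.randint(0, 1 << 30)   # random X, one per merger

        def bucket(self, key):
            # hash((key, X)) instead of hash(key)
            return hash((key, self.salt)) % self.partitions

    # With the same kind of skewed keys as above, the buckets now spread out:
    p = SaltedPartitioner(64)
    skewed_keys = [k for k in range(100000) if hash(k) % 64 == 0]
    print(len({p.bucket(k) for k in skewed_keys}))   # ~64 instead of 1
    ```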
    
    2) I also noticed that sometimes the maps would fill up again before the 
old memory was fully freed, leading to smaller spills. For example, for 
(Int, Int) pairs the first spill from 512 MB of memory was about 68 MB of 
files, but later spills were only around 20 MB. I found that I could get 
better performance overall by adding gc.collect() calls after every 
data.clear() and pdata.clear(). This freed memory sooner and let us do more 
work in memory before spilling. The perf difference for one test job was 
around 30%, but you should try it on your own jobs.
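
    Concretely, the pattern would look something like this (a toy stand-in for 
the merger, not the real code; the point is just the clear() + gc.collect() 
pairing):

    ```python
    import gc

    class TinyMerger(object):
        """Toy stand-in for ExternalMerger, just to show where the collects go."""

        def __init__(self, partitions=64):
            self.data = {}                                # map used before any spill
            self.pdata = [{} for _ in range(partitions)]  # per-partition maps

        def spill(self, dump):
            # dump is any callable that writes a dict out to disk
            dump(self.data)
            self.data.clear()
            gc.collect()        # reclaim the old map right away, not lazily

            for d in self.pdata:
                dump(d)
                d.clear()
            gc.collect()        # same after clearing the per-partition maps
    ```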

