I think the problem is that once unpacked in Python, the objects take 
considerably more space, as they are stored as Python objects in a Python 
dictionary. Take a look at python/pyspark/join.py and combineByKey in 
python/pyspark/rdd.py. We should probably try to store these in serialized form.
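
For illustration, here's a rough sketch (not the actual PySpark code; the helper names are made up) of what keeping a dict-based grouping in serialized form could look like:

    import pickle
    from collections import defaultdict

    def group_serialized(pairs):
        # Keep each value as a pickled byte string instead of a live Python
        # object, so the per-key buckets stay compact in memory.
        buckets = defaultdict(list)
        for key, value in pairs:
            buckets[key].append(pickle.dumps(value, 2))
        return buckets

    def read_bucket(buckets, key):
        # Deserialize lazily, only when a bucket is actually consumed.
        return (pickle.loads(blob) for blob in buckets[key])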

I’m not sure whether there’s a great way to inspect a Python process’s memory, 
but looking at what consumes memory in a reducer process would be useful.
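
As a very coarse starting point (just a sketch, not a real profiler), one could log the reducer's peak resident set size from inside the task using the standard library:

    import resource

    def log_rss(tag):
        # ru_maxrss is reported in kilobytes on Linux (bytes on OS X).
        rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
        print("%s: peak RSS %d kB" % (tag, rss))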

Matei 


On Jun 4, 2014, at 2:34 PM, Brad Miller <bmill...@eecs.berkeley.edu> wrote:

> Hi Matei,
> 
> Thanks for the reply and for creating the JIRA.  I hear what you're saying, 
> although to be clear I still want to point out that each reduce task seems to 
> be loading significantly more data than just the records needed for that 
> task.  The workers seem to load all data from each block containing a record 
> needed by the reduce task.
> 
> I base this hypothesis on the following:
> - My dataset is about 100G uncompressed, 22G serialized in memory with 
>   compression enabled
> - There are 130K records
> - The initial RDD contains 1677 partitions, averaging 60M (uncompressed)
> - There are 3 cores per node (each running one reduce task at a time)
> - Each node has 32G of memory
> 
> Note that I am attempting to join the dataset to itself and I ran this 
> experiment after caching the dataset in memory with serialization and 
> compression enabled.
> 
> Given these figures, even with only 200 partitions the average output 
> partition size (uncompressed) would be 1G (as the dataset is being joined to 
> itself, resulting in 200G over 200 partitions), requiring 3G from each 
> machine on average.  What I observe is that the kernel kills worker processes 
> on many of the nodes at nearly the same time, right after the read phase 
> starts; it seems likely this would happen on every node, except that the 
> master begins detecting failures and stops the job first.  Indeed, I observe 
> a large memory spike on every machine.
> 
> When I attempt the join with 2000 output partitions, it succeeds.  Note that 
> there are about 65 records per output partition on average, which means the 
> reader only needs to load input from about 130 blocks (as the dataset is 
> joined to itself).  Given that the average uncompressed block size is 60M, 
> even if the entire block were loaded (not just the relevant record) we would 
> expect about 23G of memory to be used per node on average.
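> 
> To spell out the arithmetic (just a back-of-the-envelope sketch using the 
> figures above):
> 
>     # 200 output partitions: dataset joined to itself -> ~200G of output,
>     # so ~1G per partition and ~3G per node with 3 concurrent tasks.
>     per_node_200 = (2 * 100 / 200.0) * 3             # ~3 GB
> 
>     # 2000 output partitions, if whole 60M blocks are fetched:
>     # ~130 blocks per task * 60 MB * 3 tasks ~= 23 GB per node.
>     per_node_2000 = 130 * 60 * 3 / 1024.0            # ~22.9 GB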
> 
> I began suspecting the behavior of loading entire blocks based on the logging 
> from the workers (i.e. "BlockFetcherIterator$BasicBlockFetcherIterator: 
> Getting 122 non-empty blocks out of 3354 blocks").  If it is definitely not 
> the case that entire blocks are loaded from the writers, then it would seem 
> that there is some significant overhead chewing through lots of memory 
> (perhaps similar to the problem with Python broadcast variables chewing 
> through memory: https://spark-project.atlassian.net/browse/SPARK-1065).
> 
> -Brad
> 
> 
> 
> On Wed, Jun 4, 2014 at 1:42 PM, Matei Zaharia <matei.zaha...@gmail.com> wrote:
> In PySpark, the data processed by each reduce task needs to fit in memory 
> within the Python process, so you should use more tasks to process this 
> dataset. Data is spilled to disk across tasks, but within a single task it 
> has to fit in memory.
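> 
> For example (a sketch; rdd here stands for the cached dataset), the task 
> count can be raised by passing numPartitions to the join:
> 
>     joined = rdd.join(rdd, numPartitions=2000)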
> 
> I’ve created https://issues.apache.org/jira/browse/SPARK-2021 to track this — 
> it’s something we’ve been meaning to look at soon.
> 
> Matei
> 
> On Jun 4, 2014, at 8:23 AM, Brad Miller <bmill...@eecs.berkeley.edu> wrote:
> 
> > Hi All,
> >
> > I have experienced some crashing behavior with join in pyspark.  When I 
> > attempt a join with 2000 partitions in the result, the join succeeds, but 
> > when I use only 200 partitions in the result, the join fails with the 
> > message "Job aborted due to stage failure: Master removed our application: 
> > FAILED".
> >
> > The crash always occurs at the beginning of the shuffle phase.  Based on my 
> > observations, it seems like the workers in the read phase may be fetching 
> > entire blocks from the write phase of the shuffle rather than just the 
> > records necessary to compose the partition the reader is responsible for.  
> > Hence, when there are fewer partitions in the read phase, the worker is 
> > likely to need a record from each of the write partitions and consequently 
> > attempts to load the entire dataset into the memory of a single machine 
> > (which then causes the out-of-memory crash I observe in /var/log/syslog).
> >
> > Can anybody confirm if this is the behavior of pyspark?  I am glad to 
> > supply additional details about my observed behavior upon request.
> >
> > best,
> > -Brad
> 
> 
