I think the problem is that once unpacked in Python, the objects take considerably more space, as they are stored as Python objects in a Python dictionary. Take a look at python/pyspark/join.py and combineByKey in python/pyspark/rdd.py. We should probably try to store these in serialized form.
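For illustration, here is a rough sketch of what "storing these in serialized form" could mean — this is not the actual python/pyspark/join.py or rdd.py code, and the helper names (add_value, iter_group) and plain-dict setting are invented. The idea is to keep each value in the per-key dict as a compact pickled byte string and only unpickle when a group is actually consumed:

```python
import pickle

# Rough sketch only -- not the actual PySpark implementation.
# Keep each value as a pickled bytes object instead of a live Python object,
# so the per-key state in the reducer's dict stays compact.

def add_value(groups, key, value):
    # Store the serialized form; a pickled blob is a single bytes object,
    # while a live object graph pays per-object overhead in the Python heap.
    groups.setdefault(key, []).append(pickle.dumps(value, protocol=2))

def iter_group(groups, key):
    # Deserialize on demand, one value at a time.
    return (pickle.loads(blob) for blob in groups[key])

groups = {}
for i in range(1000):
    add_value(groups, i % 10, {"id": i, "payload": "x" * 100})

print(next(iter_group(groups, 0)))
```

The same idea could presumably be applied inside the combineByKey merge functions (accumulate pickled values and unpickle when the combined value is consumed), trading extra serialization CPU for a smaller Python heap.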
I’m not sure whether there’s a great way to inspect a Python process’s memory, but looking at what consumes memory in a reducer process would be useful.

Matei

On Jun 4, 2014, at 2:34 PM, Brad Miller <bmill...@eecs.berkeley.edu> wrote:

> Hi Matei,
>
> Thanks for the reply and creating the JIRA. I hear what you're saying, although to be clear I want to restate that it seems like each reduce task is loading significantly more data than just the records needed for that task. The workers seem to load all data from each block containing a record needed by the reduce task.
>
> I base this hypothesis on the following:
> - My dataset is about 100G uncompressed, 22G serialized in memory with compression enabled
> - There are 130K records
> - The initial RDD contains 1677 partitions, averaging 60M (uncompressed)
> - There are 3 cores per node (each running one reduce task at a time)
> - Each node has 32G of memory
>
> Note that I am attempting to join the dataset to itself, and I ran this experiment after caching the dataset in memory with serialization and compression enabled.
>
> Given these figures, even with only 200 partitions the average output partition size (uncompressed) would be 1G (as the dataset is being joined to itself, resulting in 200G over 200 partitions), requiring 3G from each machine on average. The behavior I observe is that the kernel kills jobs on many of the nodes at nearly the same time right after the read phase starts; it seems likely this would occur on every node, except that the master begins detecting failures and stops the job (and I observe memory spiking on all machines). Indeed, I observe a large memory spike at each node.
>
> When I attempt the join with 2000 output partitions, it succeeds. Note that there are about 65 records per output partition on average, which means the reader only needs to load input from about 130 blocks (as the dataset is joined to itself). Given that the average uncompressed block size is 60M, even if the entire block were loaded (not just the relevant record) we would expect about 23G of memory to be used per node on average.
>
> I began suspecting the behavior of loading entire blocks based on the logging from the workers (i.e. "BlockFetcherIterator$BasicBlockFetcherIterator: Getting 122 non-empty blocks out of 3354 blocks"). If it is definitely not the case that entire blocks are loaded from the writers, then it would seem like there is some significant overhead which is chewing through lots of memory (perhaps similar to the problem with Python broadcast variables chewing through memory: https://spark-project.atlassian.net/browse/SPARK-1065).
>
> -Brad
>
>
> On Wed, Jun 4, 2014 at 1:42 PM, Matei Zaharia <matei.zaha...@gmail.com> wrote:
> In PySpark, the data processed by each reduce task needs to fit in memory within the Python process, so you should use more tasks to process this dataset. Data is spilled to disk across tasks.
>
> I’ve created https://issues.apache.org/jira/browse/SPARK-2021 to track this — it’s something we’ve been meaning to look at soon.
>
> Matei
>
> On Jun 4, 2014, at 8:23 AM, Brad Miller <bmill...@eecs.berkeley.edu> wrote:
>
> > Hi All,
> >
> > I have experienced some crashing behavior with join in pyspark. When I attempt a join with 2000 partitions in the result, the join succeeds, but when I use only 200 partitions in the result, the join fails with the message "Job aborted due to stage failure: Master removed our application: FAILED".
> >
> > The crash always occurs at the beginning of the shuffle phase. Based on my observations, it seems like the workers in the read phase may be fetching entire blocks from the write phase of the shuffle rather than just the records necessary to compose the partition the reader is responsible for. Hence, when there are fewer partitions in the read phase, the worker is likely to need a record from each of the write partitions and consequently attempts to load the entire data set into the memory of a single machine (which then causes the out-of-memory crash I observe in /var/log/syslog).
> >
> > Can anybody confirm whether this is the behavior of pyspark? I am glad to supply additional details about my observed behavior upon request.
> >
> > best,
> > -Brad
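As an aside for anyone following the numbers in this thread, Brad's back-of-the-envelope figures can be reproduced with the quantities quoted above. This is only a rough sketch; the assumption that every needed record sits in a distinct ~60M block is the worst case he describes, not a measured fact:

```python
# Back-of-the-envelope check of the figures quoted in this thread.
dataset_gb = 100          # uncompressed dataset size
records = 130_000
input_partitions = 1677
cores_per_node = 3        # one reduce task per core at a time

block_mb = dataset_gb * 1024 / input_partitions           # ~61 MB per block
print("avg block size: %.0f MB" % block_mb)

# 200 output partitions: the self-join yields roughly 2x the data (~200 GB).
out_200 = 2 * dataset_gb / 200                             # GB per output partition
print("200 partitions: %.1f GB/partition, %.1f GB/node"
      % (out_200, out_200 * cores_per_node))

# 2000 output partitions: ~65 records per partition; joined to itself, the
# reader touches ~130 blocks. If whole blocks were fetched:
recs_per_part = records / 2000
blocks_touched = 2 * recs_per_part
per_task_gb = blocks_touched * block_mb / 1024
print("2000 partitions: %.0f records/partition, ~%.0f blocks, %.1f GB/task, %.1f GB/node"
      % (recs_per_part, blocks_touched, per_task_gb, per_task_gb * cores_per_node))
```

This matches the 1G/partition and 3G/node estimate for 200 partitions and the ~23G/node estimate for 2000 partitions. The workaround that succeeded in the thread was simply requesting more output partitions in the join (e.g. rdd.join(rdd, 2000)).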