pyspark join crash

2014-06-04 Thread Brad Miller
Hi All,

I have experienced some crashing behavior with join in PySpark.  When I
attempt a join with 2000 partitions in the result, the join succeeds, but
when I use only 200 partitions in the result, the join fails with the
message "Job aborted due to stage failure: Master removed our application:
FAILED".

The crash always occurs at the beginning of the shuffle phase.  Based on my
observations, it seems like the workers in the read phase may be fetching
entire blocks from the write phase of the shuffle rather than just the
records necessary to compose the partition the reader is responsible for.
Hence, when there are fewer partitions in the read phase, each worker is
likely to need a record from every write partition and consequently
attempts to load the entire data set into the memory of a single machine
(which then causes the out-of-memory crash I observe in /var/log/syslog).
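
For reference, the shape of the call is roughly as follows (toy stand-in
RDDs rather than my real data; in my case the dataset is joined to itself):

    from pyspark import SparkContext

    sc = SparkContext(appName="join-sketch")  # illustrative setup, not my actual job
    pairs = sc.parallelize([(i, "x" * 100) for i in range(1000)])

    # with my real dataset, 2000 result partitions succeed...
    ok = pairs.join(pairs, numPartitions=2000)
    # ...while 200 result partitions fail at the start of the shuffle read phase
    bad = pairs.join(pairs, numPartitions=200)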

Can anybody confirm whether this is the expected behavior of PySpark?  I am
glad to supply additional details about my observations upon request.

best,
-Brad


Re: pyspark join crash

2014-06-04 Thread Matei Zaharia
In PySpark, the data processed by each reduce task needs to fit in memory 
within the Python process, so you should use more tasks to process this 
dataset. Data is spilled to disk across tasks.
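
Concretely, something along these lines (a sketch; the numbers here are
illustrative, and the right count depends on how big each task's slice of
the data ends up being):

    from pyspark import SparkConf, SparkContext

    # raise the default partition count that shuffles such as join fall back on
    conf = SparkConf().setAppName("more-reduce-tasks") \
                      .set("spark.default.parallelism", "2000")
    sc = SparkContext(conf=conf)

    pairs = sc.parallelize([(i % 100, i) for i in range(10000)])
    # or pass a count explicitly at the call site
    joined = pairs.join(pairs, numPartitions=2000)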

I’ve created https://issues.apache.org/jira/browse/SPARK-2021 to track this — 
it’s something we’ve been meaning to look at soon.

Matei




Re: pyspark join crash

2014-06-04 Thread Brad Miller
Hi Matei,

Thanks for the reply and for creating the JIRA.  I hear what you're saying,
although to be clear I still want to point out that each reduce task seems
to be loading significantly more data than just the records it needs.  The
workers appear to load all of the data from every block that contains a
record needed by the reduce task.

I base this hypothesis on the following:
- My dataset is about 100G uncompressed, 22G serialized in memory with
  compression enabled
- There are 130K records
- The initial RDD contains 1677 partitions, averaging 60M each (uncompressed)
- There are 3 cores per node (each running one reduce task at a time)
- Each node has 32G of memory

Note that I am attempting to join the dataset to itself and I ran this
experiment after caching the dataset in memory with serialization and
compression enabled.

Given these figures, even with only 200 partitions the average output
partition size (uncompressed) would be 1G (as the dataset is being joined
to itself, resulting in 200G spread over 200 partitions), requiring about
3G from each machine on average.  What I observe is that the kernel kills
the worker processes on many of the nodes at nearly the same time, right
after the read phase starts; it seems likely this would happen on every
node, except that the master begins detecting failures and stops the job
first.  Indeed, I observe a large memory spike on every machine.

When I attempt the join with 2000 output partitions, it succeeds.  Note
that there are about 65 records per output partition on average, which
means the reader only needs to load input from about 130 blocks (as the
dataset is joined to itself).  Given that the average uncompressed block
size is 60M, even if the entire block were loaded (not just the relevant
record) we would expect about 23G of memory to be used per node on average.
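
To spell out the arithmetic behind those two estimates (sizes in gigabytes,
uncompressed, using the figures above):

    # 200 output partitions: the self-join produces roughly 200G of output
    gb_per_partition = 200.0 / 200               # ~1G per output partition
    gb_per_node_200  = 3 * gb_per_partition      # 3 concurrent reduce tasks per node => ~3G

    # 2000 output partitions: 130K records / 2000 => ~65 records per partition,
    # so each reader touches at most ~130 input blocks (both sides of the self-join)
    blocks_per_task  = 2 * (130000 / 2000)       # ~130 blocks
    gb_per_node_2000 = 3 * blocks_per_task * 0.060   # 60M avg block size => ~23.4G per node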

I began suspecting that entire blocks are being loaded based on the logging
from the workers (e.g. "BlockFetcherIterator$BasicBlockFetcherIterator:
Getting 122 non-empty blocks out of 3354 blocks").  If it is definitely not
the case that entire blocks are loaded from the writers, then it would seem
like there is some significant overhead which is chewing through lots of
memory (perhaps similar to the problem with Python broadcast variables
chewing through memory, https://spark-project.atlassian.net/browse/SPARK-1065).

-Brad






Re: pyspark join crash

2014-06-04 Thread Matei Zaharia
I think the problem is that once unpacked in Python, the objects take 
considerably more space, as they are stored as Python objects in a Python 
dictionary. Take a look at python/pyspark/join.py and combineByKey in 
python/pyspark/rdd.py. We should probably try to store these in serialized form.
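
Conceptually the grouping looks something like the sketch below (a
simplification, not the actual code in those files): every value for the
keys assigned to a reduce partition ends up as a deserialized Python object
inside a single dict, which is where the blow-up relative to the serialized
size comes from.

    # simplified sketch of a hash-based join for one reduce partition
    def join_partition(iterator):
        buckets = {}  # key -> ([values from left side], [values from right side])
        for key, (side, value) in iterator:
            left_vals, right_vals = buckets.setdefault(key, ([], []))
            (left_vals if side == 0 else right_vals).append(value)
        for key, (left_vals, right_vals) in buckets.items():
            for lv in left_vals:
                for rv in right_vals:
                    yield (key, (lv, rv))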

I’m not sure whether there’s a great way to inspect a Python process’s memory, 
but looking at what consumes memory in a reducer process would be useful.
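
One crude way to get a number out of a reduce task is a mapPartitions
wrapper like the sketch below (ru_maxrss is the Python worker's peak
resident set size, reported in kilobytes on Linux; the message should end
up in the worker's stderr log):

    import resource
    import sys

    def log_peak_rss(iterator):
        # pass the partition through unchanged, then report peak memory so far
        for item in iterator:
            yield item
        peak_kb = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
        sys.stderr.write("peak RSS in this worker: %.1f MB\n" % (peak_kb / 1024.0))

    # e.g. joined.mapPartitions(log_peak_rss, preservesPartitioning=True).count()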

Matei 

