Re: Missing shuffle files

2015-02-28 Thread Corey Nolet
Just wanted to point out- raising the memory overhead (as I saw in the logs)
was the fix for this issue, and I have not seen dying executors since this
value was increased.
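
For anyone who hits the same thing, a minimal sketch of what raising that
overhead looks like at submit time (Spark 1.x on YARN property names; the
1024 MB value and the class/jar names are placeholders, not recommendations):

  # extra off-heap headroom YARN reserves for each container, in MB
  spark-submit \
    --conf spark.yarn.executor.memoryOverhead=1024 \
    --conf spark.yarn.driver.memoryOverhead=1024 \
    --class com.example.MyJob my-job.jar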


Re: Missing shuffle files

2015-02-24 Thread Anders Arpteg
If you're thinking of the yarn memory overhead, then yes, I have increased
that as well. However, I'm glad to say that my job finally finished
successfully. Besides the timeout and memory settings, performing
repartitioning (with shuffling) at the right time seems to be the key to
making this large job succeed. With all the transformations in the job, the
partition distribution was becoming increasingly skewed. It's not easy to
figure out when to repartition and how many partitions to use, and it takes
forever to tweak these settings, since everything works perfectly for small
datasets and you have to experiment with large, time-consuming jobs. Imagine
if there was an automatic partition reconfiguration function that
automagically did that...
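
To make that concrete, here's a rough sketch of the kind of mid-pipeline
repartitioning I mean (the input path, key function and partition counts are
made-up placeholders, not the actual job):

  import org.apache.spark.SparkContext._
  import org.apache.spark.{SparkConf, SparkContext}

  object RepartitionSketch {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(new SparkConf().setAppName("repartition-sketch"))
      val raw = sc.textFile("hdfs:///some/input")            // placeholder input
      val keyed = raw.map(line => (line.take(8), line))      // placeholder key extraction

      // After a few wide transformations the partition sizes tend to get skewed,
      // so force a full shuffle back to an even partition count before the heavy stage.
      val rebalanced = keyed.repartition(2000)               // repartition() always shuffles

      // Coalesce (no shuffle) only near the very end, to avoid writing thousands of tiny files.
      rebalanced.groupByKey().coalesce(200).saveAsTextFile("hdfs:///some/output")
      sc.stop()
    }
  }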


Re: Missing shuffle files

2015-02-23 Thread Anders Arpteg
No, unfortunately we're not making use of dynamic allocation or the
external shuffle service. Hoping that we could reconfigure our cluster to
make use of it, but since it requires changes to the cluster itself (and
not just the Spark app), it could take some time.
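
Roughly, the cluster-side change is to register the Spark shuffle service as
a YARN auxiliary service on every NodeManager and restart them (going by the
Spark-on-YARN docs of that era; the exact jar name depends on the Spark
version):

  <!-- yarn-site.xml on each NodeManager -->
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle,spark_shuffle</value>
  </property>
  <property>
    <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
    <value>org.apache.spark.network.yarn.YarnShuffleService</value>
  </property>

plus putting the spark-<version>-yarn-shuffle.jar on the NodeManager
classpath.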

Unsure if task 450 was acting as a reducer or not, but it seems possible.
Probably due to a crashed executor, as you say. Seems like I need to do some
more advanced partition tuning to make this job work, as it currently uses a
rather high number of partitions.

Thanks for the help so far! It's certainly a frustrating task to debug when
everything's working perfectly on sample data locally and crashes hard when
running on the full dataset on the cluster...


Re: Missing shuffle files

2015-02-23 Thread Corey Nolet
I'm looking @ my yarn container logs for some of the executors which
appear to be failing (with the missing shuffle files). I see exceptions
that say client.TransportClientFactory: Found inactive connection to
host/ip:port, closing it.

Right after that I see shuffle.RetryingBlockFetcher: Exception while
beginning fetch of 1 outstanding blocks. java.io.IOException: Failed to
connect to host/ip:port

Right after that exception I see RECEIVED SIGNAL 15: SIGTERM

Finally, following the SIGTERM, I see FileNotFoundException:
/hdfs/01/yarn/nm/usercache../spark-local-uuid/shuffle_5_09_0.data (No
such file or directory)

I'm looking @ the nodemanager and application master logs and I see no
indication whatsoever that there were any memory issues during this period
of time. The Spark UI is telling me none of the executors are really using
too much memory when this happens. It is a big job that's caching several
100's of GB, but each node manager on the cluster has 64gb of ram just for
yarn containers (physical nodes have 128gb). On this cluster, we have 128
nodes. I've also tried using the DISK_ONLY storage level, but to no avail.

Any further ideas on how to track this down? Again, we're able to run this
same job on about 1/5th of the data just fine. The only thing pointing me
towards a memory issue is that it seems to be happening in the same stages
each time, and when I lower the memory allocated to each executor it happens
in earlier stages, but I can't seem to find anything that says an executor
(or container, for that matter) has run low on memory.




Re: Missing shuffle files

2015-02-23 Thread Anders Arpteg
Sounds very similar to what I experienced, Corey. Something that seems to at
least help with my problems is to have more partitions. I'm already fighting
between ending up with too many partitions at the end and having too few at
the beginning. By coalescing as late as possible and avoiding too few
partitions at the beginning, the problems seem to decrease. Also, after
increasing spark.akka.askTimeout and spark.core.connection.ack.wait.timeout
significantly (~700 secs), the problems seem to almost disappear. Don't want
to celebrate yet, there's still a long way to go before the job completes,
but it's looking better...
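
For reference, this is the kind of thing I mean (Spark 1.x property names,
both interpreted in seconds; the 700 is just the ballpark mentioned above):

  # spark-defaults.conf
  # how long the driver/executors wait on internal Akka asks
  spark.akka.askTimeout                    700
  # how long to wait for acks on block-transfer connections
  spark.core.connection.ack.wait.timeout   700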


Re: Missing shuffle files

2015-02-23 Thread Corey Nolet
I've got the opposite problem with regard to partitioning. I've got over
6000 partitions for some of these RDDs, which immediately blows the heap
somehow- I'm still not exactly sure how. If I coalesce them down to about
600-800 partitions, I get the problems where the executors are dying without
any other error messages (other than the UI telling me the executor was
lost). If I don't coalesce, I pretty much immediately get Java heap space
exceptions that kill the job altogether.

Putting in the timeouts didn't seem to help the case where I am coalescing.
Also, I don't see any differences between 'disk only' and 'memory and disk'
storage levels- both of them are having the same problems. I notice large
shuffle files (30-40GB) that only seem to spill a few hundred MB.


Re: Missing shuffle files

2015-02-23 Thread Corey Nolet
I *think* this may have been related to the default memory overhead setting
being too low. I raised the value to 1G and tried my job again, but I had to
leave the office before it finished. It did get further, but I'm not exactly
sure if that's just because I raised the memory. I'll see tomorrow- but I
have a suspicion this may have been the cause of the executors being killed
by the application master.

Re: Missing shuffle files

2015-02-22 Thread Sameer Farooqui
Do you guys have dynamic allocation turned on for YARN?

Anders, was Task 450 in your job acting like a Reducer and fetching the Map
spill output data from a different node?

If a Reducer task can't read the remote data it needs, that could cause the
stage to fail. Sometimes this forces the previous stage to also be
re-computed if it's a wide dependency.

But like Petar said, if you turn the external shuffle service on, the YARN
NodeManager process on the slave machines will serve out the map spill data
instead of the Executor JVMs (by default, unless you turn the external
shuffle service on, the Executor JVM itself serves out the shuffle data,
which causes problems if an Executor dies).

Corey, how often are Executors crashing in your app? How many Executors do
you have in total? And what is the memory size of each? You can change what
fraction of the Executor heap will be used for your user code vs. the
shuffle vs. RDD caching with the spark.storage.memoryFraction setting.
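
For reference, a sketch of those legacy static-memory-manager knobs (the
values shown are just the Spark 1.x defaults, not a recommendation; the
shuffle fraction is the companion setting to the storage one):

  # spark-defaults.conf (Spark 1.x static memory management)
  # share of the executor heap reserved for cached RDD blocks
  spark.storage.memoryFraction   0.6
  # share of the executor heap for shuffle aggregation buffers
  spark.shuffle.memoryFraction   0.2
  # whatever remains is what user code and task objects get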


Re: Missing shuffle files

2015-02-21 Thread Corey Nolet
I'm experiencing the same issue. Upon closer inspection I'm noticing that
executors are being lost as well. Thing is, I can't figure out how they are
dying. I'm using MEMORY_AND_DISK_SER and I've got over 1.3TB of memory
allocated for the application. I was thinking perhaps it was possible that a
single executor was getting one or a couple of large partitions, but
shouldn't the disk persistence kick in at that point?

On Sat, Feb 21, 2015 at 11:20 AM, Anders Arpteg arp...@spotify.com wrote:

 For large jobs, the following error message is shown, which seems to
 indicate that shuffle files are missing for some reason. It's a rather
 large job with many partitions. If the data size is reduced, the problem
 disappears. I'm running a build from Spark master post 1.2 (built
 2015-01-16) and running on Yarn 2.2. Any idea of how to resolve this
 problem?

 User class threw exception: Job aborted due to stage failure: Task 450 in
 stage 450.1 failed 4 times, most recent failure: Lost task 450.3 in stage
 450.1 (TID 167370, lon4-hadoopslave-b77.lon4.spotify.net):
 java.io.FileNotFoundException:
 /disk/hd06/yarn/local/usercache/arpteg/appcache/application_1424333823218_21217/spark-local-20150221154811-998c/03/rdd_675_450
 (No such file or directory)
  at java.io.FileOutputStream.open(Native Method)
  at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
  at java.io.FileOutputStream.<init>(FileOutputStream.java:171)
  at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:76)
  at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:786)
  at
 org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:637)
  at
 org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:149)
  at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:74)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
  at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:264)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:231)
  at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
  at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
  at org.apache.spark.scheduler.Task.run(Task.scala:64)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:192)
  at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

  at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

  at java.lang.Thread.run(Thread.java:745)

 TIA,
 Anders




Re: Missing shuffle files

2015-02-21 Thread Petar Zecevic


Could you try to turn on the external shuffle service?

spark.shuffle.service.enabled = true
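
(A slightly fuller sketch of the application-side settings involved; dynamic
allocation is optional, but note it also requires the external shuffle
service:)

  # spark-defaults.conf -- the NodeManagers must also run the Spark shuffle service
  spark.shuffle.service.enabled     true
  # optional; only valid together with the external shuffle service
  spark.dynamicAllocation.enabled   true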

