Re: Spark Application Hung

2015-03-25 Thread Akhil Das
In production, I'd suggest having a high-availability cluster with a
minimum of 3 nodes (data nodes in your case).

Now let's examine your scenario:

- When you suddenly bring down a node that has 2 executors running on it,
that node (DN2) still holds the shuffle data or computed data your job
needs for the next stages (the effect is the same as deleting Spark's
local/work dir on DN2). The absence of this node leads to the FetchFailed
errors you are seeing in the logs. Spark will keep retrying for some time,
and eventually, I believe, it will recompute your whole pipeline on DN1.
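
For what it's worth, here is a minimal sketch (Scala, against the 1.2 API;
not from your setup) of two ways to limit how much has to be recomputed
when a node, along with the shuffle/cached data on it, disappears. The
paths and RDD names below are placeholders, and it assumes an HDFS
directory is available for checkpoints:

    // pair-RDD functions such as reduceByKey need this import on Spark 1.2
    import org.apache.spark.SparkContext._
    import org.apache.spark.storage.StorageLevel

    // assumption: this HDFS path exists and is writable by the application
    sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")

    val events = sc.textFile("hdfs:///data/events")   // placeholder input
    val counts = events.map(line => (line.split(",")(0), 1L))
                       .reduceByKey(_ + _)

    // Option 1: keep a second replica of each computed partition on another
    // node, so losing one node does not throw away all of the cached data.
    counts.persist(StorageLevel.MEMORY_AND_DISK_2)

    // Option 2: checkpoint to HDFS, which cuts the lineage; after a node
    // loss, recovery reads the replicated checkpoint files instead of
    // re-running every earlier stage.
    counts.checkpoint()

    counts.count()   // the first action materializes the RDD and the checkpoint

Neither option prevents the FetchFailed on the stage that was running when
the node went down, but both should keep Spark from re-running the whole
pipeline from the beginning.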



Thanks
Best Regards

On Wed, Mar 25, 2015 at 12:11 AM, Ashish Rawat ashish.ra...@guavus.com
wrote:

  Hi,

  We are observing a hung Spark application when one of the YARN datanodes
 (running multiple Spark executors) goes down.

  *Setup details*:

- Spark: 1.2.1
- Hadoop: 2.4.0
- Spark Application Mode: yarn-client
- 2 datanodes (DN1, DN2)
- 6 Spark executors (initially 3 executors each on DN1 and DN2; after
rebooting DN2, this changes to 4 executors on DN1 and 2 executors on DN2)

 *Scenario*:

  When one of the datanodes (DN2) is brought down, the application hangs,
 with the Spark driver continuously showing the following warning:

  15/03/24 12:39:26 WARN TaskSetManager: Lost task 5.0 in stage 232.0 (TID 37941, DN1): FetchFailed(null, shuffleId=155, mapId=-1, reduceId=5, message=
  org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 155
  ... (full stack trace in the original message below)

  When DN2 is brought down, one executor gets launched on DN1. When DN2 is
 brought back up after 15 minutes, 2 executors get launched on it.
 All the executors (including the ones launched after DN2 comes back) keep
 showing the following errors:

  15/03/24 12:43:30 INFO spark.MapOutputTrackerWorker: Don't have map outputs for shuffle 155, fetching them

Spark Application Hung

2015-03-24 Thread Ashish Rawat
Hi,

We are observing a hung Spark application when one of the YARN datanodes
(running multiple Spark executors) goes down.

Setup details:

  *   Spark: 1.2.1
  *   Hadoop: 2.4.0
  *   Spark Application Mode: yarn-client
  *   2 datanodes (DN1, DN2)
  *   6 Spark executors (initially 3 executors each on DN1 and DN2; after
rebooting DN2, this changes to 4 executors on DN1 and 2 executors on DN2)

Scenario:

When one of the datanodes (DN2) is brought down, the application hangs, with
the Spark driver continuously showing the following warning:

15/03/24 12:39:26 WARN TaskSetManager: Lost task 5.0 in stage 232.0 (TID 37941, DN1): FetchFailed(null, shuffleId=155, mapId=-1, reduceId=5, message=
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 155
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:384)
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:381)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:380)
    at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:176)
    at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
    at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)


When DN2 is brought down, one executor gets launched on DN1. When DN2 is
brought back up after 15 minutes, 2 executors get launched on it.
All the executors (including the ones launched after DN2 comes back) keep
showing the following errors:

15/03/24 12:43:30 INFO spark.MapOutputTrackerWorker: Don't have map outputs for shuffle 155, fetching them
15/03/24 12:43:30 INFO spark.MapOutputTrackerWorker: Don't have map outputs for shuffle 155, fetching them
15/03/24 12:43:30 INFO spark.MapOutputTrackerWorker: Doing the fetch; tracker actor = Actor[akka.tcp://sparkDriver@NN1:44353/user/MapOutputTracker#-957394722]
15/03/24 12:43:30 INFO spark.MapOutputTrackerWorker: Got the output locations
15/03/24 12:43:30 ERROR spark.MapOutputTracker: Missing an output location for shuffle 155
15/03/24 12:43:30 ERROR spark.MapOutputTracker: Missing an output location for shuffle 155
15/03/24 12:43:30 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 44623
15/03/24 12:43:30 INFO executor.Executor: Running task 5.0 in stage 232.960 (TID 44623)
15/03/24 12:43:30 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 44629