[ https://issues.apache.org/jira/browse/SPARK-14485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15322893#comment-15322893 ]
Kay Ousterhout commented on SPARK-14485:
----------------------------------------

I don't think (a) is especially rare: that's the case any time data is saved to HDFS, or a result is returned to the driver -- e.g., from SQL queries that aggregate a result at the driver rather than creating a result table.

My point for (c) was that it seems to benefit only a small fraction of cases: those where both (1) the scheduler learned about the lost executor before learning about the successful task, and (2) no other tasks in the same stage had previously run on the failed executor (if there were, those previously completed tasks won't get re-run until there's a fetch failure in the next stage anyway).

I'm also hesitant to add special-case logic where we sometimes just ignore task-completed messages, because I'm worried about corner cases where this could leave a job hanging because the task somehow never completes successfully.

Given all of the above, I'd advocate reverting this, and I've submitted a PR to do so: https://github.com/apache/spark/pull/13580

> Task finished cause fetch failure when its executor has already been removed by driver
> ---------------------------------------------------------------------------------------
>
>                 Key: SPARK-14485
>                 URL: https://issues.apache.org/jira/browse/SPARK-14485
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.3.1, 1.5.2
>            Reporter: iward
>            Assignee: iward
>             Fix For: 2.0.0
>
>
> Currently, when an executor is removed by the driver because its heartbeats have timed out, the driver re-queues the tasks that were running on that executor and sends a kill command to the cluster to kill the executor.
> However, a running task on that executor may finish and return its result to the driver before the executor is actually killed. In that case, the driver accepts the task-finished event and ignores the speculative and re-queued copies of the task. But since the executor has already been removed by the driver, the result of the finished task cannot be used: its *BlockManagerId* has also been removed from *BlockManagerMaster*. The result data of this stage is therefore incomplete, which later causes a fetch failure. A minimal model of this race is sketched below.
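>
> The following is a minimal, self-contained Scala sketch of the race just described. Everything in it is a hypothetical stand-in (the object name, the liveBlockManagers/mapOutputs registries, and the executor IDs), not Spark's real scheduler API; it only illustrates the ordering of events, assuming a single driver thread processes them in the order shown.
> {code:scala}
> import scala.collection.mutable
>
> object ExecutorLostRace {
>   final case class MapStatus(executorId: String, mapId: Int)
>
>   // Hypothetical stand-ins for BlockManagerMaster and the map-output registry.
>   val liveBlockManagers = mutable.Set("exec-322", "exec-100")
>   val mapOutputs = mutable.Map.empty[Int, MapStatus]
>
>   def onHeartbeatTimeout(executorId: String): Unit = {
>     // The driver removes the executor; the blocks it holds become unreachable.
>     liveBlockManagers -= executorId
>     println(s"Removed $executorId from BlockManagerMaster")
>   }
>
>   def onTaskFinished(executorId: String, mapId: Int): Unit = {
>     // The buggy step: the late success is accepted even though the executor
>     // backing the result has already been removed.
>     mapOutputs(mapId) = MapStatus(executorId, mapId)
>     println(s"Finished map task $mapId on $executorId")
>   }
>
>   def fetchShuffleBlock(mapId: Int): Unit = {
>     val status = mapOutputs(mapId)
>     // Corresponds to the MetadataFetchFailedException in the log below.
>     if (!liveBlockManagers.contains(status.executorId))
>       throw new RuntimeException(s"Missing an output location for map $mapId")
>   }
>
>   def main(args: Array[String]): Unit = {
>     onHeartbeatTimeout("exec-322")   // the executor-lost event is processed first...
>     onTaskFinished("exec-322", 229)  // ...then the late task-finished event arrives
>     fetchShuffleBlock(229)           // the next stage's fetch now fails
>   }
> }
> {code}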
> For example, the following is the task log:
> {noformat}
> 2015-12-31 04:38:50 INFO 15/12/31 04:38:50 WARN HeartbeatReceiver: Removing executor 322 with no recent heartbeats: 256015 ms exceeds timeout 250000 ms
> 2015-12-31 04:38:50 INFO 15/12/31 04:38:50 ERROR YarnScheduler: Lost executor 322 on BJHC-HERA-16168.hadoop.jd.local: Executor heartbeat timed out after 256015 ms
> 2015-12-31 04:38:50 INFO 15/12/31 04:38:50 INFO TaskSetManager: Re-queueing tasks for 322 from TaskSet 107.0
> 2015-12-31 04:38:50 INFO 15/12/31 04:38:50 WARN TaskSetManager: Lost task 229.0 in stage 107.0 (TID 10384, BJHC-HERA-16168.hadoop.jd.local): ExecutorLostFailure (executor 322 lost)
> 2015-12-31 04:38:50 INFO 15/12/31 04:38:50 INFO DAGScheduler: Executor lost: 322 (epoch 11)
> 2015-12-31 04:38:50 INFO 15/12/31 04:38:50 INFO BlockManagerMasterEndpoint: Trying to remove executor 322 from BlockManagerMaster.
> 2015-12-31 04:38:50 INFO 15/12/31 04:38:50 INFO BlockManagerMaster: Removed 322 successfully in removeExecutor
> {noformat}
> {noformat}
> 2015-12-31 04:38:52 INFO 15/12/31 04:38:52 INFO TaskSetManager: Finished task 229.0 in stage 107.0 (TID 10384) in 272315 ms on BJHC-HERA-16168.hadoop.jd.local (579/700)
> 2015-12-31 04:40:12 INFO 15/12/31 04:40:12 INFO TaskSetManager: Ignoring task-finished event for 229.1 in stage 107.0 because task 229 has already completed successfully
> {noformat}
> {noformat}
> 2015-12-31 04:40:12 INFO 15/12/31 04:40:12 INFO DAGScheduler: Submitting 3 missing tasks from ShuffleMapStage 107 (MapPartitionsRDD[263] at mapPartitions at Exchange.scala:137)
> 2015-12-31 04:40:12 INFO 15/12/31 04:40:12 INFO YarnScheduler: Adding task set 107.1 with 3 tasks
> 2015-12-31 04:40:12 INFO 15/12/31 04:40:12 INFO TaskSetManager: Starting task 0.0 in stage 107.1 (TID 10863, BJHC-HERA-18043.hadoop.jd.local, PROCESS_LOCAL, 3745 bytes)
> 2015-12-31 04:40:12 INFO 15/12/31 04:40:12 INFO TaskSetManager: Starting task 1.0 in stage 107.1 (TID 10864, BJHC-HERA-9291.hadoop.jd.local, PROCESS_LOCAL, 3745 bytes)
> 2015-12-31 04:40:12 INFO 15/12/31 04:40:12 INFO TaskSetManager: Starting task 2.0 in stage 107.1 (TID 10865, BJHC-HERA-16047.hadoop.jd.local, PROCESS_LOCAL, 3745 bytes)
> {noformat}
> The driver detects that the stage's result is incomplete and submits the missing tasks. But by this point the next stage has already started running, because the previous stage was considered finished once all of its tasks completed, even though its result data was incomplete:
> {noformat}
> 2015-12-31 04:40:13 INFO 15/12/31 04:40:13 WARN TaskSetManager: Lost task 39.0 in stage 109.0 (TID 10905, BJHC-HERA-9357.hadoop.jd.local): FetchFailed(null, shuffleId=11, mapId=-1, reduceId=39, message=
> 2015-12-31 04:40:13 INFO org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 11
> 2015-12-31 04:40:13 INFO at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:385)
> 2015-12-31 04:40:13 INFO at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:382)
> 2015-12-31 04:40:13 INFO at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> 2015-12-31 04:40:13 INFO at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> 2015-12-31 04:40:13 INFO at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> 2015-12-31 04:40:13 INFO at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> 2015-12-31 04:40:13 INFO at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> 2015-12-31 04:40:13 INFO at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
> 2015-12-31 04:40:13 INFO at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:381)
> 2015-12-31 04:40:13 INFO at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:172)
> 2015-12-31 04:40:13 INFO at org.apache.spark.shuffle.sort.SortShuffleReader.computeShuffleBlocks(SortShuffleReader.scala:301)
> 2015-12-31 04:40:13 INFO at org.apache.spark.shuffle.sort.SortShuffleReader.read(SortShuffleReader.scala:111)
> 2015-12-31 04:40:13 INFO at org.apache.spark.shuffle.sort.MixedShuffleReader.read(MixedShuffleReader.scala:41)
> 2015-12-31 04:40:13 INFO at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90)
> 2015-12-31 04:40:13 INFO at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> 2015-12-31 04:40:13 INFO at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> 2015-12-31 04:40:13 INFO at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> 2015-12-31 04:40:13 INFO at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> 2015-12-31 04:40:13 INFO at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> 2015-12-31 04:40:13 INFO at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> 2015-12-31 04:40:13 INFO at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> 2015-12-31 04:40:13 INFO at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> 2015-12-31 04:40:13 INFO at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
> 2015-12-31 04:40:13 INFO at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> 2015-12-31 04:40:13 INFO at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> 2015-12-31 04:40:13 INFO at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> 2015-12-31 04:40:13 INFO at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> 2015-12-31 04:40:13 INFO at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> 2015-12-31 04:40:13 INFO at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> 2015-12-31 04:40:13 INFO at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> 2015-12-31 04:40:13 INFO at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> 2015-12-31 04:40:13 INFO at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
> 2015-12-31 04:40:13 INFO at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> 2015-12-31 04:40:13 INFO at org.apache.spark.scheduler.Task.run(Task.scala:64)
> 2015-12-31 04:40:13 INFO at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:209)
> 2015-12-31 04:40:13 INFO at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> 2015-12-31 04:40:13 INFO at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> 2015-12-31 04:40:13 INFO at java.lang.Thread.run(Thread.java:745)
> 2015-12-31 04:40:13 INFO
> {noformat}
> As the task log shows, in this situation the job fails with a FetchFailedException.
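For context on the logic being reverted: the original fix amounted to ignoring a task-finished event when the task's executor had already been removed, so the task would be re-run rather than leaving an unreachable output behind. Below is a minimal sketch of that idea only; the names (IgnoreLateSuccess, removedExecutors, succeededTasks) are hypothetical, not Spark's actual scheduler code.

{code:scala}
import scala.collection.mutable

// A sketch of the idea behind the (since-reverted) fix, not Spark's code.
object IgnoreLateSuccess {
  final case class TaskResult(taskId: Long, executorId: String)

  val removedExecutors = mutable.Set.empty[String]
  val succeededTasks = mutable.Set.empty[Long]

  def onExecutorLost(executorId: String): Unit = {
    removedExecutors += executorId
  }

  def onTaskSucceeded(result: TaskResult): Unit = {
    if (removedExecutors.contains(result.executorId)) {
      // The blocks backing this result are gone: drop the event so the task
      // stays pending and is re-run, instead of recording a lost output.
      println(s"Ignoring late success of task ${result.taskId}")
    } else {
      succeededTasks += result.taskId
    }
  }
}
{code}

As the comment above points out, such a guard only helps when the executor-lost event is processed before the late success, and dropping completion events risks a job hang if the task never completes successfully elsewhere, which is the stated motivation for the revert.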