[ https://issues.apache.org/jira/browse/SPARK-3005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14108677#comment-14108677 ]

Xu Zhongxing edited comment on SPARK-3005 at 8/25/14 2:33 AM:
--------------------------------------------------------------

[SPARK-1749] didn't fix this problem. It just catches the 
UnsupportedOperationException, logs it, and then sets ableToCancelStages = 
false. That is exactly what causes the hang, because the code only does 
cleanup when ableToCancelStages = true:
{code}
if (ableToCancelStages) {
  job.listener.jobFailed(error)
  cleanupStateForJobAndIndependentStages(job, resultStage)
  listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error)))
}
{code}
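For reference, the catch that [SPARK-1749] added looks roughly like this (a paraphrase of the Spark 1.0.x DAGScheduler error path; the exact code may differ). It is what produces the "Could not cancel tasks for stage 1" line in the log below:
{code}
// cancelTasks() ends up calling SchedulerBackend.killTask(), which
// MesosSchedulerBackend does not override, so the default implementation
// throws UnsupportedOperationException.
try {
  taskScheduler.cancelTasks(stageId, shouldInterruptThread)
} catch {
  case e: UnsupportedOperationException =>
    logInfo(s"Could not cancel tasks for stage $stageId", e)
    ableToCancelStages = false
}
{code}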
The fact is that in the Mesos fine-grained case, killTask() is unnecessary. 
So throwing UnsupportedOperationException and setting ableToCancelStages = 
false is the wrong behaviour for this case. We just need to do nothing in 
killTask() and let the driver do the rest of the cleanup.

The problem here is in MesosSchedulerBackend: it does not need to kill tasks 
and should not throw UnsupportedOperationException, because the tasks 
themselves have already died and exited.
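A minimal sketch of the idea (the method signature follows the Spark 1.0.x SchedulerBackend trait; the actual patch is in the attached SPARK-3005_1.diff):
{code}
// In MesosSchedulerBackend: override killTask() as a no-op instead of
// inheriting the default implementation, which throws
// UnsupportedOperationException. In fine-grained mode the Mesos tasks
// have already died and exited, so there is nothing left to kill; the
// driver can then proceed with its normal cleanup path.
override def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = {
  // Intentionally do nothing.
}
{code}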



> Spark with Mesos fine-grained mode throws UnsupportedOperationException in 
> MesosSchedulerBackend.killTask()
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-3005
>                 URL: https://issues.apache.org/jira/browse/SPARK-3005
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.0.2
>         Environment: Spark 1.0.2, Mesos 0.18.1, spark-cassandra-connector
>            Reporter: Xu Zhongxing
>         Attachments: SPARK-3005_1.diff
>
>
> I am using Spark, Mesos, and spark-cassandra-connector to do some work on a 
> Cassandra cluster.
> While the job was running, I killed the Cassandra daemon to simulate 
> failure cases. This resulted in task failures.
> If I run the job in Mesos coarse-grained mode, the Spark driver program 
> throws an exception and shuts down cleanly.
> But when I run the job in Mesos fine-grained mode, the Spark driver program 
> hangs.
> The Spark log is: 
> {code}
>  INFO [spark-akka.actor.default-dispatcher-4] 2014-08-13 15:58:15,794 Logging.scala (line 58) Cancelling stage 1
>  INFO [spark-akka.actor.default-dispatcher-4] 2014-08-13 15:58:15,797 Logging.scala (line 79) Could not cancel tasks for stage 1
> java.lang.UnsupportedOperationException
>       at org.apache.spark.scheduler.SchedulerBackend$class.killTask(SchedulerBackend.scala:32)
>       at org.apache.spark.scheduler.cluster.mesos.MesosSchedulerBackend.killTask(MesosSchedulerBackend.scala:41)
>       at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$cancelTasks$3$$anonfun$apply$1.apply$mcVJ$sp(TaskSchedulerImpl.scala:185)
>       at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$cancelTasks$3$$anonfun$apply$1.apply(TaskSchedulerImpl.scala:183)
>       at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$cancelTasks$3$$anonfun$apply$1.apply(TaskSchedulerImpl.scala:183)
>       at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
>       at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$cancelTasks$3.apply(TaskSchedulerImpl.scala:183)
>       at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$cancelTasks$3.apply(TaskSchedulerImpl.scala:176)
>       at scala.Option.foreach(Option.scala:236)
>       at org.apache.spark.scheduler.TaskSchedulerImpl.cancelTasks(TaskSchedulerImpl.scala:176)
>       at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages$1.apply$mcVI$sp(DAGScheduler.scala:1075)
>       at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages$1.apply(DAGScheduler.scala:1061)
>       at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages$1.apply(DAGScheduler.scala:1061)
>       at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
>       at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1061)
>       at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
>       at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
>       at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>       at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
>       at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
>       at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
>       at scala.Option.foreach(Option.scala:236)
>       at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:635)
>       at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1234)
>       at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>       at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>       at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>       at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>       at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>       at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}


