[ https://issues.apache.org/jira/browse/SPARK-21564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16122651#comment-16122651 ]

Andrew Ash commented on SPARK-21564:
------------------------------------

[~irashid] a possible fix could look roughly like this:

{noformat}
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index a2f1aa22b0..06d72fe106 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -17,6 +17,7 @@

 package org.apache.spark.executor

+import java.io.{DataInputStream, NotSerializableException}
 import java.net.URL
 import java.nio.ByteBuffer
 import java.util.Locale
@@ -35,7 +36,7 @@ import org.apache.spark.rpc._
 import org.apache.spark.scheduler.{ExecutorLossReason, TaskDescription}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.serializer.SerializerInstance
-import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.{ByteBufferInputStream, ThreadUtils, Utils}

 private[spark] class CoarseGrainedExecutorBackend(
     override val rpcEnv: RpcEnv,
@@ -93,9 +94,26 @@ private[spark] class CoarseGrainedExecutorBackend(
       if (executor == null) {
         exitExecutor(1, "Received LaunchTask command but executor was null")
       } else {
-        val taskDesc = TaskDescription.decode(data.value)
-        logInfo("Got assigned task " + taskDesc.taskId)
-        executor.launchTask(this, taskDesc)
+        try {
+          val taskDesc = TaskDescription.decode(data.value)
+          logInfo("Got assigned task " + taskDesc.taskId)
+          executor.launchTask(this, taskDesc)
+        } catch {
+          case e: Exception =>
+            val taskId = new DataInputStream(new ByteBufferInputStream(
+              ByteBuffer.wrap(data.value.array()))).readLong()
+            val ser = env.closureSerializer.newInstance()
+            val serializedTaskEndReason = {
+              try {
+                ser.serialize(new ExceptionFailure(e, Nil))
+              } catch {
+                case _: NotSerializableException =>
+                  // e is not serializable so just send the stacktrace
+                  ser.serialize(new ExceptionFailure(e, Nil, false))
+              }
+            }
+            statusUpdate(taskId, TaskState.FAILED, serializedTaskEndReason)
+        }
       }

     case KillTask(taskId, _, interruptThread, reason) =>
{noformat}

The downside here, though, is that we're still assuming the serialized TaskDescription is well-formed enough for us to get the taskId out of it (the taskId is the first long in the serialized bytes).
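
For illustration, a slightly more defensive version of that taskId read could look like the sketch below. This is only a sketch: the length guard and the log message are my additions, not part of the patch above, and it assumes TaskDescription.encode always writes the taskId as the first 8 bytes of the payload (which it does today).

{noformat}
// Only trust the first long if the buffer actually contains one.
val buf = ByteBuffer.wrap(data.value.array())
if (buf.remaining() >= java.lang.Long.BYTES) {
  // TaskDescription.encode writes the taskId header first, so it is
  // usually recoverable even when the rest of the payload is corrupt.
  val taskId = buf.getLong()
  statusUpdate(taskId, TaskState.FAILED, serializedTaskEndReason)
} else {
  // Not even a taskId to report against; all we can do is log the failure.
  logError("Received a TaskDescription too short to contain a taskId", e)
}
{noformat}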

Any other thoughts on how to do this?

> TaskDescription decoding failure should fail the task
> -----------------------------------------------------
>
>                 Key: SPARK-21564
>                 URL: https://issues.apache.org/jira/browse/SPARK-21564
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.2.0
>            Reporter: Andrew Ash
>
> cc [~robert3005]
> I was seeing an issue where Spark was throwing this exception:
> {noformat}
> 16:16:28.294 [dispatcher-event-loop-14] ERROR org.apache.spark.rpc.netty.Inbox - Ignoring error
> java.io.EOFException: null
>     at java.io.DataInputStream.readFully(DataInputStream.java:197)
>     at java.io.DataInputStream.readUTF(DataInputStream.java:609)
>     at java.io.DataInputStream.readUTF(DataInputStream.java:564)
>     at org.apache.spark.scheduler.TaskDescription$$anonfun$decode$1.apply(TaskDescription.scala:127)
>     at org.apache.spark.scheduler.TaskDescription$$anonfun$decode$1.apply(TaskDescription.scala:126)
>     at scala.collection.immutable.Range.foreach(Range.scala:160)
>     at org.apache.spark.scheduler.TaskDescription$.decode(TaskDescription.scala:126)
>     at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:95)
>     at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:117)
>     at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205)
>     at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:101)
>     at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:213)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:748)
> {noformat}
> For details on the cause of that exception, see SPARK-21563.
> We've since changed the application and have a proposed fix in Spark at the ticket above, but it was troubling that a failure to decode the TaskDescription didn't fail the task. The Spark job ended up hanging and making no progress, permanently stuck, because the driver thought the task was still running while its thread had actually died in the executor.
> We should make a change around https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala#L96 so that when the decode throws an exception, the task is marked as failed.
> cc [~kayousterhout] [~irashid]


