[ https://issues.apache.org/jira/browse/SPARK-21564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16122651#comment-16122651 ]
Andrew Ash commented on SPARK-21564:
------------------------------------

[~irashid] a possible fix could look roughly like this:

{noformat}
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index a2f1aa22b0..06d72fe106 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.executor
 
+import java.io.{DataInputStream, NotSerializableException}
 import java.net.URL
 import java.nio.ByteBuffer
 import java.util.Locale
@@ -35,7 +36,7 @@ import org.apache.spark.rpc._
 import org.apache.spark.scheduler.{ExecutorLossReason, TaskDescription}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.serializer.SerializerInstance
-import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.{ByteBufferInputStream, ThreadUtils, Utils}
 
 private[spark] class CoarseGrainedExecutorBackend(
     override val rpcEnv: RpcEnv,
@@ -93,9 +94,26 @@ private[spark] class CoarseGrainedExecutorBackend(
       if (executor == null) {
         exitExecutor(1, "Received LaunchTask command but executor was null")
       } else {
-        val taskDesc = TaskDescription.decode(data.value)
-        logInfo("Got assigned task " + taskDesc.taskId)
-        executor.launchTask(this, taskDesc)
+        try {
+          val taskDesc = TaskDescription.decode(data.value)
+          logInfo("Got assigned task " + taskDesc.taskId)
+          executor.launchTask(this, taskDesc)
+        } catch {
+          case e: Exception =>
+            val taskId = new DataInputStream(new ByteBufferInputStream(
+              ByteBuffer.wrap(data.value.array()))).readLong()
+            val ser = env.closureSerializer.newInstance()
+            val serializedTaskEndReason = {
+              try {
+                ser.serialize(new ExceptionFailure(e, Nil))
+              } catch {
+                case _: NotSerializableException =>
+                  // e is not serializable so just send the stacktrace
+                  ser.serialize(new ExceptionFailure(e, Nil, false))
+              }
+            }
+            statusUpdate(taskId, TaskState.FAILED, serializedTaskEndReason)
+        }
       }
 
     case KillTask(taskId, _, interruptThread, reason) =>
{noformat}

The downside here, though, is that we're still making the assumption that the TaskDescription is well-formatted enough to be able to get the taskId out of it (the first long in the serialized bytes). Any other thoughts on how to do this?
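For what it's worth, here is a minimal sketch of what that assumption amounts to. The helper name is hypothetical and not part of the patch; it just makes explicit that the taskId is recoverable only if the first eight bytes of the payload survived intact:

{noformat}
// Hypothetical helper, not in the patch above: peek the taskId from the raw
// bytes without decoding the rest of the TaskDescription. It relies on the same
// assumption as the catch block (the taskId is the first long in the encoded
// bytes), but at least checks that eight bytes are actually there.
import java.nio.ByteBuffer

def peekTaskId(data: ByteBuffer): Option[Long] = {
  val buf = data.duplicate()  // duplicate so the caller's buffer position is untouched
  if (buf.remaining() >= java.lang.Long.BYTES) Some(buf.getLong()) else None
}
{noformat}

With something like that, the catch block could send the FAILED status update only when a taskId is actually recoverable, and fall back to logging (or exitExecutor) when it is not.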
> TaskDescription decoding failure should fail the task
> ------------------------------------------------------
>
>                 Key: SPARK-21564
>                 URL: https://issues.apache.org/jira/browse/SPARK-21564
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.2.0
>            Reporter: Andrew Ash
>
> cc [~robert3005]
> I was seeing an issue where Spark was throwing this exception:
> {noformat}
> 16:16:28.294 [dispatcher-event-loop-14] ERROR org.apache.spark.rpc.netty.Inbox - Ignoring error
> java.io.EOFException: null
>         at java.io.DataInputStream.readFully(DataInputStream.java:197)
>         at java.io.DataInputStream.readUTF(DataInputStream.java:609)
>         at java.io.DataInputStream.readUTF(DataInputStream.java:564)
>         at org.apache.spark.scheduler.TaskDescription$$anonfun$decode$1.apply(TaskDescription.scala:127)
>         at org.apache.spark.scheduler.TaskDescription$$anonfun$decode$1.apply(TaskDescription.scala:126)
>         at scala.collection.immutable.Range.foreach(Range.scala:160)
>         at org.apache.spark.scheduler.TaskDescription$.decode(TaskDescription.scala:126)
>         at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:95)
>         at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:117)
>         at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205)
>         at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:101)
>         at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:213)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:748)
> {noformat}
> For details on the cause of that exception, see SPARK-21563.
> We've since changed the application and have a proposed fix in Spark at the ticket above, but it was troubling that decoding the TaskDescription wasn't failing the tasks. So the Spark job ended up hanging and making no progress, permanently stuck, because the driver thinks the task is running but the thread has died in the executor.
> We should make a change around https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala#L96 so that when that decode throws an exception, the task is marked as failed.
> cc [~kayousterhout] [~irashid]
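To make the failure mode concrete, here is a self-contained sketch (illustrative test-style code, not from the ticket; TaskDescription is private[spark], so it would have to live under org.apache.spark.scheduler) showing that a truncated payload makes TaskDescription.decode throw the EOFException from the stack trace above, which the executor currently only logs instead of reporting back to the driver:

{noformat}
// Sketch only: a truncated buffer reproduces the EOFException seen above. In the
// current code this exception escapes CoarseGrainedExecutorBackend.receive, the
// RPC Inbox logs "Ignoring error", and the driver keeps believing the task is
// still running.
import java.io.EOFException
import java.nio.ByteBuffer

import org.apache.spark.scheduler.TaskDescription

val truncated = ByteBuffer.wrap(new Array[Byte](16))  // zero-filled, far too short for a real TaskDescription
try {
  TaskDescription.decode(truncated)
} catch {
  case e: EOFException =>
    // Per this ticket, this is where the executor should instead send a FAILED
    // status update for the task to the driver.
    println(s"decode failed as expected: $e")
}
{noformat}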