[ https://issues.apache.org/jira/browse/SPARK-21563?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Andrew Ash updated SPARK-21563:
-------------------------------
    Description: 
cc [~robert3005]

I saw this exception while running some Spark jobs:

{noformat}
16:16:28.294 [dispatcher-event-loop-14] ERROR org.apache.spark.rpc.netty.Inbox - Ignoring error
java.io.EOFException: null
    at java.io.DataInputStream.readFully(DataInputStream.java:197)
    at java.io.DataInputStream.readUTF(DataInputStream.java:609)
    at java.io.DataInputStream.readUTF(DataInputStream.java:564)
    at org.apache.spark.scheduler.TaskDescription$$anonfun$decode$1.apply(TaskDescription.scala:127)
    at org.apache.spark.scheduler.TaskDescription$$anonfun$decode$1.apply(TaskDescription.scala:126)
    at scala.collection.immutable.Range.foreach(Range.scala:160)
    at org.apache.spark.scheduler.TaskDescription$.decode(TaskDescription.scala:126)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:95)
    at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:117)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:101)
    at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
{noformat}

After some debugging, we determined that this is caused by a race condition in 
task serde.  cc [~irashid] [~kayousterhout], who last touched that code in 
SPARK-19796.

The race is between adding additional jars to the SparkContext and serializing 
the TaskDescription.

Consider this sequence of events:

- TaskSetManager creates a TaskDescription using a reference to the 
SparkContext's jars: 
https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L506
- TaskDescription starts serializing, and begins writing jars: 
https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala#L84
- the size of the jar map is written out: 
https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala#L63
- _on another thread_: the application adds a jar to the SparkContext's jars 
map
- then the entries in the jars map are serialized out: 
https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala#L64

The problem now is that the jars map is serialized with a count of N entries, 
but N+1 entries actually follow that count!

This causes task deserialization to fail in the executor, with the stacktrace 
above.
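
The sequence above can be reproduced without threads. The following is a minimal Java sketch, not Spark's code (which is Scala): the class JarCountRaceSketch, its methods, and the Runnable hook that stands in for the other thread are all hypothetical. It assumes only the count-then-entries layout described above, written with java.io.DataOutputStream.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a count-then-entries encoding of the jars map.
class JarCountRaceSketch {

    // Writes the entry count, runs the hook (standing in for the other thread),
    // then writes whatever entries the map holds at that point.
    static byte[] encode(Map<String, Long> jars, Runnable betweenCountAndEntries)
            throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(jars.size());
        betweenCountAndEntries.run();
        for (Map.Entry<String, Long> e : jars.entrySet()) {
            out.writeUTF(e.getKey());
            out.writeLong(e.getValue());
        }
        out.flush();
        return bytes.toByteArray();
    }

    // Reads the count, then exactly that many entries into `decoded`.
    // Returns the number of bytes left unread in the payload.
    static int leftoverAfterDecode(byte[] payload, Map<String, Long> decoded)
            throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(payload));
        int count = in.readInt();
        for (int i = 0; i < count; i++) {
            String key = in.readUTF();
            decoded.put(key, in.readLong());
        }
        return in.available();
    }

    public static void main(String[] args) throws IOException {
        Map<String, Long> jars = new LinkedHashMap<>();
        jars.put("a.jar", 1L);
        // The hook adds a second jar after the count (1) has been written.
        byte[] payload = encode(jars, () -> jars.put("b.jar", 2L));
        Map<String, Long> decoded = new LinkedHashMap<>();
        int leftover = leftoverAfterDecode(payload, decoded);
        System.out.println("decoded=" + decoded.size() + " leftoverBytes=" + leftover);
    }
}
```

With one jar present when the count is written and a second added before the entries are written, the decoder reads one entry and leaves the second in the stream. Any field read after the jars then starts at the wrong offset, which is consistent with the failure in readUTF shown in the stacktrace.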

The same issue likely exists for files as well, though I haven't observed it; 
our application does not stress that code path the way it stresses jar 
additions.

One possible fix: TaskSetManager could pass an immutable copy of the jars map 
into the TaskDescription constructor, so that the map cannot change 
mid-serialization.
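
A minimal sketch of that fix, in Java rather than Spark's Scala and with hypothetical names throughout (JarSnapshotSketch, encode, the Runnable hook): the caller copies the live map once and hands the copy to the encoder, so the count and the entries come from the same snapshot. The live map is a ConcurrentHashMap here on the assumption that it may be modified while it is being copied.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration of serializing from an immutable snapshot.
class JarSnapshotSketch {

    // Same count-then-entries layout; the hook runs between the two writes.
    static byte[] encode(Map<String, Long> jars, Runnable betweenCountAndEntries)
            throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(jars.size());
        betweenCountAndEntries.run();
        for (Map.Entry<String, Long> e : jars.entrySet()) {
            out.writeUTF(e.getKey());
            out.writeLong(e.getValue());
        }
        out.flush();
        return bytes.toByteArray();
    }

    // Reads the count, then that many entries; returns the unread byte count.
    static int leftoverAfterDecode(byte[] payload, Map<String, Long> decoded)
            throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(payload));
        int count = in.readInt();
        for (int i = 0; i < count; i++) {
            String key = in.readUTF();
            decoded.put(key, in.readLong());
        }
        return in.available();
    }

    public static void main(String[] args) throws IOException {
        Map<String, Long> liveJars = new ConcurrentHashMap<>();
        liveJars.put("a.jar", 1L);

        // Copy once, before serialization starts; later additions go to liveJars only.
        Map<String, Long> snapshot = Collections.unmodifiableMap(new HashMap<>(liveJars));

        byte[] payload = encode(snapshot, () -> liveJars.put("b.jar", 2L));
        Map<String, Long> decoded = new HashMap<>();
        int leftover = leftoverAfterDecode(payload, decoded);
        System.out.println("live=" + liveJars.size()
                + " decoded=" + decoded.size() + " leftoverBytes=" + leftover);
    }
}
```

The jar added mid-serialization is simply absent from this payload; a task description built from a later snapshot would include it.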

  was:
cc [~robert3005]

I was seeing this exception during some running Spark jobs:

{noformat}
16:16:28.294 [dispatcher-event-loop-14] ERROR org.apache.spark.rpc.netty.Inbox - Ignoring error
java.io.EOFException: null
    at java.io.DataInputStream.readFully(DataInputStream.java:197)
    at java.io.DataInputStream.readUTF(DataInputStream.java:609)
    at java.io.DataInputStream.readUTF(DataInputStream.java:564)
    at org.apache.spark.scheduler.TaskDescription$$anonfun$decode$1.apply(TaskDescription.scala:127)
    at org.apache.spark.scheduler.TaskDescription$$anonfun$decode$1.apply(TaskDescription.scala:126)
    at scala.collection.immutable.Range.foreach(Range.scala:160)
    at org.apache.spark.scheduler.TaskDescription$.decode(TaskDescription.scala:126)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:95)
    at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:117)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:101)
    at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
{noformat}

After some debugging, we determined that this is due to a race condition in 
task serde introduced in SPARK-19796.  cc [~irashid] [~kayousterhout]

The race is between adding additional jars to the SparkContext and serializing 
the TaskDescription.

Consider this sequence of events:

- TaskSetManager creates a TaskDescription using a reference to the 
SparkContext's jars: 
https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L506
- TaskDescription starts serializing, and begins writing jars: 
https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala#L84
- the size of the jar map is written out: 
https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala#L63
- _on another thread_: the application adds a jar to the SparkContext's jars 
list
- then the entries in the jars list are serialized out: 
https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala#L64

The problem now is that the jars list is serialized as having N entries, but 
actually N+1 entries follow that count!

This causes task deserialization to fail in the executor, with the stacktrace 
above.

The same issue also likely exists for files, though I haven't observed that and 
our application does not stress that codepath the same way it did for jar 
additions.

One fix here is that TaskSetManager could make an immutable copy of the jars 
list that it passes into the TaskDescription constructor, so that list doesn't 
change mid-serialization.


> Race condition when serializing TaskDescriptions and adding jars
> ----------------------------------------------------------------
>
>                 Key: SPARK-21563
>                 URL: https://issues.apache.org/jira/browse/SPARK-21563
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.2.0
>            Reporter: Andrew Ash
>


