Repository: spark
Updated Branches:
  refs/heads/master 1f29d502e -> affc8a887


[SPARK-10125] [STREAMING] Fix a potential deadlock in JobGenerator.stop

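Because a `lazy val` is initialized while holding the `this` lock, 
JobGenerator.stop and JobGenerator.doCheckpoint can deadlock if they run at the 
same time before JobGenerator.shouldCheckpoint has been initialized: stop holds 
the JobGenerator lock while joining the event loop thread, and the event loop 
thread blocks on the same lock when it initializes shouldCheckpoint.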
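
As a rough illustration of the mechanism, here is a minimal sketch. It assumes Scala 2.x, where the first read of a `lazy val` runs inside `this.synchronized`. `Generator`, `stopWhileHoldingLock`, and `LazyValDeadlockDemo` are hypothetical stand-ins, not Spark code, and a bounded `join` replaces the unbounded one so the demo reports the stall instead of hanging.

```scala
// Hypothetical stand-in for JobGenerator; not Spark code.
class Generator(checkpointDir: String) {
  // Assumption: Scala 2.x, where the first read takes the `this` monitor.
  lazy val shouldCheckpoint: Boolean = checkpointDir != null

  // Holds the `this` monitor while joining the worker, like stop() in the trace.
  def stopWhileHoldingLock(worker: Thread, timeoutMs: Long): Boolean = synchronized {
    worker.start()
    worker.join(timeoutMs) // an unbounded join() here would never return
    !worker.isAlive        // false means the worker is still blocked
  }
}

object LazyValDeadlockDemo {
  // Returns true if the worker finished while the caller still held the lock.
  def workerFinished(timeoutMs: Long): Boolean = {
    val gen = new Generator("/tmp/checkpoints")
    val worker = new Thread(new Runnable {
      // The first read of the lazy val needs the monitor that the caller holds.
      override def run(): Unit = { val decision = gen.shouldCheckpoint; assert(decision) }
    })
    worker.setDaemon(true)
    val finished = gen.stopWhileHoldingLock(worker, timeoutMs)
    worker.join(5000) // the lock is released now, so the worker can complete
    finished
  }
}
```

On Scala 2.x, `LazyValDeadlockDemo.workerFinished(500)` is expected to return `false`: the worker cannot initialize `shouldCheckpoint` until the calling thread leaves the synchronized block.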

Here are the stack traces for the deadlock:

```Java
"pool-1-thread-1-ScalaTest-running-StreamingListenerSuite" #11 prio=5 os_prio=31 tid=0x00007fd35d094800 nid=0x5703 in Object.wait() [0x000000012ecaf000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        at java.lang.Thread.join(Thread.java:1245)
        - locked <0x00000007b5d8d7f8> (a org.apache.spark.util.EventLoop$$anon$1)
        at java.lang.Thread.join(Thread.java:1319)
        at org.apache.spark.util.EventLoop.stop(EventLoop.scala:81)
        at org.apache.spark.streaming.scheduler.JobGenerator.stop(JobGenerator.scala:155)
        - locked <0x00000007b5d8cea0> (a org.apache.spark.streaming.scheduler.JobGenerator)
        at org.apache.spark.streaming.scheduler.JobScheduler.stop(JobScheduler.scala:95)
        - locked <0x00000007b5d8ced8> (a org.apache.spark.streaming.scheduler.JobScheduler)
        at org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:687)

"JobGenerator" #67 daemon prio=5 os_prio=31 tid=0x00007fd35c3b9800 nid=0x9f03 waiting for monitor entry [0x0000000139e4a000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.spark.streaming.scheduler.JobGenerator.shouldCheckpoint$lzycompute(JobGenerator.scala:63)
        - waiting to lock <0x00000007b5d8cea0> (a org.apache.spark.streaming.scheduler.JobGenerator)
        at org.apache.spark.streaming.scheduler.JobGenerator.shouldCheckpoint(JobGenerator.scala:63)
        at org.apache.spark.streaming.scheduler.JobGenerator.doCheckpoint(JobGenerator.scala:290)
        at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:182)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
```

I can reproduce this deadlock with this patch: 
https://github.com/zsxwing/spark/commit/8a88f28d1331003a65fabef48ae3d22a7c21f05f

A Jenkins build also timed out because of this deadlock: 
https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/1654/

This PR initializes `checkpointWriter` in `start()`, before `eventLoop` is 
created and can use it, to avoid this deadlock.
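
The eager-initialization idea can be sketched as follows. This is a simplified illustration, not the Spark implementation: `EagerGenerator`, `EagerInitDemo`, and the `String`-typed writer are hypothetical, and the sketch assumes that initializing `checkpointWriter` also evaluates `shouldCheckpoint`, which the diff itself does not show.

```scala
// Hypothetical stand-in for JobGenerator; names and fields are illustrative only.
class EagerGenerator(checkpointDir: String) {
  private lazy val shouldCheckpoint: Boolean = checkpointDir != null

  // Assumed shape: initializing the writer also forces shouldCheckpoint.
  private lazy val checkpointWriter: String =
    if (shouldCheckpoint) s"writer($checkpointDir)" else null

  @volatile private var worker: Thread = null
  @volatile var checkpointed: Boolean = false

  def start(): Unit = synchronized {
    if (worker == null) {
      // Force both lazy vals before the worker thread exists, so the worker
      // never has to take the `this` monitor to initialize them.
      checkpointWriter
      worker = new Thread(new Runnable {
        override def run(): Unit = { checkpointed = shouldCheckpoint }
      })
      worker.setDaemon(true)
      worker.start()
    }
  }

  // Joins the worker while holding the `this` monitor, like stop() in the trace.
  // Must be called after start().
  def stop(timeoutMs: Long): Boolean = synchronized {
    worker.join(timeoutMs)
    !worker.isAlive // true means the worker finished without blocking
  }
}

object EagerInitDemo {
  def stopsCleanly(timeoutMs: Long): Boolean = {
    val gen = new EagerGenerator("/tmp/checkpoints")
    gen.start()
    gen.stop(timeoutMs)
  }
}
```

Because both lazy vals are already initialized when the worker first reads `shouldCheckpoint`, the read does not need the lock that `stop` holds, and `EagerInitDemo.stopsCleanly(2000)` should return `true`.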

Author: zsxwing <zsxw...@gmail.com>

Closes #8326 from zsxwing/SPARK-10125.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/affc8a88
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/affc8a88
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/affc8a88

Branch: refs/heads/master
Commit: affc8a887ede9fdc2ca6051833954cd10918c869
Parents: 1f29d50
Author: zsxwing <zsxw...@gmail.com>
Authored: Wed Aug 19 19:43:09 2015 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Wed Aug 19 19:43:09 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/streaming/scheduler/JobGenerator.scala      | 4 ++++
 1 file changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/affc8a88/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 9f2117a..2de035d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -79,6 +79,10 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
   def start(): Unit = synchronized {
     if (eventLoop != null) return // generator has already been started
 
+    // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
+    // See SPARK-10125
+    checkpointWriter
+
     eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
       override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)
 

