Repository: spark
Updated Branches:
  refs/heads/branch-1.5 8ecfb05e3 -> 980687206


[SPARK-9639] [STREAMING] Fix a potential NPE in Streaming JobScheduler

Because `JobScheduler.stop(false)` may set `eventLoop` to null while 
`JobHandler` is running, it is possible that `eventLoop` happens to be 
null when `post` is called.

This PR fixes this bug and also makes the threads in `jobExecutor` daemon threads.

Author: zsxwing <zsxw...@gmail.com>

Closes #7960 from zsxwing/fix-npe and squashes the following commits:

b0864c4 [zsxwing] Fix a potential NPE in Streaming JobScheduler

(cherry picked from commit 346209097e88fe79015359e40b49c32cc0bdc439)
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/98068720
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/98068720
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/98068720

Branch: refs/heads/branch-1.5
Commit: 9806872065b97df524bd631467105219b37f79f3
Parents: 8ecfb05
Author: zsxwing <zsxw...@gmail.com>
Authored: Thu Aug 6 14:39:36 2015 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Thu Aug 6 14:39:48 2015 -0700

----------------------------------------------------------------------
 .../streaming/scheduler/JobScheduler.scala      | 32 ++++++++++++++------
 1 file changed, 22 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/98068720/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 7e73556..6d4cdc4 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.streaming.scheduler
 
-import java.util.concurrent.{TimeUnit, ConcurrentHashMap, Executors}
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 
 import scala.collection.JavaConversions._
 import scala.util.{Failure, Success}
@@ -25,7 +25,7 @@ import scala.util.{Failure, Success}
 import org.apache.spark.Logging
 import org.apache.spark.rdd.PairRDDFunctions
 import org.apache.spark.streaming._
-import org.apache.spark.util.EventLoop
+import org.apache.spark.util.{EventLoop, ThreadUtils}
 
 
 private[scheduler] sealed trait JobSchedulerEvent
@@ -44,7 +44,8 @@ class JobScheduler(val ssc: StreamingContext) extends Logging 
{
   // https://gist.github.com/AlainODea/1375759b8720a3f9f094
   private val jobSets: java.util.Map[Time, JobSet] = new 
ConcurrentHashMap[Time, JobSet]
   private val numConcurrentJobs = 
ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
-  private val jobExecutor = Executors.newFixedThreadPool(numConcurrentJobs)
+  private val jobExecutor =
+    ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, 
"streaming-job-executor")
   private val jobGenerator = new JobGenerator(this)
   val clock = jobGenerator.clock
   val listenerBus = new StreamingListenerBus()
@@ -193,14 +194,25 @@ class JobScheduler(val ssc: StreamingContext) extends 
Logging {
       ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, 
job.time.milliseconds.toString)
       ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, 
job.outputOpId.toString)
       try {
-        eventLoop.post(JobStarted(job))
-        // Disable checks for existing output directories in jobs launched by 
the streaming
-        // scheduler, since we may need to write output to an existing 
directory during checkpoint
-        // recovery; see SPARK-4835 for more details.
-        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
-          job.run()
+        // We need to assign `eventLoop` to a temp variable. Otherwise, because
+        // `JobScheduler.stop(false)` may set `eventLoop` to null when this 
method is running, then
+        // it's possible that when `post` is called, `eventLoop` happens to 
null.
+        var _eventLoop = eventLoop
+        if (_eventLoop != null) {
+          _eventLoop.post(JobStarted(job))
+          // Disable checks for existing output directories in jobs launched 
by the streaming
+          // scheduler, since we may need to write output to an existing 
directory during checkpoint
+          // recovery; see SPARK-4835 for more details.
+          PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
+            job.run()
+          }
+          _eventLoop = eventLoop
+          if (_eventLoop != null) {
+            _eventLoop.post(JobCompleted(job))
+          }
+        } else {
+          // JobScheduler has been stopped.
         }
-        eventLoop.post(JobCompleted(job))
       } finally {
         ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)
         ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to