Repository: spark Updated Branches: refs/heads/branch-1.5 8ecfb05e3 -> 980687206
[SPARK-9639] [STREAMING] Fix a potential NPE in Streaming JobScheduler Because `JobScheduler.stop(false)` may set `eventLoop` to null when `JobHandler` is running, then it's possible that when `post` is called, `eventLoop` happens to null. This PR fixed this bug and also set threads in `jobExecutor` to `daemon`. Author: zsxwing <zsxw...@gmail.com> Closes #7960 from zsxwing/fix-npe and squashes the following commits: b0864c4 [zsxwing] Fix a potential NPE in Streaming JobScheduler (cherry picked from commit 346209097e88fe79015359e40b49c32cc0bdc439) Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/98068720 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/98068720 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/98068720 Branch: refs/heads/branch-1.5 Commit: 9806872065b97df524bd631467105219b37f79f3 Parents: 8ecfb05 Author: zsxwing <zsxw...@gmail.com> Authored: Thu Aug 6 14:39:36 2015 -0700 Committer: Tathagata Das <tathagata.das1...@gmail.com> Committed: Thu Aug 6 14:39:48 2015 -0700 ---------------------------------------------------------------------- .../streaming/scheduler/JobScheduler.scala | 32 ++++++++++++++------ 1 file changed, 22 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/98068720/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index 7e73556..6d4cdc4 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming.scheduler -import java.util.concurrent.{TimeUnit, ConcurrentHashMap, Executors} +import java.util.concurrent.{ConcurrentHashMap, TimeUnit} import scala.collection.JavaConversions._ import scala.util.{Failure, Success} @@ -25,7 +25,7 @@ import scala.util.{Failure, Success} import org.apache.spark.Logging import org.apache.spark.rdd.PairRDDFunctions import org.apache.spark.streaming._ -import org.apache.spark.util.EventLoop +import org.apache.spark.util.{EventLoop, ThreadUtils} private[scheduler] sealed trait JobSchedulerEvent @@ -44,7 +44,8 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { // https://gist.github.com/AlainODea/1375759b8720a3f9f094 private val jobSets: java.util.Map[Time, JobSet] = new ConcurrentHashMap[Time, JobSet] private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1) - private val jobExecutor = Executors.newFixedThreadPool(numConcurrentJobs) + private val jobExecutor = + ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor") private val jobGenerator = new JobGenerator(this) val clock = jobGenerator.clock val listenerBus = new StreamingListenerBus() @@ -193,14 +194,25 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString) ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString) try { - eventLoop.post(JobStarted(job)) - // Disable checks for existing output directories in jobs launched by the streaming - // scheduler, since we may need to write output to an existing directory during checkpoint - // recovery; see SPARK-4835 for more details. - PairRDDFunctions.disableOutputSpecValidation.withValue(true) { - job.run() + // We need to assign `eventLoop` to a temp variable. Otherwise, because + // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then + // it's possible that when `post` is called, `eventLoop` happens to null. + var _eventLoop = eventLoop + if (_eventLoop != null) { + _eventLoop.post(JobStarted(job)) + // Disable checks for existing output directories in jobs launched by the streaming + // scheduler, since we may need to write output to an existing directory during checkpoint + // recovery; see SPARK-4835 for more details. + PairRDDFunctions.disableOutputSpecValidation.withValue(true) { + job.run() + } + _eventLoop = eventLoop + if (_eventLoop != null) { + _eventLoop.post(JobCompleted(job)) + } + } else { + // JobScheduler has been stopped. } - eventLoop.post(JobCompleted(job)) } finally { ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null) ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org