Repository: spark
Updated Branches:
  refs/heads/branch-2.3 dbf0b9340 -> 31e46ec60


[SPARK-25231] Fix synchronization of executor heartbeat receiver in TaskSchedulerImpl

Running a large Spark job with speculation turned on was causing executor 
heartbeats to time out on the driver end after some time; eventually, after 
hitting the max number of executor failures, the job would fail.

## What changes were proposed in this pull request?

The main reason for the heartbeat timeouts was that the 
heartbeat-receiver-event-loop-thread was blocked waiting on the 
TaskSchedulerImpl lock, which was held by one of the dispatcher-event-loop 
threads executing the method dequeueSpeculativeTasks() in TaskSetManager.scala. 
Further analysis of the heartbeat receiver method executorHeartbeatReceived() 
in the TaskSchedulerImpl class showed that, instead of waiting to acquire the 
lock on the TaskSchedulerImpl object, we can remove that lock entirely and make 
the accesses to the shared state inside that code block atomic. The block only 
uses one shared map, taskIdToTaskSetManager. By making that map a 
ConcurrentHashMap, we ensure atomicity of each individual operation and keep 
the heartbeat receiver thread from blocking.
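
Not part of the patch, but for illustration: a minimal, self-contained sketch of the access pattern the change relies on. Individual get/put operations on a ConcurrentHashMap are thread-safe without an external lock, and Java's `ConcurrentHashMap.get` returns null for a missing key, which is why the diff wraps lookups in `Option(...)`. The object name and the `TaskSetManager` stand-in below are hypothetical.

```scala
import java.util.concurrent.ConcurrentHashMap

object HeartbeatLookupSketch {
  // Hypothetical stand-in for Spark's TaskSetManager, just for this sketch.
  final case class TaskSetManager(stageId: Int)

  // After the patch: individual get/put calls on a ConcurrentHashMap are
  // thread-safe on their own, so the heartbeat receiver no longer has to
  // wait on the TaskSchedulerImpl monitor.
  val taskIdToTaskSetManager = new ConcurrentHashMap[Long, TaskSetManager]

  def record(tid: Long, mgr: TaskSetManager): Unit =
    taskIdToTaskSetManager.put(tid, mgr) // was `map(tid) = taskSet` under a lock

  def lookup(tid: Long): Option[TaskSetManager] =
    // ConcurrentHashMap.get returns null for a missing key, hence the
    // Option(...) wrapping that appears throughout the diff.
    Option(taskIdToTaskSetManager.get(tid))

  def main(args: Array[String]): Unit = {
    record(1L, TaskSetManager(stageId = 0))
    assert(lookup(1L).exists(_.stageId == 0))
    assert(lookup(2L).isEmpty) // missing key becomes None instead of an NPE
  }
}
```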

## How was this patch tested?

Screenshots of the relevant thread dumps are attached below:
**heartbeat-receiver-event-loop-thread:**

<img width="1409" alt="screen shot 2018-08-24 at 9 19 57 am" src="https://user-images.githubusercontent.com/22228190/44593413-e25df780-a788-11e8-9520-176a18401a59.png">

**dispatcher-event-loop-thread:**

<img width="1409" alt="screen shot 2018-08-24 at 9 21 56 am" src="https://user-images.githubusercontent.com/22228190/44593484-13d6c300-a789-11e8-8d88-34b1d51d4541.png">

Closes #22221 from pgandhi999/SPARK-25231.

Authored-by: pgandhi <pgan...@oath.com>
Signed-off-by: Thomas Graves <tgra...@apache.org>
(cherry picked from commit 559b899aceb160fcec3a57109c0b60a0ae40daeb)
Signed-off-by: Thomas Graves <tgra...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/31e46ec6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/31e46ec6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/31e46ec6

Branch: refs/heads/branch-2.3
Commit: 31e46ec60849d315a4e83e0a332606a4405907ad
Parents: dbf0b93
Author: pgandhi <pgan...@oath.com>
Authored: Wed Sep 5 16:10:49 2018 -0500
Committer: Thomas Graves <tgra...@apache.org>
Committed: Wed Sep 5 16:11:08 2018 -0500

----------------------------------------------------------------------
 .../org/apache/spark/scheduler/TaskSchedulerImpl.scala  | 12 ++++++------
 .../cluster/CoarseGrainedSchedulerBackend.scala         |  2 +-
 .../spark/scheduler/SchedulerIntegrationSuite.scala     |  3 ++-
 .../apache/spark/scheduler/TaskSchedulerImplSuite.scala |  6 +++---
 4 files changed, 12 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/31e46ec6/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 56c0bf6..4edc6b2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler
 
 import java.nio.ByteBuffer
 import java.util.{Locale, Timer, TimerTask}
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.Set
@@ -90,7 +90,7 @@ private[spark] class TaskSchedulerImpl(
   private val taskSetsByStageIdAndAttempt = new HashMap[Int, HashMap[Int, TaskSetManager]]
 
   // Protected by `this`
-  private[scheduler] val taskIdToTaskSetManager = new HashMap[Long, TaskSetManager]
+  private[scheduler] val taskIdToTaskSetManager = new ConcurrentHashMap[Long, TaskSetManager]
   val taskIdToExecutorId = new HashMap[Long, String]
 
   @volatile private var hasReceivedTask = false
@@ -286,7 +286,7 @@ private[spark] class TaskSchedulerImpl(
           for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
             tasks(i) += task
             val tid = task.taskId
-            taskIdToTaskSetManager(tid) = taskSet
+            taskIdToTaskSetManager.put(tid, taskSet)
             taskIdToExecutorId(tid) = execId
             executorIdToRunningTaskIds(execId).add(tid)
             availableCpus(i) -= CPUS_PER_TASK
@@ -392,7 +392,7 @@ private[spark] class TaskSchedulerImpl(
     var reason: Option[ExecutorLossReason] = None
     synchronized {
       try {
-        taskIdToTaskSetManager.get(tid) match {
+        Option(taskIdToTaskSetManager.get(tid)) match {
           case Some(taskSet) =>
             if (state == TaskState.LOST) {
               // TaskState.LOST is only used by the deprecated Mesos fine-grained scheduling mode,
@@ -444,10 +444,10 @@ private[spark] class TaskSchedulerImpl(
       accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
       blockManagerId: BlockManagerId): Boolean = {
     // (taskId, stageId, stageAttemptId, accumUpdates)
-    val accumUpdatesWithTaskIds: Array[(Long, Int, Int, Seq[AccumulableInfo])] = synchronized {
+    val accumUpdatesWithTaskIds: Array[(Long, Int, Int, Seq[AccumulableInfo])] = {
       accumUpdates.flatMap { case (id, updates) =>
         val accInfos = updates.map(acc => acc.toInfo(Some(acc.value), None))
-        taskIdToTaskSetManager.get(id).map { taskSetMgr =>
+        Option(taskIdToTaskSetManager.get(id)).map { taskSetMgr =>
           (id, taskSetMgr.stageId, taskSetMgr.taskSet.stageAttemptId, accInfos)
         }
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/31e46ec6/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 5627a55..8848add 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -289,7 +289,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       for (task <- tasks.flatten) {
         val serializedTask = TaskDescription.encode(task)
         if (serializedTask.limit() >= maxRpcMessageSize) {
-          scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
+          Option(scheduler.taskIdToTaskSetManager.get(task.taskId)).foreach { taskSetMgr =>
             try {
               var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
                 "spark.rpc.message.maxSize (%d bytes). Consider increasing " +

http://git-wip-us.apache.org/repos/asf/spark/blob/31e46ec6/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
index 75ea409..69c3f24 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
@@ -398,7 +398,8 @@ private[spark] abstract class MockBackend(
       // get the task now, since that requires a lock on TaskSchedulerImpl, to prevent individual
       // tests from introducing a race if they need it.
       val newTasks = newTaskDescriptions.map { taskDescription =>
-        val taskSet = taskScheduler.taskIdToTaskSetManager(taskDescription.taskId).taskSet
+        val taskSet =
+          Option(taskScheduler.taskIdToTaskSetManager.get(taskDescription.taskId).taskSet).get
         val task = taskSet.tasks(taskDescription.index)
         (taskDescription, task)
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/31e46ec6/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index a9e0aed..932bfca 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -247,7 +247,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     taskScheduler.submitTasks(attempt2)
     val taskDescriptions3 = taskScheduler.resourceOffers(workerOffers).flatten
     assert(1 === taskDescriptions3.length)
-    val mgr = taskScheduler.taskIdToTaskSetManager.get(taskDescriptions3(0).taskId).get
+    val mgr = Option(taskScheduler.taskIdToTaskSetManager.get(taskDescriptions3(0).taskId)).get
     assert(mgr.taskSet.stageAttemptId === 1)
     assert(!failedTaskSet)
   }
@@ -285,7 +285,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(10 === taskDescriptions3.length)
 
     taskDescriptions3.foreach { task =>
-      val mgr = taskScheduler.taskIdToTaskSetManager.get(task.taskId).get
+      val mgr = Option(taskScheduler.taskIdToTaskSetManager.get(task.taskId)).get
       assert(mgr.taskSet.stageAttemptId === 1)
     }
     assert(!failedTaskSet)
@@ -723,7 +723,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     // only schedule one task because of locality
     assert(taskDescs.size === 1)
 
-    val mgr = taskScheduler.taskIdToTaskSetManager.get(taskDescs(0).taskId).get
+    val mgr = Option(taskScheduler.taskIdToTaskSetManager.get(taskDescs(0).taskId)).get
     assert(mgr.myLocalityLevels.toSet === Set(TaskLocality.NODE_LOCAL, TaskLocality.ANY))
     // we should know about both executors, even though we only scheduled tasks on one of them
     assert(taskScheduler.getExecutorsAliveOnHost("host0") === Some(Set("executor0")))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
