Repository: spark Updated Branches: refs/heads/branch-2.3 dbf0b9340 -> 31e46ec60
[SPARK-25231] Fix synchronization of executor heartbeat receiver in TaskSchedulerImpl Running a large Spark job with speculation turned on was causing executor heartbeats to time out on the driver end after sometime and eventually, after hitting the max number of executor failures, the job would fail. ## What changes were proposed in this pull request? The main reason for the heartbeat timeouts was that the heartbeat-receiver-event-loop-thread was blocked waiting on the TaskSchedulerImpl object which was being held by one of the dispatcher-event-loop threads executing the method dequeueSpeculativeTasks() in TaskSetManager.scala. On further analysis of the heartbeat receiver method executorHeartbeatReceived() in TaskSchedulerImpl class, we found out that instead of waiting to acquire the lock on the TaskSchedulerImpl object, we can remove that lock and make the operations to the global variables inside the code block to be atomic. The block of code in that method only uses one global HashMap taskIdToTaskSetManager. Making that map a ConcurrentHashMap, we are ensuring atomicity of operations and speeding up the heartbeat receiver thread operation. ## How was this patch tested? Screenshots of the thread dump have been attached below: **heartbeat-receiver-event-loop-thread:** <img width="1409" alt="screen shot 2018-08-24 at 9 19 57 am" src="https://user-images.githubusercontent.com/22228190/44593413-e25df780-a788-11e8-9520-176a18401a59.png"> **dispatcher-event-loop-thread:** <img width="1409" alt="screen shot 2018-08-24 at 9 21 56 am" src="https://user-images.githubusercontent.com/22228190/44593484-13d6c300-a789-11e8-8d88-34b1d51d4541.png"> Closes #22221 from pgandhi999/SPARK-25231. Authored-by: pgandhi <pgan...@oath.com> Signed-off-by: Thomas Graves <tgra...@apache.org> (cherry picked from commit 559b899aceb160fcec3a57109c0b60a0ae40daeb) Signed-off-by: Thomas Graves <tgra...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/31e46ec6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/31e46ec6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/31e46ec6 Branch: refs/heads/branch-2.3 Commit: 31e46ec60849d315a4e83e0a332606a4405907ad Parents: dbf0b93 Author: pgandhi <pgan...@oath.com> Authored: Wed Sep 5 16:10:49 2018 -0500 Committer: Thomas Graves <tgra...@apache.org> Committed: Wed Sep 5 16:11:08 2018 -0500 ---------------------------------------------------------------------- .../org/apache/spark/scheduler/TaskSchedulerImpl.scala | 12 ++++++------ .../cluster/CoarseGrainedSchedulerBackend.scala | 2 +- .../spark/scheduler/SchedulerIntegrationSuite.scala | 3 ++- .../apache/spark/scheduler/TaskSchedulerImplSuite.scala | 6 +++--- 4 files changed, 12 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/31e46ec6/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 56c0bf6..4edc6b2 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -19,7 +19,7 @@ package org.apache.spark.scheduler import java.nio.ByteBuffer import java.util.{Locale, Timer, TimerTask} -import java.util.concurrent.TimeUnit +import java.util.concurrent.{ConcurrentHashMap, TimeUnit} import java.util.concurrent.atomic.AtomicLong import scala.collection.Set @@ -90,7 +90,7 @@ private[spark] class TaskSchedulerImpl( private val taskSetsByStageIdAndAttempt = new HashMap[Int, HashMap[Int, TaskSetManager]] // Protected by `this` - private[scheduler] val taskIdToTaskSetManager = new HashMap[Long, TaskSetManager] + private[scheduler] val taskIdToTaskSetManager = new ConcurrentHashMap[Long, TaskSetManager] val taskIdToExecutorId = new HashMap[Long, String] @volatile private var hasReceivedTask = false @@ -286,7 +286,7 @@ private[spark] class TaskSchedulerImpl( for (task <- taskSet.resourceOffer(execId, host, maxLocality)) { tasks(i) += task val tid = task.taskId - taskIdToTaskSetManager(tid) = taskSet + taskIdToTaskSetManager.put(tid, taskSet) taskIdToExecutorId(tid) = execId executorIdToRunningTaskIds(execId).add(tid) availableCpus(i) -= CPUS_PER_TASK @@ -392,7 +392,7 @@ private[spark] class TaskSchedulerImpl( var reason: Option[ExecutorLossReason] = None synchronized { try { - taskIdToTaskSetManager.get(tid) match { + Option(taskIdToTaskSetManager.get(tid)) match { case Some(taskSet) => if (state == TaskState.LOST) { // TaskState.LOST is only used by the deprecated Mesos fine-grained scheduling mode, @@ -444,10 +444,10 @@ private[spark] class TaskSchedulerImpl( accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], blockManagerId: BlockManagerId): Boolean = { // (taskId, stageId, stageAttemptId, accumUpdates) - val accumUpdatesWithTaskIds: Array[(Long, Int, Int, Seq[AccumulableInfo])] = synchronized { + val accumUpdatesWithTaskIds: Array[(Long, Int, Int, Seq[AccumulableInfo])] = { accumUpdates.flatMap { case (id, updates) => val accInfos = updates.map(acc => acc.toInfo(Some(acc.value), None)) - taskIdToTaskSetManager.get(id).map { taskSetMgr => + Option(taskIdToTaskSetManager.get(id)).map { taskSetMgr => (id, taskSetMgr.stageId, taskSetMgr.taskSet.stageAttemptId, accInfos) } } http://git-wip-us.apache.org/repos/asf/spark/blob/31e46ec6/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 5627a55..8848add 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -289,7 +289,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp for (task <- tasks.flatten) { val serializedTask = TaskDescription.encode(task) if (serializedTask.limit() >= maxRpcMessageSize) { - scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr => + Option(scheduler.taskIdToTaskSetManager.get(task.taskId)).foreach { taskSetMgr => try { var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " + "spark.rpc.message.maxSize (%d bytes). Consider increasing " + http://git-wip-us.apache.org/repos/asf/spark/blob/31e46ec6/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala index 75ea409..69c3f24 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala @@ -398,7 +398,8 @@ private[spark] abstract class MockBackend( // get the task now, since that requires a lock on TaskSchedulerImpl, to prevent individual // tests from introducing a race if they need it. val newTasks = newTaskDescriptions.map { taskDescription => - val taskSet = taskScheduler.taskIdToTaskSetManager(taskDescription.taskId).taskSet + val taskSet = + Option(taskScheduler.taskIdToTaskSetManager.get(taskDescription.taskId).taskSet).get val task = taskSet.tasks(taskDescription.index) (taskDescription, task) } http://git-wip-us.apache.org/repos/asf/spark/blob/31e46ec6/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index a9e0aed..932bfca 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -247,7 +247,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B taskScheduler.submitTasks(attempt2) val taskDescriptions3 = taskScheduler.resourceOffers(workerOffers).flatten assert(1 === taskDescriptions3.length) - val mgr = taskScheduler.taskIdToTaskSetManager.get(taskDescriptions3(0).taskId).get + val mgr = Option(taskScheduler.taskIdToTaskSetManager.get(taskDescriptions3(0).taskId)).get assert(mgr.taskSet.stageAttemptId === 1) assert(!failedTaskSet) } @@ -285,7 +285,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B assert(10 === taskDescriptions3.length) taskDescriptions3.foreach { task => - val mgr = taskScheduler.taskIdToTaskSetManager.get(task.taskId).get + val mgr = Option(taskScheduler.taskIdToTaskSetManager.get(task.taskId)).get assert(mgr.taskSet.stageAttemptId === 1) } assert(!failedTaskSet) @@ -723,7 +723,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B // only schedule one task because of locality assert(taskDescs.size === 1) - val mgr = taskScheduler.taskIdToTaskSetManager.get(taskDescs(0).taskId).get + val mgr = Option(taskScheduler.taskIdToTaskSetManager.get(taskDescs(0).taskId)).get assert(mgr.myLocalityLevels.toSet === Set(TaskLocality.NODE_LOCAL, TaskLocality.ANY)) // we should know about both executors, even though we only scheduled tasks on one of them assert(taskScheduler.getExecutorsAliveOnHost("host0") === Some(Set("executor0"))) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org