Repository: spark Updated Branches: refs/heads/master 925449283 -> 559b899ac
[SPARK-25231] Fix synchronization of executor heartbeat receiver in TaskSchedulerImpl Running a large Spark job with speculation turned on was causing executor heartbeats to time out on the driver end after sometime and eventually, after hitting the max number of executor failures, the job would fail. ## What changes were proposed in this pull request? The main reason for the heartbeat timeouts was that the heartbeat-receiver-event-loop-thread was blocked waiting on the TaskSchedulerImpl object which was being held by one of the dispatcher-event-loop threads executing the method dequeueSpeculativeTasks() in TaskSetManager.scala. On further analysis of the heartbeat receiver method executorHeartbeatReceived() in TaskSchedulerImpl class, we found out that instead of waiting to acquire the lock on the TaskSchedulerImpl object, we can remove that lock and make the operations to the global variables inside the code block to be atomic. The block of code in that method only uses one global HashMap taskIdToTaskSetManager. Making that map a ConcurrentHashMap, we are ensuring atomicity of operations and speeding up the heartbeat receiver thread operation. ## How was this patch tested? Screenshots of the thread dump have been attached below: **heartbeat-receiver-event-loop-thread:** <img width="1409" alt="screen shot 2018-08-24 at 9 19 57 am" src="https://user-images.githubusercontent.com/22228190/44593413-e25df780-a788-11e8-9520-176a18401a59.png"> **dispatcher-event-loop-thread:** <img width="1409" alt="screen shot 2018-08-24 at 9 21 56 am" src="https://user-images.githubusercontent.com/22228190/44593484-13d6c300-a789-11e8-8d88-34b1d51d4541.png"> Closes #22221 from pgandhi999/SPARK-25231. Authored-by: pgandhi <pgan...@oath.com> Signed-off-by: Thomas Graves <tgra...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/559b899a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/559b899a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/559b899a Branch: refs/heads/master Commit: 559b899aceb160fcec3a57109c0b60a0ae40daeb Parents: 9254492 Author: pgandhi <pgan...@oath.com> Authored: Wed Sep 5 16:10:49 2018 -0500 Committer: Thomas Graves <tgra...@apache.org> Committed: Wed Sep 5 16:10:49 2018 -0500 ---------------------------------------------------------------------- .../org/apache/spark/scheduler/TaskSchedulerImpl.scala | 12 ++++++------ .../cluster/CoarseGrainedSchedulerBackend.scala | 2 +- .../spark/scheduler/SchedulerIntegrationSuite.scala | 3 ++- .../apache/spark/scheduler/TaskSchedulerImplSuite.scala | 6 +++--- 4 files changed, 12 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/559b899a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 8992d7e..8b71170 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -19,7 +19,7 @@ package org.apache.spark.scheduler import java.nio.ByteBuffer import java.util.{Locale, Timer, TimerTask} -import java.util.concurrent.TimeUnit +import java.util.concurrent.{ConcurrentHashMap, TimeUnit} import java.util.concurrent.atomic.AtomicLong import scala.collection.Set @@ -91,7 +91,7 @@ private[spark] class TaskSchedulerImpl( private val taskSetsByStageIdAndAttempt = new HashMap[Int, HashMap[Int, TaskSetManager]] // Protected by `this` - private[scheduler] val taskIdToTaskSetManager = new HashMap[Long, TaskSetManager] + private[scheduler] val taskIdToTaskSetManager = new ConcurrentHashMap[Long, TaskSetManager] val taskIdToExecutorId = new HashMap[Long, String] @volatile private var hasReceivedTask = false @@ -315,7 +315,7 @@ private[spark] class TaskSchedulerImpl( for (task <- taskSet.resourceOffer(execId, host, maxLocality)) { tasks(i) += task val tid = task.taskId - taskIdToTaskSetManager(tid) = taskSet + taskIdToTaskSetManager.put(tid, taskSet) taskIdToExecutorId(tid) = execId executorIdToRunningTaskIds(execId).add(tid) availableCpus(i) -= CPUS_PER_TASK @@ -465,7 +465,7 @@ private[spark] class TaskSchedulerImpl( var reason: Option[ExecutorLossReason] = None synchronized { try { - taskIdToTaskSetManager.get(tid) match { + Option(taskIdToTaskSetManager.get(tid)) match { case Some(taskSet) => if (state == TaskState.LOST) { // TaskState.LOST is only used by the deprecated Mesos fine-grained scheduling mode, @@ -517,10 +517,10 @@ private[spark] class TaskSchedulerImpl( accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], blockManagerId: BlockManagerId): Boolean = { // (taskId, stageId, stageAttemptId, accumUpdates) - val accumUpdatesWithTaskIds: Array[(Long, Int, Int, Seq[AccumulableInfo])] = synchronized { + val accumUpdatesWithTaskIds: Array[(Long, Int, Int, Seq[AccumulableInfo])] = { accumUpdates.flatMap { case (id, updates) => val accInfos = updates.map(acc => acc.toInfo(Some(acc.value), None)) - taskIdToTaskSetManager.get(id).map { taskSetMgr => + Option(taskIdToTaskSetManager.get(id)).map { taskSetMgr => (id, taskSetMgr.stageId, taskSetMgr.taskSet.stageAttemptId, accInfos) } } http://git-wip-us.apache.org/repos/asf/spark/blob/559b899a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 747e8c7..de7c0d8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -290,7 +290,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp for (task <- tasks.flatten) { val serializedTask = TaskDescription.encode(task) if (serializedTask.limit() >= maxRpcMessageSize) { - scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr => + Option(scheduler.taskIdToTaskSetManager.get(task.taskId)).foreach { taskSetMgr => try { var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " + "spark.rpc.message.maxSize (%d bytes). Consider increasing " + http://git-wip-us.apache.org/repos/asf/spark/blob/559b899a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala index cea7f17..2d409d9 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala @@ -400,7 +400,8 @@ private[spark] abstract class MockBackend( // get the task now, since that requires a lock on TaskSchedulerImpl, to prevent individual // tests from introducing a race if they need it. val newTasks = newTaskDescriptions.map { taskDescription => - val taskSet = taskScheduler.taskIdToTaskSetManager(taskDescription.taskId).taskSet + val taskSet = + Option(taskScheduler.taskIdToTaskSetManager.get(taskDescription.taskId).taskSet).get val task = taskSet.tasks(taskDescription.index) (taskDescription, task) } http://git-wip-us.apache.org/repos/asf/spark/blob/559b899a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 7a457a0..9e1d13e 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -248,7 +248,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B taskScheduler.submitTasks(attempt2) val taskDescriptions3 = taskScheduler.resourceOffers(workerOffers).flatten assert(1 === taskDescriptions3.length) - val mgr = taskScheduler.taskIdToTaskSetManager.get(taskDescriptions3(0).taskId).get + val mgr = Option(taskScheduler.taskIdToTaskSetManager.get(taskDescriptions3(0).taskId)).get assert(mgr.taskSet.stageAttemptId === 1) assert(!failedTaskSet) } @@ -286,7 +286,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B assert(10 === taskDescriptions3.length) taskDescriptions3.foreach { task => - val mgr = taskScheduler.taskIdToTaskSetManager.get(task.taskId).get + val mgr = Option(taskScheduler.taskIdToTaskSetManager.get(task.taskId)).get assert(mgr.taskSet.stageAttemptId === 1) } assert(!failedTaskSet) @@ -724,7 +724,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B // only schedule one task because of locality assert(taskDescs.size === 1) - val mgr = taskScheduler.taskIdToTaskSetManager.get(taskDescs(0).taskId).get + val mgr = Option(taskScheduler.taskIdToTaskSetManager.get(taskDescs(0).taskId)).get assert(mgr.myLocalityLevels.toSet === Set(TaskLocality.NODE_LOCAL, TaskLocality.ANY)) // we should know about both executors, even though we only scheduled tasks on one of them assert(taskScheduler.getExecutorsAliveOnHost("host0") === Some(Set("executor0"))) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org