spark git commit: [SPARK-25231] Fix synchronization of executor heartbeat receiver in TaskSchedulerImpl
Repository: spark Updated Branches: refs/heads/master 925449283 -> 559b899ac [SPARK-25231] Fix synchronization of executor heartbeat receiver in TaskSchedulerImpl Running a large Spark job with speculation turned on was causing executor heartbeats to time out on the driver end after some time and eventually, after hitting the max number of executor failures, the job would fail. ## What changes were proposed in this pull request? The main reason for the heartbeat timeouts was that the heartbeat-receiver-event-loop-thread was blocked waiting on the TaskSchedulerImpl object which was being held by one of the dispatcher-event-loop threads executing the method dequeueSpeculativeTasks() in TaskSetManager.scala. On further analysis of the heartbeat receiver method executorHeartbeatReceived() in the TaskSchedulerImpl class, we found out that instead of waiting to acquire the lock on the TaskSchedulerImpl object, we can remove that lock and make the operations on the global variables inside the code block atomic. The block of code in that method only uses one global HashMap, taskIdToTaskSetManager. Making that map a ConcurrentHashMap, we are ensuring atomicity of operations and speeding up the heartbeat receiver thread operation. ## How was this patch tested? Screenshots of the thread dump have been attached below: **heartbeat-receiver-event-loop-thread:** https://user-images.githubusercontent.com/8190/44593413-e25df780-a788-11e8-9520-176a18401a59.png **dispatcher-event-loop-thread:** https://user-images.githubusercontent.com/8190/44593484-13d6c300-a789-11e8-8d88-34b1d51d4541.png Closes #22173 from pgandhi999/SPARK-25231. 
Authored-by: pgandhi Signed-off-by: Thomas Graves Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/559b899a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/559b899a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/559b899a Branch: refs/heads/master Commit: 559b899aceb160fcec3a57109c0b60a0ae40daeb Parents: 9254492 Author: pgandhi Authored: Wed Sep 5 16:10:49 2018 -0500 Committer: Thomas Graves Committed: Wed Sep 5 16:10:49 2018 -0500 -- .../org/apache/spark/scheduler/TaskSchedulerImpl.scala | 12 ++-- .../cluster/CoarseGrainedSchedulerBackend.scala | 2 +- .../spark/scheduler/SchedulerIntegrationSuite.scala | 3 ++- .../apache/spark/scheduler/TaskSchedulerImplSuite.scala | 6 +++--- 4 files changed, 12 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/559b899a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 8992d7e..8b71170 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -19,7 +19,7 @@ package org.apache.spark.scheduler import java.nio.ByteBuffer import java.util.{Locale, Timer, TimerTask} -import java.util.concurrent.TimeUnit +import java.util.concurrent.{ConcurrentHashMap, TimeUnit} import java.util.concurrent.atomic.AtomicLong import scala.collection.Set @@ -91,7 +91,7 @@ private[spark] class TaskSchedulerImpl( private val taskSetsByStageIdAndAttempt = new HashMap[Int, HashMap[Int, TaskSetManager]] // Protected by `this` - private[scheduler] val taskIdToTaskSetManager = new HashMap[Long, TaskSetManager] + private[scheduler] val taskIdToTaskSetManager = new ConcurrentHashMap[Long, TaskSetManager] val taskIdToExecutorId = new HashMap[Long, String] 
@volatile private var hasReceivedTask = false @@ -315,7 +315,7 @@ private[spark] class TaskSchedulerImpl( for (task <- taskSet.resourceOffer(execId, host, maxLocality)) { tasks(i) += task val tid = task.taskId -taskIdToTaskSetManager(tid) = taskSet +taskIdToTaskSetManager.put(tid, taskSet) taskIdToExecutorId(tid) = execId executorIdToRunningTaskIds(execId).add(tid) availableCpus(i) -= CPUS_PER_TASK @@ -465,7 +465,7 @@ private[spark] class TaskSchedulerImpl( var reason: Option[ExecutorLossReason] = None synchronized { try { -taskIdToTaskSetManager.get(tid) match { +Option(taskIdToTaskSetManager.get(tid)) match { case Some(taskSet) => if (state == TaskState.LOST) { // TaskState.LOST is only used by the deprecated Mesos fine-grained scheduling mode, @@ -517,10 +517,10 @@ private[spark] class TaskSchedulerImpl( accumUpdates: Array[(Long, Seq[AccumulatorV2[_,
spark git commit: [SPARK-25231] Fix synchronization of executor heartbeat receiver in TaskSchedulerImpl
Repository: spark Updated Branches: refs/heads/branch-2.3 dbf0b9340 -> 31e46ec60 [SPARK-25231] Fix synchronization of executor heartbeat receiver in TaskSchedulerImpl Running a large Spark job with speculation turned on was causing executor heartbeats to time out on the driver end after some time and eventually, after hitting the max number of executor failures, the job would fail. ## What changes were proposed in this pull request? The main reason for the heartbeat timeouts was that the heartbeat-receiver-event-loop-thread was blocked waiting on the TaskSchedulerImpl object which was being held by one of the dispatcher-event-loop threads executing the method dequeueSpeculativeTasks() in TaskSetManager.scala. On further analysis of the heartbeat receiver method executorHeartbeatReceived() in the TaskSchedulerImpl class, we found out that instead of waiting to acquire the lock on the TaskSchedulerImpl object, we can remove that lock and make the operations on the global variables inside the code block atomic. The block of code in that method only uses one global HashMap, taskIdToTaskSetManager. Making that map a ConcurrentHashMap, we are ensuring atomicity of operations and speeding up the heartbeat receiver thread operation. ## How was this patch tested? Screenshots of the thread dump have been attached below: **heartbeat-receiver-event-loop-thread:** https://user-images.githubusercontent.com/8190/44593413-e25df780-a788-11e8-9520-176a18401a59.png **dispatcher-event-loop-thread:** https://user-images.githubusercontent.com/8190/44593484-13d6c300-a789-11e8-8d88-34b1d51d4541.png Closes #22173 from pgandhi999/SPARK-25231. 
Authored-by: pgandhi Signed-off-by: Thomas Graves (cherry picked from commit 559b899aceb160fcec3a57109c0b60a0ae40daeb) Signed-off-by: Thomas Graves Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/31e46ec6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/31e46ec6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/31e46ec6 Branch: refs/heads/branch-2.3 Commit: 31e46ec60849d315a4e83e0a332606a4405907ad Parents: dbf0b93 Author: pgandhi Authored: Wed Sep 5 16:10:49 2018 -0500 Committer: Thomas Graves Committed: Wed Sep 5 16:11:08 2018 -0500 -- .../org/apache/spark/scheduler/TaskSchedulerImpl.scala | 12 ++-- .../cluster/CoarseGrainedSchedulerBackend.scala | 2 +- .../spark/scheduler/SchedulerIntegrationSuite.scala | 3 ++- .../apache/spark/scheduler/TaskSchedulerImplSuite.scala | 6 +++--- 4 files changed, 12 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/31e46ec6/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 56c0bf6..4edc6b2 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -19,7 +19,7 @@ package org.apache.spark.scheduler import java.nio.ByteBuffer import java.util.{Locale, Timer, TimerTask} -import java.util.concurrent.TimeUnit +import java.util.concurrent.{ConcurrentHashMap, TimeUnit} import java.util.concurrent.atomic.AtomicLong import scala.collection.Set @@ -90,7 +90,7 @@ private[spark] class TaskSchedulerImpl( private val taskSetsByStageIdAndAttempt = new HashMap[Int, HashMap[Int, TaskSetManager]] // Protected by `this` - private[scheduler] val taskIdToTaskSetManager = new HashMap[Long, TaskSetManager] + private[scheduler] val taskIdToTaskSetManager 
= new ConcurrentHashMap[Long, TaskSetManager] val taskIdToExecutorId = new HashMap[Long, String] @volatile private var hasReceivedTask = false @@ -286,7 +286,7 @@ private[spark] class TaskSchedulerImpl( for (task <- taskSet.resourceOffer(execId, host, maxLocality)) { tasks(i) += task val tid = task.taskId -taskIdToTaskSetManager(tid) = taskSet +taskIdToTaskSetManager.put(tid, taskSet) taskIdToExecutorId(tid) = execId executorIdToRunningTaskIds(execId).add(tid) availableCpus(i) -= CPUS_PER_TASK @@ -392,7 +392,7 @@ private[spark] class TaskSchedulerImpl( var reason: Option[ExecutorLossReason] = None synchronized { try { -taskIdToTaskSetManager.get(tid) match { +Option(taskIdToTaskSetManager.get(tid)) match { case Some(taskSet) => if (state == TaskState.LOST) { // TaskState.LOST is only used by the deprecated Mesos fine-grained scheduling mode, @@ -444,10