spark git commit: [SPARK-25231] Fix synchronization of executor heartbeat receiver in TaskSchedulerImpl

2018-09-05 Thread tgraves
Repository: spark
Updated Branches:
  refs/heads/master 925449283 -> 559b899ac


[SPARK-25231] Fix synchronization of executor heartbeat receiver in 
TaskSchedulerImpl

Running a large Spark job with speculation turned on was causing executor 
heartbeats to time out on the driver end after some time and eventually, after 
hitting the max number of executor failures, the job would fail.

## What changes were proposed in this pull request?

The main reason for the heartbeat timeouts was that the 
heartbeat-receiver-event-loop-thread was blocked waiting on the 
TaskSchedulerImpl object which was being held by one of the 
dispatcher-event-loop threads executing the method dequeueSpeculativeTasks() in 
TaskSetManager.scala. On further analysis of the heartbeat receiver method 
executorHeartbeatReceived() in TaskSchedulerImpl class, we found out that 
instead of waiting to acquire the lock on the TaskSchedulerImpl object, we can 
remove that lock and make the operations to the global variables inside the 
code block to be atomic. The block of code in that method only uses one global 
HashMap taskIdToTaskSetManager. Making that map a ConcurrentHashMap, we are 
ensuring atomicity of operations and speeding up the heartbeat receiver thread 
operation.

## How was this patch tested?

Screenshots of the thread dump have been attached below:
**heartbeat-receiver-event-loop-thread:**

https://user-images.githubusercontent.com/8190/44593413-e25df780-a788-11e8-9520-176a18401a59.png

**dispatcher-event-loop-thread:**

https://user-images.githubusercontent.com/8190/44593484-13d6c300-a789-11e8-8d88-34b1d51d4541.png

Closes #1 from pgandhi999/SPARK-25231.

Authored-by: pgandhi 
Signed-off-by: Thomas Graves 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/559b899a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/559b899a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/559b899a

Branch: refs/heads/master
Commit: 559b899aceb160fcec3a57109c0b60a0ae40daeb
Parents: 9254492
Author: pgandhi 
Authored: Wed Sep 5 16:10:49 2018 -0500
Committer: Thomas Graves 
Committed: Wed Sep 5 16:10:49 2018 -0500

--
 .../org/apache/spark/scheduler/TaskSchedulerImpl.scala  | 12 ++--
 .../cluster/CoarseGrainedSchedulerBackend.scala |  2 +-
 .../spark/scheduler/SchedulerIntegrationSuite.scala |  3 ++-
 .../apache/spark/scheduler/TaskSchedulerImplSuite.scala |  6 +++---
 4 files changed, 12 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/559b899a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 8992d7e..8b71170 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler
 
 import java.nio.ByteBuffer
 import java.util.{Locale, Timer, TimerTask}
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.Set
@@ -91,7 +91,7 @@ private[spark] class TaskSchedulerImpl(
   private val taskSetsByStageIdAndAttempt = new HashMap[Int, HashMap[Int, 
TaskSetManager]]
 
   // Protected by `this`
-  private[scheduler] val taskIdToTaskSetManager = new HashMap[Long, 
TaskSetManager]
+  private[scheduler] val taskIdToTaskSetManager = new ConcurrentHashMap[Long, 
TaskSetManager]
   val taskIdToExecutorId = new HashMap[Long, String]
 
   @volatile private var hasReceivedTask = false
@@ -315,7 +315,7 @@ private[spark] class TaskSchedulerImpl(
   for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
 tasks(i) += task
 val tid = task.taskId
-taskIdToTaskSetManager(tid) = taskSet
+taskIdToTaskSetManager.put(tid, taskSet)
 taskIdToExecutorId(tid) = execId
 executorIdToRunningTaskIds(execId).add(tid)
 availableCpus(i) -= CPUS_PER_TASK
@@ -465,7 +465,7 @@ private[spark] class TaskSchedulerImpl(
 var reason: Option[ExecutorLossReason] = None
 synchronized {
   try {
-taskIdToTaskSetManager.get(tid) match {
+Option(taskIdToTaskSetManager.get(tid)) match {
   case Some(taskSet) =>
 if (state == TaskState.LOST) {
   // TaskState.LOST is only used by the deprecated Mesos 
fine-grained scheduling mode,
@@ -517,10 +517,10 @@ private[spark] class TaskSchedulerImpl(
   accumUpdates: Array[(Long, Seq[AccumulatorV2[_, 

spark git commit: [SPARK-25231] Fix synchronization of executor heartbeat receiver in TaskSchedulerImpl

2018-09-05 Thread tgraves
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 dbf0b9340 -> 31e46ec60


[SPARK-25231] Fix synchronization of executor heartbeat receiver in 
TaskSchedulerImpl

Running a large Spark job with speculation turned on was causing executor 
heartbeats to time out on the driver end after some time and eventually, after 
hitting the max number of executor failures, the job would fail.

## What changes were proposed in this pull request?

The main reason for the heartbeat timeouts was that the 
heartbeat-receiver-event-loop-thread was blocked waiting on the 
TaskSchedulerImpl object which was being held by one of the 
dispatcher-event-loop threads executing the method dequeueSpeculativeTasks() in 
TaskSetManager.scala. On further analysis of the heartbeat receiver method 
executorHeartbeatReceived() in TaskSchedulerImpl class, we found out that 
instead of waiting to acquire the lock on the TaskSchedulerImpl object, we can 
remove that lock and make the operations to the global variables inside the 
code block to be atomic. The block of code in that method only uses one global 
HashMap taskIdToTaskSetManager. Making that map a ConcurrentHashMap, we are 
ensuring atomicity of operations and speeding up the heartbeat receiver thread 
operation.

## How was this patch tested?

Screenshots of the thread dump have been attached below:
**heartbeat-receiver-event-loop-thread:**

https://user-images.githubusercontent.com/8190/44593413-e25df780-a788-11e8-9520-176a18401a59.png

**dispatcher-event-loop-thread:**

https://user-images.githubusercontent.com/8190/44593484-13d6c300-a789-11e8-8d88-34b1d51d4541.png

Closes #1 from pgandhi999/SPARK-25231.

Authored-by: pgandhi 
Signed-off-by: Thomas Graves 
(cherry picked from commit 559b899aceb160fcec3a57109c0b60a0ae40daeb)
Signed-off-by: Thomas Graves 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/31e46ec6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/31e46ec6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/31e46ec6

Branch: refs/heads/branch-2.3
Commit: 31e46ec60849d315a4e83e0a332606a4405907ad
Parents: dbf0b93
Author: pgandhi 
Authored: Wed Sep 5 16:10:49 2018 -0500
Committer: Thomas Graves 
Committed: Wed Sep 5 16:11:08 2018 -0500

--
 .../org/apache/spark/scheduler/TaskSchedulerImpl.scala  | 12 ++--
 .../cluster/CoarseGrainedSchedulerBackend.scala |  2 +-
 .../spark/scheduler/SchedulerIntegrationSuite.scala |  3 ++-
 .../apache/spark/scheduler/TaskSchedulerImplSuite.scala |  6 +++---
 4 files changed, 12 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/31e46ec6/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 56c0bf6..4edc6b2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler
 
 import java.nio.ByteBuffer
 import java.util.{Locale, Timer, TimerTask}
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.Set
@@ -90,7 +90,7 @@ private[spark] class TaskSchedulerImpl(
   private val taskSetsByStageIdAndAttempt = new HashMap[Int, HashMap[Int, 
TaskSetManager]]
 
   // Protected by `this`
-  private[scheduler] val taskIdToTaskSetManager = new HashMap[Long, 
TaskSetManager]
+  private[scheduler] val taskIdToTaskSetManager = new ConcurrentHashMap[Long, 
TaskSetManager]
   val taskIdToExecutorId = new HashMap[Long, String]
 
   @volatile private var hasReceivedTask = false
@@ -286,7 +286,7 @@ private[spark] class TaskSchedulerImpl(
   for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
 tasks(i) += task
 val tid = task.taskId
-taskIdToTaskSetManager(tid) = taskSet
+taskIdToTaskSetManager.put(tid, taskSet)
 taskIdToExecutorId(tid) = execId
 executorIdToRunningTaskIds(execId).add(tid)
 availableCpus(i) -= CPUS_PER_TASK
@@ -392,7 +392,7 @@ private[spark] class TaskSchedulerImpl(
 var reason: Option[ExecutorLossReason] = None
 synchronized {
   try {
-taskIdToTaskSetManager.get(tid) match {
+Option(taskIdToTaskSetManager.get(tid)) match {
   case Some(taskSet) =>
 if (state == TaskState.LOST) {
   // TaskState.LOST is only used by the deprecated Mesos 
fine-grained scheduling mode,
@@ -444,10