spark git commit: [SPARK-18640] Add synchronization to TaskScheduler.runningTasksByExecutors
Repository: spark Updated Branches: refs/heads/branch-2.0 8b33aa089 -> 1b1c849bf [SPARK-18640] Add synchronization to TaskScheduler.runningTasksByExecutors ## What changes were proposed in this pull request? The method `TaskSchedulerImpl.runningTasksByExecutors()` accesses the mutable `executorIdToRunningTaskIds` map without proper synchronization. In addition, as markhamstra pointed out in #15986, the signature's use of parentheses is a little odd given that this is a pure getter method. This patch fixes both issues. ## How was this patch tested? Covered by existing tests. Author: Josh Rosen

Closes #16073 from JoshRosen/runningTasksByExecutors-thread-safety. (cherry picked from commit c51c7725944d60738e2bac3e11f6aea74812905c) Signed-off-by: Andrew Or Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1b1c849b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1b1c849b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1b1c849b Branch: refs/heads/branch-2.0 Commit: 1b1c849bfc3802c02bbf4585adba85907c82ff3b Parents: 8b33aa0 Author: Josh Rosen Authored: Wed Nov 30 14:47:41 2016 -0500 Committer: Andrew Or Committed: Wed Nov 30 14:48:03 2016 -0500 -- core/src/main/scala/org/apache/spark/SparkStatusTracker.scala| 2 +- .../scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 2 +- .../org/apache/spark/scheduler/TaskSchedulerImplSuite.scala | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1b1c849b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala index 52c4656..22a553e 100644 --- a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala +++ b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala @@ -112,7 +112,7 @@ class SparkStatusTracker private[spark] (sc: 
SparkContext) { */ def getExecutorInfos: Array[SparkExecutorInfo] = { val executorIdToRunningTasks: Map[String, Int] = - sc.taskScheduler.asInstanceOf[TaskSchedulerImpl].runningTasksByExecutors() + sc.taskScheduler.asInstanceOf[TaskSchedulerImpl].runningTasksByExecutors sc.getExecutorStorageStatus.map { status => val bmId = status.blockManagerId http://git-wip-us.apache.org/repos/asf/spark/blob/1b1c849b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index b2ef41e..feab4be 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -91,7 +91,7 @@ private[spark] class TaskSchedulerImpl( // IDs of the tasks running on each executor private val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]] - def runningTasksByExecutors(): Map[String, Int] = { + def runningTasksByExecutors: Map[String, Int] = synchronized { executorIdToRunningTaskIds.toMap.mapValues(_.size) } http://git-wip-us.apache.org/repos/asf/spark/blob/1b1c849b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 19b6fec..46c6a93 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -304,7 +304,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L // Check that state associated with the lost task attempt is cleaned up: assert(taskScheduler.taskIdToExecutorId.isEmpty) assert(taskScheduler.taskIdToTaskSetManager.isEmpty) 
-assert(taskScheduler.runningTasksByExecutors().get("executor0").isEmpty) +assert(taskScheduler.runningTasksByExecutors.get("executor0").isEmpty) } test("if a task finishes with TaskState.LOST its executor is marked as dead") { @@ -335,7 +335,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L // Check that state associated with the lost task attempt is cleaned up: assert(taskScheduler.taskIdToExecutorId.isEmpty)
spark git commit: [SPARK-18640] Add synchronization to TaskScheduler.runningTasksByExecutors
Repository: spark Updated Branches: refs/heads/branch-2.1 eae85da38 -> 7c0e2962d [SPARK-18640] Add synchronization to TaskScheduler.runningTasksByExecutors ## What changes were proposed in this pull request? The method `TaskSchedulerImpl.runningTasksByExecutors()` accesses the mutable `executorIdToRunningTaskIds` map without proper synchronization. In addition, as markhamstra pointed out in #15986, the signature's use of parentheses is a little odd given that this is a pure getter method. This patch fixes both issues. ## How was this patch tested? Covered by existing tests. Author: Josh Rosen

Closes #16073 from JoshRosen/runningTasksByExecutors-thread-safety. (cherry picked from commit c51c7725944d60738e2bac3e11f6aea74812905c) Signed-off-by: Andrew Or Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7c0e2962 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7c0e2962 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7c0e2962 Branch: refs/heads/branch-2.1 Commit: 7c0e2962d5e0fb80e4472d29dd467477f1cbcf8a Parents: eae85da Author: Josh Rosen Authored: Wed Nov 30 14:47:41 2016 -0500 Committer: Andrew Or Committed: Wed Nov 30 14:47:50 2016 -0500 -- core/src/main/scala/org/apache/spark/SparkStatusTracker.scala| 2 +- .../scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 2 +- .../org/apache/spark/scheduler/TaskSchedulerImplSuite.scala | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7c0e2962/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala index 52c4656..22a553e 100644 --- a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala +++ b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala @@ -112,7 +112,7 @@ class SparkStatusTracker private[spark] (sc: 
SparkContext) { */ def getExecutorInfos: Array[SparkExecutorInfo] = { val executorIdToRunningTasks: Map[String, Int] = - sc.taskScheduler.asInstanceOf[TaskSchedulerImpl].runningTasksByExecutors() + sc.taskScheduler.asInstanceOf[TaskSchedulerImpl].runningTasksByExecutors sc.getExecutorStorageStatus.map { status => val bmId = status.blockManagerId http://git-wip-us.apache.org/repos/asf/spark/blob/7c0e2962/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 67446da..b03cfe4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -96,7 +96,7 @@ private[spark] class TaskSchedulerImpl( // IDs of the tasks running on each executor private val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]] - def runningTasksByExecutors(): Map[String, Int] = { + def runningTasksByExecutors: Map[String, Int] = synchronized { executorIdToRunningTaskIds.toMap.mapValues(_.size) } http://git-wip-us.apache.org/repos/asf/spark/blob/7c0e2962/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 48ec04b..e736c6c 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -442,7 +442,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B // Check that state associated with the lost task attempt is cleaned up: assert(taskScheduler.taskIdToExecutorId.isEmpty) assert(taskScheduler.taskIdToTaskSetManager.isEmpty) 
-assert(taskScheduler.runningTasksByExecutors().get("executor0").isEmpty) +assert(taskScheduler.runningTasksByExecutors.get("executor0").isEmpty) } test("if a task finishes with TaskState.LOST its executor is marked as dead") { @@ -473,7 +473,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B // Check that state associated with the lost task attempt is cleaned up: assert(taskScheduler.taskIdToExecutorId.isEmpty)