spark git commit: [SPARK-18640] Add synchronization to TaskScheduler.runningTasksByExecutors

2016-11-30 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 8b33aa089 -> 1b1c849bf


[SPARK-18640] Add synchronization to TaskScheduler.runningTasksByExecutors

## What changes were proposed in this pull request?

The method `TaskSchedulerImpl.runningTasksByExecutors()` accesses the mutable 
`executorIdToRunningTaskIds` map without proper synchronization. In addition, 
as markhamstra pointed out in #15986, the signature's use of parentheses is a 
little odd given that this is a pure getter method.

This patch fixes both issues.

## How was this patch tested?

Covered by existing tests.

Author: Josh Rosen 

Closes #16073 from JoshRosen/runningTasksByExecutors-thread-safety.

(cherry picked from commit c51c7725944d60738e2bac3e11f6aea74812905c)
Signed-off-by: Andrew Or 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1b1c849b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1b1c849b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1b1c849b

Branch: refs/heads/branch-2.0
Commit: 1b1c849bfc3802c02bbf4585adba85907c82ff3b
Parents: 8b33aa0
Author: Josh Rosen 
Authored: Wed Nov 30 14:47:41 2016 -0500
Committer: Andrew Or 
Committed: Wed Nov 30 14:48:03 2016 -0500

--
 core/src/main/scala/org/apache/spark/SparkStatusTracker.scala| 2 +-
 .../scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 2 +-
 .../org/apache/spark/scheduler/TaskSchedulerImplSuite.scala  | 4 ++--
 3 files changed, 4 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1b1c849b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala 
b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
index 52c4656..22a553e 100644
--- a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
+++ b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
@@ -112,7 +112,7 @@ class SparkStatusTracker private[spark] (sc: SparkContext) {
*/
   def getExecutorInfos: Array[SparkExecutorInfo] = {
 val executorIdToRunningTasks: Map[String, Int] =
-      sc.taskScheduler.asInstanceOf[TaskSchedulerImpl].runningTasksByExecutors()
+      sc.taskScheduler.asInstanceOf[TaskSchedulerImpl].runningTasksByExecutors
 
 sc.getExecutorStorageStatus.map { status =>
   val bmId = status.blockManagerId

http://git-wip-us.apache.org/repos/asf/spark/blob/1b1c849b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index b2ef41e..feab4be 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -91,7 +91,7 @@ private[spark] class TaskSchedulerImpl(
   // IDs of the tasks running on each executor
   private val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]]
 
-  def runningTasksByExecutors(): Map[String, Int] = {
+  def runningTasksByExecutors: Map[String, Int] = synchronized {
 executorIdToRunningTaskIds.toMap.mapValues(_.size)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1b1c849b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
--
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 19b6fec..46c6a93 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -304,7 +304,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext with L
 // Check that state associated with the lost task attempt is cleaned up:
 assert(taskScheduler.taskIdToExecutorId.isEmpty)
 assert(taskScheduler.taskIdToTaskSetManager.isEmpty)
-assert(taskScheduler.runningTasksByExecutors().get("executor0").isEmpty)
+assert(taskScheduler.runningTasksByExecutors.get("executor0").isEmpty)
   }
 
   test("if a task finishes with TaskState.LOST its executor is marked as 
dead") {
@@ -335,7 +335,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext with L
 // Check that state associated with the lost task attempt is cleaned up:
 assert(taskScheduler.taskIdToExecutorId.isEmpty)
 

spark git commit: [SPARK-18640] Add synchronization to TaskScheduler.runningTasksByExecutors

2016-11-30 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 eae85da38 -> 7c0e2962d


[SPARK-18640] Add synchronization to TaskScheduler.runningTasksByExecutors

## What changes were proposed in this pull request?

The method `TaskSchedulerImpl.runningTasksByExecutors()` accesses the mutable 
`executorIdToRunningTaskIds` map without proper synchronization. In addition, 
as markhamstra pointed out in #15986, the signature's use of parentheses is a 
little odd given that this is a pure getter method.

This patch fixes both issues.

## How was this patch tested?

Covered by existing tests.

Author: Josh Rosen 

Closes #16073 from JoshRosen/runningTasksByExecutors-thread-safety.

(cherry picked from commit c51c7725944d60738e2bac3e11f6aea74812905c)
Signed-off-by: Andrew Or 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7c0e2962
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7c0e2962
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7c0e2962

Branch: refs/heads/branch-2.1
Commit: 7c0e2962d5e0fb80e4472d29dd467477f1cbcf8a
Parents: eae85da
Author: Josh Rosen 
Authored: Wed Nov 30 14:47:41 2016 -0500
Committer: Andrew Or 
Committed: Wed Nov 30 14:47:50 2016 -0500

--
 core/src/main/scala/org/apache/spark/SparkStatusTracker.scala| 2 +-
 .../scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 2 +-
 .../org/apache/spark/scheduler/TaskSchedulerImplSuite.scala  | 4 ++--
 3 files changed, 4 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7c0e2962/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala 
b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
index 52c4656..22a553e 100644
--- a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
+++ b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
@@ -112,7 +112,7 @@ class SparkStatusTracker private[spark] (sc: SparkContext) {
*/
   def getExecutorInfos: Array[SparkExecutorInfo] = {
 val executorIdToRunningTasks: Map[String, Int] =
-      sc.taskScheduler.asInstanceOf[TaskSchedulerImpl].runningTasksByExecutors()
+      sc.taskScheduler.asInstanceOf[TaskSchedulerImpl].runningTasksByExecutors
 
 sc.getExecutorStorageStatus.map { status =>
   val bmId = status.blockManagerId

http://git-wip-us.apache.org/repos/asf/spark/blob/7c0e2962/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 67446da..b03cfe4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -96,7 +96,7 @@ private[spark] class TaskSchedulerImpl(
   // IDs of the tasks running on each executor
   private val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]]
 
-  def runningTasksByExecutors(): Map[String, Int] = {
+  def runningTasksByExecutors: Map[String, Int] = synchronized {
 executorIdToRunningTaskIds.toMap.mapValues(_.size)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7c0e2962/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
--
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 48ec04b..e736c6c 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -442,7 +442,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext with B
 // Check that state associated with the lost task attempt is cleaned up:
 assert(taskScheduler.taskIdToExecutorId.isEmpty)
 assert(taskScheduler.taskIdToTaskSetManager.isEmpty)
-assert(taskScheduler.runningTasksByExecutors().get("executor0").isEmpty)
+assert(taskScheduler.runningTasksByExecutors.get("executor0").isEmpty)
   }
 
   test("if a task finishes with TaskState.LOST its executor is marked as 
dead") {
@@ -473,7 +473,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext with B
 // Check that state associated with the lost task attempt is cleaned up:
 assert(taskScheduler.taskIdToExecutorId.isEmpty)