Repository: spark
Updated Branches:
  refs/heads/master 746a558de -> b7be05a20


[SPARK-19567][CORE][SCHEDULER] Support some Schedulable variables immutability 
and access

## What changes were proposed in this pull request?
Some `Schedulable` Entities(`Pool` and `TaskSetManager`) variables need 
refactoring for _immutability_ and _access modifiers_ levels as follows:
- From `var` to `val` (if there is no requirement): This is important to 
support immutability as much as possible.
  - Sample => `Pool`: `weight`, `minShare`, `priority`, `name` and 
`taskSetSchedulingAlgorithm`.
- Access modifiers: Specially, `var`s access needs to be restricted from other 
parts of codebase to prevent potential side effects.
  - `TaskSetManager`: `tasksSuccessful`, `totalResultSize`, `calculatedTasks` 
etc...

This PR is related with #15604 and has been created seperatedly to keep patch 
content as isolated and to help the reviewers.

## How was this patch tested?
Added new UTs and existing UT coverage.

Author: erenavsarogullari <erenavsarogull...@gmail.com>

Closes #16905 from erenavsarogullari/SPARK-19567.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b7be05a2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b7be05a2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b7be05a2

Branch: refs/heads/master
Commit: b7be05a203b3e2a307147ea0c6cb0dec03da82a2
Parents: 746a558
Author: erenavsarogullari <erenavsarogull...@gmail.com>
Authored: Thu Mar 23 17:20:52 2017 -0700
Committer: Kay Ousterhout <kayousterh...@gmail.com>
Committed: Thu Mar 23 17:20:52 2017 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/scheduler/Pool.scala | 12 +++----
 .../spark/scheduler/TaskSchedulerImpl.scala     | 19 +++++++----
 .../apache/spark/scheduler/TaskSetManager.scala | 36 +++++++++++---------
 .../spark/scheduler/DAGSchedulerSuite.scala     |  8 ++---
 .../scheduler/ExternalClusterManagerSuite.scala |  4 +--
 .../org/apache/spark/scheduler/PoolSuite.scala  |  6 ++++
 .../scheduler/TaskSchedulerImplSuite.scala      | 12 +++++--
 7 files changed, 58 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b7be05a2/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala 
b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
index 2a69a6c..1181371 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
@@ -37,24 +37,24 @@ private[spark] class Pool(
 
   val schedulableQueue = new ConcurrentLinkedQueue[Schedulable]
   val schedulableNameToSchedulable = new ConcurrentHashMap[String, Schedulable]
-  var weight = initWeight
-  var minShare = initMinShare
+  val weight = initWeight
+  val minShare = initMinShare
   var runningTasks = 0
-  var priority = 0
+  val priority = 0
 
   // A pool's stage id is used to break the tie in scheduling.
   var stageId = -1
-  var name = poolName
+  val name = poolName
   var parent: Pool = null
 
-  var taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
+  private val taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
     schedulingMode match {
       case SchedulingMode.FAIR =>
         new FairSchedulingAlgorithm()
       case SchedulingMode.FIFO =>
         new FIFOSchedulingAlgorithm()
       case _ =>
-        val msg = "Unsupported scheduling mode: $schedulingMode. Use FAIR or 
FIFO instead."
+        val msg = s"Unsupported scheduling mode: $schedulingMode. Use FAIR or 
FIFO instead."
         throw new IllegalArgumentException(msg)
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/b7be05a2/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index bfbcfa1..8257c70 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -59,6 +59,8 @@ private[spark] class TaskSchedulerImpl private[scheduler](
   extends TaskScheduler with Logging
 {
 
+  import TaskSchedulerImpl._
+
   def this(sc: SparkContext) = {
     this(
       sc,
@@ -130,17 +132,18 @@ private[spark] class TaskSchedulerImpl private[scheduler](
 
   val mapOutputTracker = SparkEnv.get.mapOutputTracker
 
-  var schedulableBuilder: SchedulableBuilder = null
-  var rootPool: Pool = null
+  private var schedulableBuilder: SchedulableBuilder = null
   // default scheduler is FIFO
-  private val schedulingModeConf = conf.get("spark.scheduler.mode", "FIFO")
+  private val schedulingModeConf = conf.get(SCHEDULER_MODE_PROPERTY, 
SchedulingMode.FIFO.toString)
   val schedulingMode: SchedulingMode = try {
     SchedulingMode.withName(schedulingModeConf.toUpperCase)
   } catch {
     case e: java.util.NoSuchElementException =>
-      throw new SparkException(s"Unrecognized spark.scheduler.mode: 
$schedulingModeConf")
+      throw new SparkException(s"Unrecognized $SCHEDULER_MODE_PROPERTY: 
$schedulingModeConf")
   }
 
+  val rootPool: Pool = new Pool("", schedulingMode, 0, 0)
+
   // This is a var so that we can reset it for testing purposes.
   private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this)
 
@@ -150,8 +153,6 @@ private[spark] class TaskSchedulerImpl private[scheduler](
 
   def initialize(backend: SchedulerBackend) {
     this.backend = backend
-    // temporarily set rootPool name to empty
-    rootPool = new Pool("", schedulingMode, 0, 0)
     schedulableBuilder = {
       schedulingMode match {
         case SchedulingMode.FIFO =>
@@ -159,7 +160,8 @@ private[spark] class TaskSchedulerImpl private[scheduler](
         case SchedulingMode.FAIR =>
           new FairSchedulableBuilder(rootPool, conf)
         case _ =>
-          throw new IllegalArgumentException(s"Unsupported 
spark.scheduler.mode: $schedulingMode")
+          throw new IllegalArgumentException(s"Unsupported 
$SCHEDULER_MODE_PROPERTY: " +
+          s"$schedulingMode")
       }
     }
     schedulableBuilder.buildPools()
@@ -683,6 +685,9 @@ private[spark] class TaskSchedulerImpl private[scheduler](
 
 
 private[spark] object TaskSchedulerImpl {
+
+  val SCHEDULER_MODE_PROPERTY = "spark.scheduler.mode"
+
   /**
    * Used to balance containers across hosts.
    *

http://git-wip-us.apache.org/repos/asf/spark/blob/b7be05a2/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 11633be..fd93a1f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -78,16 +78,16 @@ private[spark] class TaskSetManager(
   private val numFailures = new Array[Int](numTasks)
 
   val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
-  var tasksSuccessful = 0
+  private[scheduler] var tasksSuccessful = 0
 
-  var weight = 1
-  var minShare = 0
+  val weight = 1
+  val minShare = 0
   var priority = taskSet.priority
   var stageId = taskSet.stageId
   val name = "TaskSet_" + taskSet.id
   var parent: Pool = null
-  var totalResultSize = 0L
-  var calculatedTasks = 0
+  private var totalResultSize = 0L
+  private var calculatedTasks = 0
 
   private[scheduler] val taskSetBlacklistHelperOpt: Option[TaskSetBlacklist] = 
{
     blacklistTracker.map { _ =>
@@ -95,7 +95,7 @@ private[spark] class TaskSetManager(
     }
   }
 
-  val runningTasksSet = new HashSet[Long]
+  private[scheduler] val runningTasksSet = new HashSet[Long]
 
   override def runningTasks: Int = runningTasksSet.size
 
@@ -105,7 +105,7 @@ private[spark] class TaskSetManager(
   // state until all tasks have finished running; we keep TaskSetManagers that 
are in the zombie
   // state in order to continue to track and account for the running tasks.
   // TODO: We should kill any running task attempts when the task set manager 
becomes a zombie.
-  var isZombie = false
+  private[scheduler] var isZombie = false
 
   // Set of pending tasks for each executor. These collections are actually
   // treated as stacks, in which new tasks are added to the end of the
@@ -129,17 +129,17 @@ private[spark] class TaskSetManager(
   private val pendingTasksForRack = new HashMap[String, ArrayBuffer[Int]]
 
   // Set containing pending tasks with no locality preferences.
-  var pendingTasksWithNoPrefs = new ArrayBuffer[Int]
+  private[scheduler] var pendingTasksWithNoPrefs = new ArrayBuffer[Int]
 
   // Set containing all pending tasks (also used as a stack, as above).
-  val allPendingTasks = new ArrayBuffer[Int]
+  private val allPendingTasks = new ArrayBuffer[Int]
 
   // Tasks that can be speculated. Since these will be a small fraction of 
total
   // tasks, we'll just hold them in a HashSet.
-  val speculatableTasks = new HashSet[Int]
+  private[scheduler] val speculatableTasks = new HashSet[Int]
 
   // Task index, start and finish time for each task attempt (indexed by task 
ID)
-  val taskInfos = new HashMap[Long, TaskInfo]
+  private val taskInfos = new HashMap[Long, TaskInfo]
 
   // How frequently to reprint duplicate exceptions in full, in milliseconds
   val EXCEPTION_PRINT_INTERVAL =
@@ -148,7 +148,7 @@ private[spark] class TaskSetManager(
   // Map of recent exceptions (identified by string representation and top 
stack frame) to
   // duplicate count (how many times the same exception has appeared) and time 
the full exception
   // was printed. This should ideally be an LRU map that can drop old 
exceptions automatically.
-  val recentExceptions = HashMap[String, (Int, Long)]()
+  private val recentExceptions = HashMap[String, (Int, Long)]()
 
   // Figure out the current map output tracker epoch and set it on all tasks
   val epoch = sched.mapOutputTracker.getEpoch
@@ -169,20 +169,22 @@ private[spark] class TaskSetManager(
    * This allows a performance optimization, of skipping levels that aren't 
relevant (eg., skip
    * PROCESS_LOCAL if no tasks could be run PROCESS_LOCAL for the current set 
of executors).
    */
-  var myLocalityLevels = computeValidLocalityLevels()
-  var localityWaits = myLocalityLevels.map(getLocalityWait) // Time to wait at 
each level
+  private[scheduler] var myLocalityLevels = computeValidLocalityLevels()
+
+  // Time to wait at each level
+  private[scheduler] var localityWaits = myLocalityLevels.map(getLocalityWait)
 
   // Delay scheduling variables: we keep track of our current locality level 
and the time we
   // last launched a task at that level, and move up a level when 
localityWaits[curLevel] expires.
   // We then move down if we manage to launch a "more local" task.
-  var currentLocalityIndex = 0    // Index of our current locality level in 
validLocalityLevels
-  var lastLaunchTime = clock.getTimeMillis()  // Time we last launched a task 
at this level
+  private var currentLocalityIndex = 0 // Index of our current locality level 
in validLocalityLevels
+  private var lastLaunchTime = clock.getTimeMillis()  // Time we last launched 
a task at this level
 
   override def schedulableQueue: ConcurrentLinkedQueue[Schedulable] = null
 
   override def schedulingMode: SchedulingMode = SchedulingMode.NONE
 
-  var emittedTaskSizeWarning = false
+  private[scheduler] var emittedTaskSizeWarning = false
 
   /** Add a task to all the pending-task lists that it should be on. */
   private def addPendingTask(index: Int) {

http://git-wip-us.apache.org/repos/asf/spark/blob/b7be05a2/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index dfad5db..a938900 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -110,8 +110,8 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with Timeou
   val cancelledStages = new HashSet[Int]()
 
   val taskScheduler = new TaskScheduler() {
-    override def rootPool: Pool = null
-    override def schedulingMode: SchedulingMode = SchedulingMode.NONE
+    override def schedulingMode: SchedulingMode = SchedulingMode.FIFO
+    override def rootPool: Pool = new Pool("", schedulingMode, 0, 0)
     override def start() = {}
     override def stop() = {}
     override def executorHeartbeatReceived(
@@ -542,8 +542,8 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with Timeou
     // make sure that the DAGScheduler doesn't crash when the TaskScheduler
     // doesn't implement killTask()
     val noKillTaskScheduler = new TaskScheduler() {
-      override def rootPool: Pool = null
-      override def schedulingMode: SchedulingMode = SchedulingMode.NONE
+      override def schedulingMode: SchedulingMode = SchedulingMode.FIFO
+      override def rootPool: Pool = new Pool("", schedulingMode, 0, 0)
       override def start(): Unit = {}
       override def stop(): Unit = {}
       override def submitTasks(taskSet: TaskSet): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/b7be05a2/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
index e87cebf..37c124a 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
@@ -73,8 +73,8 @@ private class DummySchedulerBackend extends SchedulerBackend {
 
 private class DummyTaskScheduler extends TaskScheduler {
   var initialized = false
-  override def rootPool: Pool = null
-  override def schedulingMode: SchedulingMode = SchedulingMode.NONE
+  override def schedulingMode: SchedulingMode = SchedulingMode.FIFO
+  override def rootPool: Pool = new Pool("", schedulingMode, 0, 0)
   override def start(): Unit = {}
   override def stop(): Unit = {}
   override def submitTasks(taskSet: TaskSet): Unit = {}

http://git-wip-us.apache.org/repos/asf/spark/blob/b7be05a2/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
index cddff3d..4901062 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
@@ -286,6 +286,12 @@ class PoolSuite extends SparkFunSuite with 
LocalSparkContext {
     assert(testPool.getSchedulableByName(taskSetManager.name) === 
taskSetManager)
   }
 
+  test("Pool should throw IllegalArgumentException when schedulingMode is not 
supported") {
+    intercept[IllegalArgumentException] {
+      new Pool("TestPool", SchedulingMode.NONE, 0, 1)
+    }
+  }
+
   private def verifyPool(rootPool: Pool, poolName: String, 
expectedInitMinShare: Int,
                          expectedInitWeight: Int, expectedSchedulingMode: 
SchedulingMode): Unit = {
     val selectedPool = rootPool.getSchedulableByName(poolName)

http://git-wip-us.apache.org/repos/asf/spark/blob/b7be05a2/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 9ae0bcd..8b9d45f 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -75,9 +75,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext with B
 
   def setupScheduler(confs: (String, String)*): TaskSchedulerImpl = {
     val conf = new 
SparkConf().setMaster("local").setAppName("TaskSchedulerImplSuite")
-    confs.foreach { case (k, v) =>
-      conf.set(k, v)
-    }
+    confs.foreach { case (k, v) => conf.set(k, v) }
     sc = new SparkContext(conf)
     taskScheduler = new TaskSchedulerImpl(sc)
     setupHelper()
@@ -904,4 +902,12 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext with B
     assert(taskDescs.size === 1)
     assert(taskDescs.head.executorId === "exec2")
   }
+
+  test("TaskScheduler should throw IllegalArgumentException when 
schedulingMode is not supported") {
+    intercept[IllegalArgumentException] {
+      val taskScheduler = setupScheduler(
+        TaskSchedulerImpl.SCHEDULER_MODE_PROPERTY -> 
SchedulingMode.NONE.toString)
+      taskScheduler.initialize(new FakeSchedulerBackend)
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to