Repository: spark Updated Branches: refs/heads/master 746a558de -> b7be05a20
[SPARK-19567][CORE][SCHEDULER] Support some Schedulable variables immutability and access ## What changes were proposed in this pull request? Some `Schedulable` Entities(`Pool` and `TaskSetManager`) variables need refactoring for _immutability_ and _access modifiers_ levels as follows: - From `var` to `val` (if there is no requirement): This is important to support immutability as much as possible. - Sample => `Pool`: `weight`, `minShare`, `priority`, `name` and `taskSetSchedulingAlgorithm`. - Access modifiers: Specially, `var`s access needs to be restricted from other parts of codebase to prevent potential side effects. - `TaskSetManager`: `tasksSuccessful`, `totalResultSize`, `calculatedTasks` etc... This PR is related with #15604 and has been created seperatedly to keep patch content as isolated and to help the reviewers. ## How was this patch tested? Added new UTs and existing UT coverage. Author: erenavsarogullari <erenavsarogull...@gmail.com> Closes #16905 from erenavsarogullari/SPARK-19567. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b7be05a2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b7be05a2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b7be05a2 Branch: refs/heads/master Commit: b7be05a203b3e2a307147ea0c6cb0dec03da82a2 Parents: 746a558 Author: erenavsarogullari <erenavsarogull...@gmail.com> Authored: Thu Mar 23 17:20:52 2017 -0700 Committer: Kay Ousterhout <kayousterh...@gmail.com> Committed: Thu Mar 23 17:20:52 2017 -0700 ---------------------------------------------------------------------- .../scala/org/apache/spark/scheduler/Pool.scala | 12 +++---- .../spark/scheduler/TaskSchedulerImpl.scala | 19 +++++++---- .../apache/spark/scheduler/TaskSetManager.scala | 36 +++++++++++--------- .../spark/scheduler/DAGSchedulerSuite.scala | 8 ++--- .../scheduler/ExternalClusterManagerSuite.scala | 4 +-- .../org/apache/spark/scheduler/PoolSuite.scala | 6 ++++ .../scheduler/TaskSchedulerImplSuite.scala | 12 +++++-- 7 files changed, 58 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/b7be05a2/core/src/main/scala/org/apache/spark/scheduler/Pool.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala index 2a69a6c..1181371 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala @@ -37,24 +37,24 @@ private[spark] class Pool( val schedulableQueue = new ConcurrentLinkedQueue[Schedulable] val schedulableNameToSchedulable = new ConcurrentHashMap[String, Schedulable] - var weight = initWeight - var minShare = initMinShare + val weight = initWeight + val minShare = initMinShare var runningTasks = 0 - var priority = 0 + val priority = 0 // A pool's stage id is used to break the tie in scheduling. var stageId = -1 - var name = poolName + val name = poolName var parent: Pool = null - var taskSetSchedulingAlgorithm: SchedulingAlgorithm = { + private val taskSetSchedulingAlgorithm: SchedulingAlgorithm = { schedulingMode match { case SchedulingMode.FAIR => new FairSchedulingAlgorithm() case SchedulingMode.FIFO => new FIFOSchedulingAlgorithm() case _ => - val msg = "Unsupported scheduling mode: $schedulingMode. Use FAIR or FIFO instead." + val msg = s"Unsupported scheduling mode: $schedulingMode. Use FAIR or FIFO instead." throw new IllegalArgumentException(msg) } } http://git-wip-us.apache.org/repos/asf/spark/blob/b7be05a2/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index bfbcfa1..8257c70 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -59,6 +59,8 @@ private[spark] class TaskSchedulerImpl private[scheduler]( extends TaskScheduler with Logging { + import TaskSchedulerImpl._ + def this(sc: SparkContext) = { this( sc, @@ -130,17 +132,18 @@ private[spark] class TaskSchedulerImpl private[scheduler]( val mapOutputTracker = SparkEnv.get.mapOutputTracker - var schedulableBuilder: SchedulableBuilder = null - var rootPool: Pool = null + private var schedulableBuilder: SchedulableBuilder = null // default scheduler is FIFO - private val schedulingModeConf = conf.get("spark.scheduler.mode", "FIFO") + private val schedulingModeConf = conf.get(SCHEDULER_MODE_PROPERTY, SchedulingMode.FIFO.toString) val schedulingMode: SchedulingMode = try { SchedulingMode.withName(schedulingModeConf.toUpperCase) } catch { case e: java.util.NoSuchElementException => - throw new SparkException(s"Unrecognized spark.scheduler.mode: $schedulingModeConf") + throw new SparkException(s"Unrecognized $SCHEDULER_MODE_PROPERTY: $schedulingModeConf") } + val rootPool: Pool = new Pool("", schedulingMode, 0, 0) + // This is a var so that we can reset it for testing purposes. private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this) @@ -150,8 +153,6 @@ private[spark] class TaskSchedulerImpl private[scheduler]( def initialize(backend: SchedulerBackend) { this.backend = backend - // temporarily set rootPool name to empty - rootPool = new Pool("", schedulingMode, 0, 0) schedulableBuilder = { schedulingMode match { case SchedulingMode.FIFO => @@ -159,7 +160,8 @@ private[spark] class TaskSchedulerImpl private[scheduler]( case SchedulingMode.FAIR => new FairSchedulableBuilder(rootPool, conf) case _ => - throw new IllegalArgumentException(s"Unsupported spark.scheduler.mode: $schedulingMode") + throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " + + s"$schedulingMode") } } schedulableBuilder.buildPools() @@ -683,6 +685,9 @@ private[spark] class TaskSchedulerImpl private[scheduler]( private[spark] object TaskSchedulerImpl { + + val SCHEDULER_MODE_PROPERTY = "spark.scheduler.mode" + /** * Used to balance containers across hosts. * http://git-wip-us.apache.org/repos/asf/spark/blob/b7be05a2/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 11633be..fd93a1f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -78,16 +78,16 @@ private[spark] class TaskSetManager( private val numFailures = new Array[Int](numTasks) val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil) - var tasksSuccessful = 0 + private[scheduler] var tasksSuccessful = 0 - var weight = 1 - var minShare = 0 + val weight = 1 + val minShare = 0 var priority = taskSet.priority var stageId = taskSet.stageId val name = "TaskSet_" + taskSet.id var parent: Pool = null - var totalResultSize = 0L - var calculatedTasks = 0 + private var totalResultSize = 0L + private var calculatedTasks = 0 private[scheduler] val taskSetBlacklistHelperOpt: Option[TaskSetBlacklist] = { blacklistTracker.map { _ => @@ -95,7 +95,7 @@ private[spark] class TaskSetManager( } } - val runningTasksSet = new HashSet[Long] + private[scheduler] val runningTasksSet = new HashSet[Long] override def runningTasks: Int = runningTasksSet.size @@ -105,7 +105,7 @@ private[spark] class TaskSetManager( // state until all tasks have finished running; we keep TaskSetManagers that are in the zombie // state in order to continue to track and account for the running tasks. // TODO: We should kill any running task attempts when the task set manager becomes a zombie. - var isZombie = false + private[scheduler] var isZombie = false // Set of pending tasks for each executor. These collections are actually // treated as stacks, in which new tasks are added to the end of the @@ -129,17 +129,17 @@ private[spark] class TaskSetManager( private val pendingTasksForRack = new HashMap[String, ArrayBuffer[Int]] // Set containing pending tasks with no locality preferences. - var pendingTasksWithNoPrefs = new ArrayBuffer[Int] + private[scheduler] var pendingTasksWithNoPrefs = new ArrayBuffer[Int] // Set containing all pending tasks (also used as a stack, as above). - val allPendingTasks = new ArrayBuffer[Int] + private val allPendingTasks = new ArrayBuffer[Int] // Tasks that can be speculated. Since these will be a small fraction of total // tasks, we'll just hold them in a HashSet. - val speculatableTasks = new HashSet[Int] + private[scheduler] val speculatableTasks = new HashSet[Int] // Task index, start and finish time for each task attempt (indexed by task ID) - val taskInfos = new HashMap[Long, TaskInfo] + private val taskInfos = new HashMap[Long, TaskInfo] // How frequently to reprint duplicate exceptions in full, in milliseconds val EXCEPTION_PRINT_INTERVAL = @@ -148,7 +148,7 @@ private[spark] class TaskSetManager( // Map of recent exceptions (identified by string representation and top stack frame) to // duplicate count (how many times the same exception has appeared) and time the full exception // was printed. This should ideally be an LRU map that can drop old exceptions automatically. - val recentExceptions = HashMap[String, (Int, Long)]() + private val recentExceptions = HashMap[String, (Int, Long)]() // Figure out the current map output tracker epoch and set it on all tasks val epoch = sched.mapOutputTracker.getEpoch @@ -169,20 +169,22 @@ private[spark] class TaskSetManager( * This allows a performance optimization, of skipping levels that aren't relevant (eg., skip * PROCESS_LOCAL if no tasks could be run PROCESS_LOCAL for the current set of executors). */ - var myLocalityLevels = computeValidLocalityLevels() - var localityWaits = myLocalityLevels.map(getLocalityWait) // Time to wait at each level + private[scheduler] var myLocalityLevels = computeValidLocalityLevels() + + // Time to wait at each level + private[scheduler] var localityWaits = myLocalityLevels.map(getLocalityWait) // Delay scheduling variables: we keep track of our current locality level and the time we // last launched a task at that level, and move up a level when localityWaits[curLevel] expires. // We then move down if we manage to launch a "more local" task. - var currentLocalityIndex = 0 // Index of our current locality level in validLocalityLevels - var lastLaunchTime = clock.getTimeMillis() // Time we last launched a task at this level + private var currentLocalityIndex = 0 // Index of our current locality level in validLocalityLevels + private var lastLaunchTime = clock.getTimeMillis() // Time we last launched a task at this level override def schedulableQueue: ConcurrentLinkedQueue[Schedulable] = null override def schedulingMode: SchedulingMode = SchedulingMode.NONE - var emittedTaskSizeWarning = false + private[scheduler] var emittedTaskSizeWarning = false /** Add a task to all the pending-task lists that it should be on. */ private def addPendingTask(index: Int) { http://git-wip-us.apache.org/repos/asf/spark/blob/b7be05a2/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index dfad5db..a938900 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -110,8 +110,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou val cancelledStages = new HashSet[Int]() val taskScheduler = new TaskScheduler() { - override def rootPool: Pool = null - override def schedulingMode: SchedulingMode = SchedulingMode.NONE + override def schedulingMode: SchedulingMode = SchedulingMode.FIFO + override def rootPool: Pool = new Pool("", schedulingMode, 0, 0) override def start() = {} override def stop() = {} override def executorHeartbeatReceived( @@ -542,8 +542,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou // make sure that the DAGScheduler doesn't crash when the TaskScheduler // doesn't implement killTask() val noKillTaskScheduler = new TaskScheduler() { - override def rootPool: Pool = null - override def schedulingMode: SchedulingMode = SchedulingMode.NONE + override def schedulingMode: SchedulingMode = SchedulingMode.FIFO + override def rootPool: Pool = new Pool("", schedulingMode, 0, 0) override def start(): Unit = {} override def stop(): Unit = {} override def submitTasks(taskSet: TaskSet): Unit = { http://git-wip-us.apache.org/repos/asf/spark/blob/b7be05a2/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala index e87cebf..37c124a 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala @@ -73,8 +73,8 @@ private class DummySchedulerBackend extends SchedulerBackend { private class DummyTaskScheduler extends TaskScheduler { var initialized = false - override def rootPool: Pool = null - override def schedulingMode: SchedulingMode = SchedulingMode.NONE + override def schedulingMode: SchedulingMode = SchedulingMode.FIFO + override def rootPool: Pool = new Pool("", schedulingMode, 0, 0) override def start(): Unit = {} override def stop(): Unit = {} override def submitTasks(taskSet: TaskSet): Unit = {} http://git-wip-us.apache.org/repos/asf/spark/blob/b7be05a2/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala index cddff3d..4901062 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala @@ -286,6 +286,12 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { assert(testPool.getSchedulableByName(taskSetManager.name) === taskSetManager) } + test("Pool should throw IllegalArgumentException when schedulingMode is not supported") { + intercept[IllegalArgumentException] { + new Pool("TestPool", SchedulingMode.NONE, 0, 1) + } + } + private def verifyPool(rootPool: Pool, poolName: String, expectedInitMinShare: Int, expectedInitWeight: Int, expectedSchedulingMode: SchedulingMode): Unit = { val selectedPool = rootPool.getSchedulableByName(poolName) http://git-wip-us.apache.org/repos/asf/spark/blob/b7be05a2/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 9ae0bcd..8b9d45f 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -75,9 +75,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B def setupScheduler(confs: (String, String)*): TaskSchedulerImpl = { val conf = new SparkConf().setMaster("local").setAppName("TaskSchedulerImplSuite") - confs.foreach { case (k, v) => - conf.set(k, v) - } + confs.foreach { case (k, v) => conf.set(k, v) } sc = new SparkContext(conf) taskScheduler = new TaskSchedulerImpl(sc) setupHelper() @@ -904,4 +902,12 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B assert(taskDescs.size === 1) assert(taskDescs.head.executorId === "exec2") } + + test("TaskScheduler should throw IllegalArgumentException when schedulingMode is not supported") { + intercept[IllegalArgumentException] { + val taskScheduler = setupScheduler( + TaskSchedulerImpl.SCHEDULER_MODE_PROPERTY -> SchedulingMode.NONE.toString) + taskScheduler.initialize(new FakeSchedulerBackend) + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org