Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/19194#discussion_r140123047 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -758,11 +825,52 @@ private[spark] class ExecutorAllocationManager( allocationManager.synchronized { stageIdToNumSpeculativeTasks(stageId) = stageIdToNumSpeculativeTasks.getOrElse(stageId, 0) + 1 + maxConcurrentTasks = getMaxConTasks + logDebug(s"Setting max concurrent tasks to $maxConcurrentTasks on spec. task submitted.") allocationManager.onSchedulerBacklogged() } } /** + * Calculate the maximum no. of concurrent tasks that can run currently. + */ + def getMaxConTasks(): Int = { + // We can limit the no. of concurrent tasks by a job group. A job group can have multiple jobs + // with multiple stages. We need to get all the active stages belonging to a job group to + // calculate the total no. of pending + running tasks to decide the maximum no. of executors + // we need at that time to serve the outstanding tasks. This is capped by the minimum no. of + // outstanding tasks and the max concurrent limit specified for the job group if any. + + def getIncompleteTasksForStage(stageId: Int, numTasks: Int): Int = { + totalPendingTasks(stageId) + totalRunningTasks(stageId) + } + + def sumIncompleteTasksForStages: (Int, (Int, Int)) => Int = (totalTasks, stageToNumTasks) => { + val activeTasks = getIncompleteTasksForStage(stageToNumTasks._1, stageToNumTasks._2) + sumOrMax(totalTasks, activeTasks) + } + // Get the total running & pending tasks for all stages in a job group. 
+ def getIncompleteTasksForJobGroup(stagesItr: mutable.HashMap[Int, Int]): Int = { + stagesItr.foldLeft(0)(sumIncompleteTasksForStages) + } + + def sumIncompleteTasksForJobGroup: (Int, (String, mutable.HashMap[Int, Int])) => Int = { + (maxConTasks, x) => { + val totalIncompleteTasksForJobGroup = getIncompleteTasksForJobGroup(x._2) + val maxTasks = Math.min(jobGroupToMaxConTasks(x._1), totalIncompleteTasksForJobGroup) + sumOrMax(maxConTasks, maxTasks) + } + } + + def sumOrMax(a: Int, b: Int): Int = if (doesSumOverflow(a, b)) Int.MaxValue else (a + b) + + def doesSumOverflow(a: Int, b: Int): Boolean = b > (Int.MaxValue - a) + + val stagesByJobGroup = stageIdToNumTasks.groupBy(x => jobIdToJobGroup(stageIdToJobId(x._1))) --- End diff -- I like the idea. I think this can be done. Will update the PR.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org