GitHub user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/19194#discussion_r140340018 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -619,6 +625,47 @@ private[spark] class ExecutorAllocationManager( // place the executors. private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, (Int, Map[String, Int])] + override def onJobStart(jobStart: SparkListenerJobStart): Unit = { + jobStart.stageInfos.foreach(stageInfo => stageIdToJobId(stageInfo.stageId) = jobStart.jobId) + + var jobGroupId = if (jobStart.properties != null) { + jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID) + } else { + null + } + + val maxConTasks = if (jobGroupId != null && + conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) { + conf.get(s"spark.job.$jobGroupId.maxConcurrentTasks").toInt + } else { + Int.MaxValue + } + + if (maxConTasks <= 0) { + throw new IllegalArgumentException( + "Maximum Concurrent Tasks should be set greater than 0 for the job to progress.") + } + + if (jobGroupId == null || !conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) { + jobGroupId = DEFAULT_JOB_GROUP + } + + jobIdToJobGroup(jobStart.jobId) = jobGroupId + if (!jobGroupToMaxConTasks.contains(jobGroupId)) { --- End diff -- Do we even need mutable state associated with a job group? Some things would be a lot simpler if maxConTasks could only be set when the job group is created; if you need a different maxConTasks value, you would then have to use a different job group.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org