Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19194#discussion_r140136101 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -619,6 +625,47 @@ private[spark] class ExecutorAllocationManager( // place the executors. private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, (Int, Map[String, Int])] + override def onJobStart(jobStart: SparkListenerJobStart): Unit = { + jobStart.stageInfos.foreach(stageInfo => stageIdToJobId(stageInfo.stageId) = jobStart.jobId) + + var jobGroupId = if (jobStart.properties != null) { + jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID) + } else { + null + } + + val maxConTasks = if (jobGroupId != null && + conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) { + conf.get(s"spark.job.$jobGroupId.maxConcurrentTasks").toInt + } else { + Int.MaxValue + } + + if (maxConTasks <= 0) { + throw new IllegalArgumentException( + "Maximum Concurrent Tasks should be set greater than 0 for the job to progress.") + } + + if (jobGroupId == null || !conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) { + jobGroupId = DEFAULT_JOB_GROUP + } + + jobIdToJobGroup(jobStart.jobId) = jobGroupId + if (!jobGroupToMaxConTasks.contains(jobGroupId)) { --- End diff -- Let me put my concern another way: why don't you remove the `if (!jobGroupToMaxConTasks.contains(jobGroupId))` guard and just unconditionally make the assignment `jobGroupToMaxConTasks(jobGroupId) = maxConTasks`? That is simpler to reason about, and has all the properties we want. I agree the scenario I'm describing is pretty weird, but the only difference I see between your version and this one is in that scenario — and it's probably not the behavior we want in that scenario.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org