Github user dhruve commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19194#discussion_r140293577

    --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
    @@ -619,6 +625,47 @@ private[spark] class ExecutorAllocationManager(
       // place the executors.
       private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, (Int, Map[String, Int])]

    +  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    +    jobStart.stageInfos.foreach(stageInfo => stageIdToJobId(stageInfo.stageId) = jobStart.jobId)
    +
    +    var jobGroupId = if (jobStart.properties != null) {
    +      jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID)
    +    } else {
    +      null
    +    }
    +
    +    val maxConTasks = if (jobGroupId != null &&
    +      conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
    +      conf.get(s"spark.job.$jobGroupId.maxConcurrentTasks").toInt
    +    } else {
    +      Int.MaxValue
    +    }
    +
    +    if (maxConTasks <= 0) {
    +      throw new IllegalArgumentException(
    +        "Maximum Concurrent Tasks should be set greater than 0 for the job to progress.")
    +    }
    +
    +    if (jobGroupId == null || !conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
    +      jobGroupId = DEFAULT_JOB_GROUP
    +    }
    +
    +    jobIdToJobGroup(jobStart.jobId) = jobGroupId
    +    if (!jobGroupToMaxConTasks.contains(jobGroupId)) {
    --- End diff --

    I understand your reasoning about setting maxConTasks every time a job starts. However, I am not able to follow the scenario you described. If a job completes and a new one kicks off immediately, how does the new job partially overlap? It's only when all the stages and underlying tasks of the previous job have finished that we would mark it as complete, so a new job won't overlap with a completed one. Am I missing something here?
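For readers following along, the resolution logic in the diff (look up a per-job-group `spark.job.<groupId>.maxConcurrentTasks` cap, default to unlimited, validate it, and fall back to the default group) can be sketched in isolation. This is a minimal, hedged approximation, not Spark's actual code: the `resolve` helper, the `Map`-based conf, and the `"default"` group name are all illustrative stand-ins.

```scala
// Illustrative sketch of the cap-resolution logic from the diff.
// `conf` stands in for SparkConf; only the control flow mirrors the PR.
object MaxConTasksSketch {
  val DefaultJobGroup = "default" // stand-in for DEFAULT_JOB_GROUP

  // Returns (effective job group, max concurrent tasks for that group).
  def resolve(jobGroupId: Option[String], conf: Map[String, String]): (String, Int) = {
    val key = jobGroupId.map(id => s"spark.job.$id.maxConcurrentTasks")
    // No per-group cap configured => effectively unlimited.
    val maxConTasks = key.flatMap(conf.get).map(_.toInt).getOrElse(Int.MaxValue)
    require(maxConTasks > 0,
      "Maximum Concurrent Tasks should be set greater than 0 for the job to progress.")
    // Jobs without a configured cap are bucketed under the default group.
    val effectiveGroup = if (key.exists(conf.contains)) jobGroupId.get else DefaultJobGroup
    (effectiveGroup, maxConTasks)
  }
}
```

Under this sketch, a job in group `etl` with `spark.job.etl.maxConcurrentTasks=8` resolves to `("etl", 8)`, while a job with no group (or no configured cap) resolves to the default group with `Int.MaxValue`.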