Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19194#discussion_r140337893
  
    --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
    @@ -619,6 +625,47 @@ private[spark] class ExecutorAllocationManager(
         // place the executors.
         private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, (Int, Map[String, Int])]
     
    +    override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    +      jobStart.stageInfos.foreach(stageInfo => stageIdToJobId(stageInfo.stageId) = jobStart.jobId)
    +
    +      var jobGroupId = if (jobStart.properties != null) {
    +        jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID)
    +      } else {
    +        null
    +      }
    +
    +      val maxConTasks = if (jobGroupId != null &&
    +        conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
    +        conf.get(s"spark.job.$jobGroupId.maxConcurrentTasks").toInt
    +      } else {
    +        Int.MaxValue
    +      }
    +
    +      if (maxConTasks <= 0) {
    +        throw new IllegalArgumentException(
    +          "Maximum Concurrent Tasks should be set greater than 0 for the job to progress.")
    +      }
    +
    +      if (jobGroupId == null || !conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
    +        jobGroupId = DEFAULT_JOB_GROUP
    +      }
    +
    +      jobIdToJobGroup(jobStart.jobId) = jobGroupId
    +      if (!jobGroupToMaxConTasks.contains(jobGroupId)) {
    --- End diff --
    
    Sorry, I am doing a really bad job of explaining my concerns.
    
    Of course, if you have multiple threads racing to set the value for the same job group, it's unpredictable which value you'll get.  Nothing we can do about that, and it's not the scenario I'm talking about.
    
    Here's the step-by-step (hope I understand the proposed change correctly):
    
    (1) T1: Set jobgroup = "foo".  Set maxConTasks=10.
    (2) T1: Launch job 1.1.  Uses maxConTasks=10.
    (3) T2: Set jobgroup = "foo".
    (4) T2: Launch job 2.1.  Uses maxConTasks=10.
    (5) T1: Finish job 1.1.  Do *not* remove the entry for `jobGroupToMaxConTasks("foo")`, because there is still another job running for this job group in T2 (https://github.com/apache/spark/pull/19194/files#diff-b096353602813e47074ace09a3890d56R664).
    (6) T1: Set maxConTasks=20.
    (7) T1: Launch job 1.2.  Uses maxConTasks=10, because `jobGroupToMaxConTasks.contains("foo")`, so we don't reset the value.
    (8) T2: Finish job 2.1.  Again, do not remove `jobGroupToMaxConTasks("foo")`, as T1 is still running a job in this job group.
    (9) T2: Set maxConTasks=20.
    (10) T2: Run job 2.2.  Uses maxConTasks=10, because `jobGroupToMaxConTasks.contains("foo")`.
    
    As long as there is a job running in T2 when T1 finishes its job (or vice versa), we never remove the prior value, and so we never update it when starting a new job.  We could be stuck with maxConTasks=10 in both threads indefinitely, even though both threads have set maxConTasks=20.
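
    To make the failure mode concrete, here is a minimal sketch (simplified names, not the actual patch; runnable in the Scala REPL) of the guarded assignment and how it silently drops the new value:

    ```scala
    import scala.collection.mutable

    // Simplified stand-in for the map in the diff above.
    val jobGroupToMaxConTasks = mutable.HashMap[String, Int]()

    // Simplified stand-in for the guarded assignment at the end of onJobStart.
    def recordMaxConTasks(jobGroupId: String, maxConTasks: Int): Unit = {
      if (!jobGroupToMaxConTasks.contains(jobGroupId)) {
        jobGroupToMaxConTasks(jobGroupId) = maxConTasks
      }
    }

    recordMaxConTasks("foo", 10)  // job 1.1 starts: "foo" -> 10
    // jobs keep overlapping, so the "foo" entry is never removed on job end
    recordMaxConTasks("foo", 20)  // job 1.2 starts: entry already present, 20 is dropped
    assert(jobGroupToMaxConTasks("foo") == 10)  // still stuck at 10
    ```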
    
    If you remove that `if (!jobGroupToMaxConTasks.contains(jobGroupId))` check, then when job 1.2 starts, you'd use the new value of maxConTasks=20.  The only weird thing is that job 2.1 suddenly switches mid-flight to using maxConTasks=20 as well.  But that seems more reasonable to me.
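
    Roughly what I'm suggesting instead (again just a sketch, continuing the REPL session above): drop the contains() check and assign unconditionally, so every job start refreshes the group's limit:

    ```scala
    // Same as above but without the guard: every job start refreshes the limit,
    // so job 1.2 picks up 20 (and the still-running job 2.1 would see 20 too).
    def recordMaxConTasksUnguarded(jobGroupId: String, maxConTasks: Int): Unit = {
      jobGroupToMaxConTasks(jobGroupId) = maxConTasks
    }

    recordMaxConTasksUnguarded("foo", 10)  // job 1.1 starts: "foo" -> 10
    recordMaxConTasksUnguarded("foo", 20)  // job 1.2 starts: "foo" -> 20
    assert(jobGroupToMaxConTasks("foo") == 20)
    ```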

