mridulm commented on a change in pull request #27583: [SPARK-29149][YARN]  
Update YARN cluster manager For Stage Level Scheduling
URL: https://github.com/apache/spark/pull/27583#discussion_r384930920
 
 

 ##########
 File path: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
 ##########
 @@ -166,69 +195,188 @@ private[yarn] class YarnAllocator(
 
   private val labelExpression = sparkConf.get(EXECUTOR_NODE_LABEL_EXPRESSION)
 
-  // A map to store preferred hostname and possible task numbers running on it.
-  private var hostToLocalTaskCounts: Map[String, Int] = Map.empty
-
-  // Number of tasks that have locality preferences in active stages
-  private[yarn] var numLocalityAwareTasks: Int = 0
-
   // A container placement strategy based on pending tasks' locality preference
   private[yarn] val containerPlacementStrategy =
-    new LocalityPreferredContainerPlacementStrategy(sparkConf, conf, resource, 
resolver)
+    new LocalityPreferredContainerPlacementStrategy(sparkConf, conf, resolver)
+
+  // The default profile is always present so we need to initialize the 
datastructures keyed by
+  // ResourceProfile id to ensure its present if things start running before a 
request for
+  // executors could add it. This approach is easier then going and special 
casing everywhere.
+  private def initDefaultProfile(): Unit = synchronized {
+    allocatedHostToContainersMapPerRPId(DEFAULT_RESOURCE_PROFILE_ID) =
+      new HashMap[String, mutable.Set[ContainerId]]()
+    runningExecutorsPerResourceProfileId.put(DEFAULT_RESOURCE_PROFILE_ID, 
mutable.HashSet[String]())
+    numExecutorsStartingPerResourceProfileId(DEFAULT_RESOURCE_PROFILE_ID) = 
new AtomicInteger(0)
+    targetNumExecutorsPerResourceProfileId(DEFAULT_RESOURCE_PROFILE_ID) =
+      SchedulerBackendUtils.getInitialTargetExecutorNumber(sparkConf)
+    rpIdToYarnResource(DEFAULT_RESOURCE_PROFILE_ID) = defaultResource
+    rpIdToResourceProfile(DEFAULT_RESOURCE_PROFILE_ID) =
+      ResourceProfile.getOrCreateDefaultProfile(sparkConf)
+  }
+
+  initDefaultProfile()
 
-  def getNumExecutorsRunning: Int = runningExecutors.size()
+  def getNumExecutorsRunning: Int = synchronized {
+    runningExecutorsPerResourceProfileId.values.map(_.size).sum
+  }
+
+  def getNumLocalityAwareTasks: Int = synchronized {
+    numLocalityAwareTasksPerResourceProfileId.values.sum
+  }
 
-  def getNumReleasedContainers: Int = releasedContainers.size()
+  def getNumExecutorsStarting: Int = {
 
 Review comment:
   synchronized on `this` ? I was expecting static analysis via \@GuardedBy to 
catch this in build, apparently we dont have that validation.
   Can you also check use of some of the other variables as well ? 
`targetNumExecutorsPerResourceProfileId`, etc also seems to have similar issues.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to