hanghangliu commented on code in PR #3487:
URL: https://github.com/apache/gobblin/pull/3487#discussion_r854808972
##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java:
##########
@@ -217,24 +237,46 @@ void runInternal() {
WorkflowConfig workflowConfig = workFlowEntry.getValue();
JobDag jobDag = workflowConfig.getJobDag();
-
Set<String> jobs = jobDag.getAllNodes();
// sum up the number of partitions
for (String jobName : jobs) {
JobContext jobContext = taskDriver.getJobContext(jobName);
-
+ JobConfig jobConfig = taskDriver.getJobConfig(jobName);
+ Resource resource =
Resource.newInstance(this.defaultContainerMemoryMbs,
this.defaultContainerCores);
+ int partitions = 0;
+ String jobTag =helixInstanceTags;
if (jobContext != null) {
log.debug("JobContext {} num partitions {}", jobContext,
jobContext.getPartitionSet().size());
inUseInstances.addAll(jobContext.getPartitionSet().stream().map(jobContext::getAssignedParticipant)
- .filter(e -> e != null).collect(Collectors.toSet()));
+ .filter(Objects::nonNull).collect(Collectors.toSet()));
numPartitions += jobContext.getPartitionSet().size();
+ partitions += jobContext.getPartitionSet().size();
+ // Respect job level config for helix instance tag, specific
resource requirement if there's any
+ if (jobConfig != null) {
+ if (!Strings.isNullOrEmpty(jobConfig.getInstanceGroupTag())) {
+ jobTag = jobConfig.getInstanceGroupTag();
+ }
+ Map<String, String> jobCommandConfigMap =
jobConfig.getJobCommandConfigMap();
+
if(jobCommandConfigMap.containsKey(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_MEMORY_MBS)){
+
resource.setMemory(Integer.parseInt(jobCommandConfigMap.get(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_MEMORY_MBS)));
+ }
+
if(jobCommandConfigMap.containsKey(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_CORES)){
+
resource.setVirtualCores(Integer.parseInt(jobCommandConfigMap.get(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_CORES)));
+ }
+ }
}
+ int containerCount = (int) Math.ceil(((double)partitions /
this.partitionsPerContainer) * this.overProvisionFactor);
+
YarnHelixUtils.ensureResourceFitMaxCapacity(this.yarnService.getMaxResourceCapacity(),
resource);
Review Comment:
Updated the logic. The max-resource validation is now handled only in
YarnService, which fails fast if YARN cannot satisfy our resource request.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]