Blazer-007 commented on code in PR #4092:
URL: https://github.com/apache/gobblin/pull/4092#discussion_r1945974772
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java:
##########
@@ -71,29 +164,103 @@ public synchronized void
reviseWorkforcePlanAndRequestNewContainers(List<Scaling
if (CollectionUtils.isEmpty(scalingDirectives)) {
return;
}
+
+ // Correct the actualWorkforceStaffing in case of
handleContainerCompletion() getting called before onContainersAllocated()
+ Iterator<ContainerId> iterator = removedContainerIds.iterator();
+ while (iterator.hasNext()) {
+ ContainerId containerId = iterator.next();
+ ContainerInfo containerInfo = this.containerMap.remove(containerId);
+ if (containerInfo != null) {
+ WorkerProfile workerProfile = containerInfo.getWorkerProfile();
+ int currNumContainers =
this.actualWorkforceStaffing.getStaffing(workerProfile.getName()).orElse(0);
+ if (currNumContainers > 0) {
+ this.actualWorkforceStaffing.reviseStaffing(workerProfile.getName(),
currNumContainers - 1,
+ System.currentTimeMillis());
+ // Add a scaling directive so that workforcePlan have uptodate
setPoints for the workerProfile,
+ // otherwise extra containers will be requested when calculating
deltas
+ scalingDirectives.add(new ScalingDirective(workerProfile.getName(),
currNumContainers - 1, System.currentTimeMillis()));
+ }
+ iterator.remove();
+ }
+ }
+
this.workforcePlan.reviseWhenNewer(scalingDirectives);
StaffingDeltas deltas =
this.workforcePlan.calcStaffingDeltas(this.actualWorkforceStaffing);
requestNewContainersForStaffingDeltas(deltas);
}
private synchronized void
requestNewContainersForStaffingDeltas(StaffingDeltas deltas) {
deltas.getPerProfileDeltas().forEach(profileDelta -> {
- if (profileDelta.getDelta() > 0) { // scale up!
- WorkerProfile workerProfile = profileDelta.getProfile();
- String profileName = workerProfile.getName();
- int currNumContainers =
this.actualWorkforceStaffing.getStaffing(profileName).orElse(0);
- int delta = profileDelta.getDelta();
+ WorkerProfile workerProfile = profileDelta.getProfile();
+ String profileName = workerProfile.getName();
+ int delta = profileDelta.getDelta();
+ int currNumContainers =
this.actualWorkforceStaffing.getStaffing(profileName).orElse(0);
+ if (delta > 0) { // scale up!
log.info("Requesting {} new containers for profile {} having currently
{} containers", delta,
WorkforceProfiles.renderName(profileName), currNumContainers);
requestContainersForWorkerProfile(workerProfile, delta);
// update our staffing after requesting new containers
this.actualWorkforceStaffing.reviseStaffing(profileName,
currNumContainers + delta, System.currentTimeMillis());
- } else if (profileDelta.getDelta() < 0) { // scale down!
- // TODO: Decide how to handle negative deltas
- log.warn("Handling of Negative delta is not supported yet : Profile {}
delta {} ",
- profileDelta.getProfile().getName(), profileDelta.getDelta());
+ } else if (delta < 0) { // scale down!
+ log.info("Releasing {} containers for profile {} having currently {}
containers", -delta,
+ WorkforceProfiles.renderName(profileName), currNumContainers);
+ releaseContainersForWorkerProfile(profileName, delta);
+ // update our staffing after releasing containers
+ int numContainersAfterRelease = Math.max(currNumContainers + delta, 0);
+ this.actualWorkforceStaffing.reviseStaffing(profileName,
numContainersAfterRelease, System.currentTimeMillis());
} // else, already at staffing plan (or at least have requested, so
in-progress)
});
}
+ private void handleAbortedContainer(ContainerId completedContainerId,
ContainerInfo completedContainerInfo) {
+ // Case 1 : Container release requested while scaling down
+ if (this.releasedContainerCache.getIfPresent(completedContainerId) !=
null) {
+ log.info("Container {} was released while downscaling for profile {}",
completedContainerId, completedContainerInfo.getWorkerProfileName());
+ this.releasedContainerCache.invalidate(completedContainerId);
+ return;
+ }
+
+ // Case 2 : Container release was not requested, we need to request a
replacement container
+ log.info("Container {} aborted for profile {}, starting to launch a
replacement container", completedContainerId,
completedContainerInfo.getWorkerProfileName());
+
requestContainersForWorkerProfile(completedContainerInfo.getWorkerProfile(), 1);
+ }
+
+ private synchronized void handleContainerExitedWithOOM(ContainerId
completedContainerId, ContainerInfo completedContainerInfo) {
+ log.info("Container {} for profile {} exited with OOM, starting to launch
a replacement container",
+ completedContainerId, completedContainerInfo.getWorkerProfileName());
+
+ List<ScalingDirective> scalingDirectives = new ArrayList<>();
+
+ WorkerProfile workerProfile = completedContainerInfo.getWorkerProfile();
+ // Update the current staffing to reflect the container that exited with
OOM
+ int currNumContainers =
this.actualWorkforceStaffing.getStaffing(workerProfile.getName()).orElse(0);
+ if (currNumContainers > 0) {
+ this.actualWorkforceStaffing.reviseStaffing(workerProfile.getName(),
currNumContainers - 1, System.currentTimeMillis());
+ // Add a scaling directive so that workforcePlan have uptodate setPoints
for the workerProfile,
+ // otherwise extra containers will be requested when calculating deltas
+ scalingDirectives.add(new ScalingDirective(workerProfile.getName(),
currNumContainers - 1, System.currentTimeMillis()));
+ }
+
+ // Request a replacement container
+ int currContainerMemoryMbs =
workerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY);
+ int newContainerMemoryMbs = currContainerMemoryMbs *
DEFAULT_REPLACEMENT_CONTAINER_MEMORY_MULTIPLIER;
+ if (currContainerMemoryMbs < MAX_REPLACEMENT_CONTAINER_MEMORY_MBS &&
newContainerMemoryMbs > MAX_REPLACEMENT_CONTAINER_MEMORY_MBS) {
+ newContainerMemoryMbs = MAX_REPLACEMENT_CONTAINER_MEMORY_MBS;
+ } else if (newContainerMemoryMbs > MAX_REPLACEMENT_CONTAINER_MEMORY_MBS) {
+ log.warn("Expected replacement container memory exceeds the maximum
allowed memory {}. Not requesting a replacement container.",
+ MAX_REPLACEMENT_CONTAINER_MEMORY_MBS);
+ return;
Review Comment:
This will not work if `currContainerMemoryMbs` is already at
`MAX_REPLACEMENT_CONTAINER_MEMORY_MBS` it will keep requesting containers of
`MAX_REPLACEMENT_CONTAINER_MEMORY_MBS` which will not help in anyway
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]