khandelwal-prateek commented on code in PR #4092:
URL: https://github.com/apache/gobblin/pull/4092#discussion_r1943436413
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java:
##########
@@ -62,6 +82,73 @@ protected synchronized void requestInitialContainers() {
requestNewContainersForStaffingDeltas(deltas);
}
+ /**
+ * Handle the completion of a container. A new container will be requested
to replace the one
+ * that just exited depending on the exit status.
+ * <p>
+ * A container completes in either of the following conditions:
+ * <ol>
+ * <li> The container gets stopped by the ApplicationMaster. </li>
+ * <li> Some error happens in the container and caused the container to
exit </li>
+ * <li> The container gets preempted by the ResourceManager </li>
+ * <li> The container gets killed due to some reason, for example, if it
runs over the allowed amount of virtual or physical memory </li>
+ * </ol>
+ * A replacement container is needed in all except the first case.
+ * </p>
+ */
+ @Override
+ protected void handleContainerCompletion(ContainerStatus containerStatus) {
+ ContainerId completedContainerId = containerStatus.getContainerId();
+ ContainerInfo completedContainerInfo =
this.containerMap.remove(completedContainerId);
+
+ // Because callbacks are processed asynchronously, we might encounter
situations where handleContainerCompletion()
+ // is called before onContainersAllocated(), resulting in the containerId
missing from the containersMap.
+ // We use removedContainerIds to remember these containers and remove them
from containerMap later
+ // when we call reviseWorkforcePlanAndRequestNewContainers method
+ if (completedContainerInfo == null) {
+ log.warn("Container {} not found in containerMap. This container
onContainersCompleted() likely called before onContainersAllocated()",
+ completedContainerId);
+ this.removedContainerIds.add(completedContainerId);
+ return;
+ }
+
+ log.info("Container {} running profile {} completed with exit status {}",
+ completedContainerId, completedContainerInfo.getWorkerProfileName(),
containerStatus.getExitStatus());
+
+ if (StringUtils.isNotBlank(containerStatus.getDiagnostics())) {
+ log.info("Container {} running profile {} completed with diagnostics:
{}",
+ completedContainerId, completedContainerInfo.getWorkerProfileName(),
containerStatus.getDiagnostics());
+ }
+
+ if (this.shutdownInProgress) {
+ log.info("Ignoring container completion for container {} as shutdown is
in progress", completedContainerId);
+ return;
+ }
+
+ WorkerProfile workerProfile = completedContainerInfo.getWorkerProfile();
+
+ switch (containerStatus.getExitStatus()) {
+ case(ContainerExitStatus.ABORTED):
+ handleAbortedContainer(completedContainerId, completedContainerInfo);
+ break;
+ case(ContainerExitStatus.PREEMPTED):
+ log.info("Container {} for profile {} preempted, starting to launching
a replacement container",
+ completedContainerId,
completedContainerInfo.getWorkerProfileName());
+ requestContainersForWorkerProfile(workerProfile, 1);
+ break;
+ case(GENERAL_OOM_EXIT_STATUS_CODE):
+ case(ContainerExitStatus.KILLED_EXCEEDED_VMEM):
+ case(ContainerExitStatus.KILLED_EXCEEDED_PMEM):
+ handleContainerExitedWithOOM(completedContainerId,
completedContainerInfo);
+ break;
+ case(1): // Same as linux exit status 1 Often occurs when
launch_container.sh failed
+ log.info("Exit status 1.CompletedContainerInfo = {}",
completedContainerInfo);
+ break;
+ default:
+ break;
Review Comment:
Let's add a no-op for `KILLED_AFTER_APP_COMPLETION` & `SUCCESS`, and add a
log statement for the default case. Since `ContainerExitStatus` also defines
other statuses such as `DISKS_FAILED` and `KILLED_BY_CONTAINER_SCHEDULER`,
having a log would be useful in case any of those need to be handled in the future.
```
case ContainerExitStatus.KILLED_AFTER_APP_COMPLETION:
case ContainerExitStatus.SUCCESS:
break;
default:
// log any other unhandled completion code
log.warn("Container {} exited with unhandled status code {}.
ContainerInfo: {}",
completedContainerId, containerStatus.getExitStatus(),
completedContainerInfo);
break;
```
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java:
##########
@@ -71,29 +158,101 @@ public synchronized void
reviseWorkforcePlanAndRequestNewContainers(List<Scaling
if (CollectionUtils.isEmpty(scalingDirectives)) {
return;
}
+
+ // Correct the actualWorkforceStaffing in case of
handleContainerCompletion() getting called before onContainersAllocated()
+ Iterator<ContainerId> iterator = removedContainerIds.iterator();
+ while (iterator.hasNext()) {
+ ContainerId containerId = iterator.next();
+ ContainerInfo containerInfo = this.containerMap.remove(containerId);
+ if (containerInfo != null) {
+ WorkerProfile workerProfile = containerInfo.getWorkerProfile();
+ int currNumContainers =
this.actualWorkforceStaffing.getStaffing(workerProfile.getName()).orElse(0);
+ if (currNumContainers > 0) {
+ this.actualWorkforceStaffing.reviseStaffing(workerProfile.getName(),
currNumContainers - 1,
+ System.currentTimeMillis());
+ // Add a scaling directive so that workforcePlan have uptodate
setPoints for the workerProfile,
+ // otherwise extra containers will be requested when calculating
deltas
+ scalingDirectives.add(new ScalingDirective(workerProfile.getName(),
currNumContainers - 1, System.currentTimeMillis()));
+ }
+ iterator.remove();
+ }
+ }
+
this.workforcePlan.reviseWhenNewer(scalingDirectives);
StaffingDeltas deltas =
this.workforcePlan.calcStaffingDeltas(this.actualWorkforceStaffing);
requestNewContainersForStaffingDeltas(deltas);
}
private synchronized void
requestNewContainersForStaffingDeltas(StaffingDeltas deltas) {
deltas.getPerProfileDeltas().forEach(profileDelta -> {
- if (profileDelta.getDelta() > 0) { // scale up!
- WorkerProfile workerProfile = profileDelta.getProfile();
- String profileName = workerProfile.getName();
- int currNumContainers =
this.actualWorkforceStaffing.getStaffing(profileName).orElse(0);
- int delta = profileDelta.getDelta();
+ WorkerProfile workerProfile = profileDelta.getProfile();
+ String profileName = workerProfile.getName();
+ int delta = profileDelta.getDelta();
+ int currNumContainers =
this.actualWorkforceStaffing.getStaffing(profileName).orElse(0);
+ if (delta > 0) { // scale up!
log.info("Requesting {} new containers for profile {} having currently
{} containers", delta,
WorkforceProfiles.renderName(profileName), currNumContainers);
requestContainersForWorkerProfile(workerProfile, delta);
// update our staffing after requesting new containers
this.actualWorkforceStaffing.reviseStaffing(profileName,
currNumContainers + delta, System.currentTimeMillis());
- } else if (profileDelta.getDelta() < 0) { // scale down!
- // TODO: Decide how to handle negative deltas
- log.warn("Handling of Negative delta is not supported yet : Profile {}
delta {} ",
- profileDelta.getProfile().getName(), profileDelta.getDelta());
+ } else if (delta < 0) { // scale down!
+ log.info("Releasing {} containers for profile {} having currently {}
containers", -delta,
+ WorkforceProfiles.renderName(profileName), currNumContainers);
+ releaseContainersForWorkerProfile(profileName, delta);
+ // update our staffing after releasing containers
+ int numContainersAfterRelease = Math.max(currNumContainers + delta, 0);
+ this.actualWorkforceStaffing.reviseStaffing(profileName,
numContainersAfterRelease, System.currentTimeMillis());
} // else, already at staffing plan (or at least have requested, so
in-progress)
});
}
+ private void handleAbortedContainer(ContainerId completedContainerId,
ContainerInfo completedContainerInfo) {
+ // Case 1 : Container release requested while scaling down
+ if (this.releasedContainerCache.getIfPresent(completedContainerId) !=
null) {
+ log.info("Container {} was released while downscaling for profile {}",
completedContainerId, completedContainerInfo.getWorkerProfileName());
+ this.releasedContainerCache.invalidate(completedContainerId);
+ return;
+ }
+
+ // Case 2 : Container release was not requested, we need to request a
replacement container
+ log.info("Container {} aborted for profile {}, starting to launch a
replacement container", completedContainerId,
completedContainerInfo.getWorkerProfileName());
+
requestContainersForWorkerProfile(completedContainerInfo.getWorkerProfile(), 1);
+ }
+
+ private synchronized void handleContainerExitedWithOOM(ContainerId
completedContainerId, ContainerInfo completedContainerInfo) {
+ log.info("Container {} for profile {} exited with OOM, starting to launch
a replacement container",
+ completedContainerId, completedContainerInfo.getWorkerProfileName());
+
+ List<ScalingDirective> scalingDirectives = new ArrayList<>();
+
+ WorkerProfile workerProfile = completedContainerInfo.getWorkerProfile();
+ // Update the current staffing to reflect the container that exited with
OOM
+ int currNumContainers =
this.actualWorkforceStaffing.getStaffing(workerProfile.getName()).orElse(0);
+ if (currNumContainers > 0) {
+ this.actualWorkforceStaffing.reviseStaffing(workerProfile.getName(),
currNumContainers - 1, System.currentTimeMillis());
+ // Add a scaling directive so that workforcePlan have uptodate setPoints
for the workerProfile,
+ // otherwise extra containers will be requested when calculating deltas
+ scalingDirectives.add(new ScalingDirective(workerProfile.getName(),
currNumContainers - 1, System.currentTimeMillis()));
+ }
+
+ // Request a replacement container
+ int currContainerMemoryMbs =
workerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY);
+ int newContainerMemoryMbs = currContainerMemoryMbs *
DEFAULT_REPLACEMENT_CONTAINER_MEMORY_MULTIPLIER;
Review Comment:
If `currContainerMemoryMbs` is 36 GB, then `newContainerMemoryMbs` would be
72 GB (with 2 as the multiplier). However, that exceeds the 64 GB limit
(`MAX_REPLACEMENT_CONTAINER_MEMORY_MBS`), and in this case we would skip the
container request entirely, even though the OOM could have been handled by
launching a 64 GB container. We should instead cap the replacement container
at `MAX_REPLACEMENT_CONTAINER_MEMORY_MBS`.
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java:
##########
@@ -62,6 +82,73 @@ protected synchronized void requestInitialContainers() {
requestNewContainersForStaffingDeltas(deltas);
}
+ /**
+ * Handle the completion of a container. A new container will be requested
to replace the one
+ * that just exited depending on the exit status.
+ * <p>
+ * A container completes in either of the following conditions:
+ * <ol>
+ * <li> The container gets stopped by the ApplicationMaster. </li>
+ * <li> Some error happens in the container and caused the container to
exit </li>
+ * <li> The container gets preempted by the ResourceManager </li>
+ * <li> The container gets killed due to some reason, for example, if it
runs over the allowed amount of virtual or physical memory </li>
+ * </ol>
+ * A replacement container is needed in all except the first case.
+ * </p>
+ */
+ @Override
+ protected void handleContainerCompletion(ContainerStatus containerStatus) {
Review Comment:
This method is not `synchronized`, but it removes entries from `containerMap`
and modifies `removedContainerIds`, whereas
`reviseWorkforcePlanAndRequestNewContainers` is `synchronized` and also modifies
both `containerMap` and `removedContainerIds`. If a call to
`handleContainerCompletion` interleaves with a call to
`reviseWorkforcePlanAndRequestNewContainers`, race conditions or inconsistent
state can occur.
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java:
##########
@@ -411,10 +330,24 @@ protected synchronized void
requestContainersForWorkerProfile(WorkerProfile work
requestContainers(numContainers, Resource.newInstance(containerMemoryMbs,
containerCores), Optional.of(allocationRequestId));
}
- private void requestContainer(Optional<String> preferredNode,
Optional<Resource> resourceOptional) {
- Resource desiredResource = resourceOptional.or(Resource.newInstance(
- this.requestedContainerMemoryMbs, this.requestedContainerCores));
- requestContainer(preferredNode, desiredResource, Optional.absent());
+ protected synchronized void releaseContainersForWorkerProfile(String
profileName, int numContainers) {
+ Iterator<Map.Entry<ContainerId, ContainerInfo>> containerMapIterator =
this.containerMap.entrySet().iterator();
+ while (containerMapIterator.hasNext() && numContainers > 0) {
+ Map.Entry<ContainerId, ContainerInfo> entry =
containerMapIterator.next();
+ if (entry.getValue().getWorkerProfile().getName().equals(profileName)) {
+ ContainerId containerId = entry.getKey();
+ LOGGER.info("Releasing container {} running profile {}", containerId,
WorkforceProfiles.renderName(profileName));
+ // Record that this container was explicitly released so that a new
one is not spawned to replace it
+ // Put the container id in the releasedContainerCache before releasing
it so that handleContainerCompletion()
+ // can check for the container id and skip spawning a replacement
container.
+ // Note that this is the best effort since these are asynchronous
operations and a container may abort concurrently
+ // with the release call. So in some cases a replacement container may
have already been spawned before
+ // the container is put into the black list.
+ this.releasedContainerCache.put(containerId, "");
+ this.amrmClientAsync.releaseAssignedContainer(containerId);
+ numContainers--;
+ }
+ }
Review Comment:
Please add a log here stating how many containers were intended to be released
and how many were actually released by this method.
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java:
##########
@@ -62,6 +82,73 @@ protected synchronized void requestInitialContainers() {
requestNewContainersForStaffingDeltas(deltas);
}
+ /**
+ * Handle the completion of a container. A new container will be requested
to replace the one
+ * that just exited depending on the exit status.
+ * <p>
+ * A container completes in either of the following conditions:
+ * <ol>
+ * <li> The container gets stopped by the ApplicationMaster. </li>
+ * <li> Some error happens in the container and caused the container to
exit </li>
+ * <li> The container gets preempted by the ResourceManager </li>
+ * <li> The container gets killed due to some reason, for example, if it
runs over the allowed amount of virtual or physical memory </li>
+ * </ol>
+ * A replacement container is needed in all except the first case.
+ * </p>
+ */
+ @Override
+ protected void handleContainerCompletion(ContainerStatus containerStatus) {
+ ContainerId completedContainerId = containerStatus.getContainerId();
+ ContainerInfo completedContainerInfo =
this.containerMap.remove(completedContainerId);
+
+ // Because callbacks are processed asynchronously, we might encounter
situations where handleContainerCompletion()
+ // is called before onContainersAllocated(), resulting in the containerId
missing from the containersMap.
+ // We use removedContainerIds to remember these containers and remove them
from containerMap later
+ // when we call reviseWorkforcePlanAndRequestNewContainers method
+ if (completedContainerInfo == null) {
+ log.warn("Container {} not found in containerMap. This container
onContainersCompleted() likely called before onContainersAllocated()",
+ completedContainerId);
+ this.removedContainerIds.add(completedContainerId);
+ return;
+ }
+
+ log.info("Container {} running profile {} completed with exit status {}",
+ completedContainerId, completedContainerInfo.getWorkerProfileName(),
containerStatus.getExitStatus());
+
+ if (StringUtils.isNotBlank(containerStatus.getDiagnostics())) {
+ log.info("Container {} running profile {} completed with diagnostics:
{}",
+ completedContainerId, completedContainerInfo.getWorkerProfileName(),
containerStatus.getDiagnostics());
+ }
+
+ if (this.shutdownInProgress) {
+ log.info("Ignoring container completion for container {} as shutdown is
in progress", completedContainerId);
+ return;
+ }
+
+ WorkerProfile workerProfile = completedContainerInfo.getWorkerProfile();
+
+ switch (containerStatus.getExitStatus()) {
+ case(ContainerExitStatus.ABORTED):
+ handleAbortedContainer(completedContainerId, completedContainerInfo);
+ break;
+ case(ContainerExitStatus.PREEMPTED):
+ log.info("Container {} for profile {} preempted, starting to launching
a replacement container",
+ completedContainerId,
completedContainerInfo.getWorkerProfileName());
+ requestContainersForWorkerProfile(workerProfile, 1);
+ break;
+ case(GENERAL_OOM_EXIT_STATUS_CODE):
+ case(ContainerExitStatus.KILLED_EXCEEDED_VMEM):
+ case(ContainerExitStatus.KILLED_EXCEEDED_PMEM):
+ handleContainerExitedWithOOM(completedContainerId,
completedContainerInfo);
+ break;
+ case(1): // Same as linux exit status 1 Often occurs when
launch_container.sh failed
Review Comment:
Use a named constant instead of the magic number: `private static final int
LAUNCH_CONTAINER_FAILED_EXIT_CODE = 1;`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]