[
https://issues.apache.org/jira/browse/GOBBLIN-2189?focusedWorklogId=955661&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-955661
]
ASF GitHub Bot logged work on GOBBLIN-2189:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 05/Feb/25 18:49
Start Date: 05/Feb/25 18:49
Worklog Time Spent: 10m
Work Description: khandelwal-prateek commented on code in PR #4092:
URL: https://github.com/apache/gobblin/pull/4092#discussion_r1943436413
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java:
##########
@@ -62,6 +82,73 @@ protected synchronized void requestInitialContainers() {
requestNewContainersForStaffingDeltas(deltas);
}
+  /**
+   * Handle the completion of a container. A new container will be requested to replace the one
+   * that just exited depending on the exit status.
+   * <p>
+   * A container completes in either of the following conditions:
+   * <ol>
+   *   <li> The container gets stopped by the ApplicationMaster. </li>
+   *   <li> Some error happens in the container and caused the container to exit </li>
+   *   <li> The container gets preempted by the ResourceManager </li>
+   *   <li> The container gets killed due to some reason, for example, if it runs over the allowed amount of virtual or physical memory </li>
+   * </ol>
+   * A replacement container is needed in all except the first case.
+   * </p>
+   */
+  @Override
+  protected void handleContainerCompletion(ContainerStatus containerStatus) {
+    ContainerId completedContainerId = containerStatus.getContainerId();
+    ContainerInfo completedContainerInfo = this.containerMap.remove(completedContainerId);
+
+    // Because callbacks are processed asynchronously, we might encounter situations where handleContainerCompletion()
+    // is called before onContainersAllocated(), resulting in the containerId missing from the containersMap.
+    // We use removedContainerIds to remember these containers and remove them from containerMap later
+    // when we call reviseWorkforcePlanAndRequestNewContainers method
+    if (completedContainerInfo == null) {
+      log.warn("Container {} not found in containerMap. This container onContainersCompleted() likely called before onContainersAllocated()",
+          completedContainerId);
+      this.removedContainerIds.add(completedContainerId);
+      return;
+    }
+
+    log.info("Container {} running profile {} completed with exit status {}",
+        completedContainerId, completedContainerInfo.getWorkerProfileName(), containerStatus.getExitStatus());
+
+    if (StringUtils.isNotBlank(containerStatus.getDiagnostics())) {
+      log.info("Container {} running profile {} completed with diagnostics: {}",
+          completedContainerId, completedContainerInfo.getWorkerProfileName(), containerStatus.getDiagnostics());
+    }
+
+    if (this.shutdownInProgress) {
+      log.info("Ignoring container completion for container {} as shutdown is in progress", completedContainerId);
+      return;
+    }
+
+    WorkerProfile workerProfile = completedContainerInfo.getWorkerProfile();
+
+    switch (containerStatus.getExitStatus()) {
+      case(ContainerExitStatus.ABORTED):
+        handleAbortedContainer(completedContainerId, completedContainerInfo);
+        break;
+      case(ContainerExitStatus.PREEMPTED):
+        log.info("Container {} for profile {} preempted, starting to launching a replacement container",
+            completedContainerId, completedContainerInfo.getWorkerProfileName());
+        requestContainersForWorkerProfile(workerProfile, 1);
+        break;
+      case(GENERAL_OOM_EXIT_STATUS_CODE):
+      case(ContainerExitStatus.KILLED_EXCEEDED_VMEM):
+      case(ContainerExitStatus.KILLED_EXCEEDED_PMEM):
+        handleContainerExitedWithOOM(completedContainerId, completedContainerInfo);
+        break;
+      case(1): // Same as linux exit status 1 Often occurs when launch_container.sh failed
+        log.info("Exit status 1.CompletedContainerInfo = {}", completedContainerInfo);
+        break;
+      default:
+        break;
Review Comment:
let's add a no-op for `KILLED_AFTER_APP_COMPLETION` & `SUCCESS`, and add a log statement for the default case. ContainerExitStatus also defines other statuses such as `DISKS_FAILED` and `KILLED_BY_CONTAINER_SCHEDULER`, so having a log would be useful in case one of those also needs to be handled in the future.
```
case ContainerExitStatus.KILLED_AFTER_APP_COMPLETION:
case ContainerExitStatus.SUCCESS:
  break;
default:
  // log any other unhandled completion code
  log.warn("Container {} exited with unhandled status code {}. ContainerInfo: {}",
      completedContainerId, containerStatus.getExitStatus(), completedContainerInfo);
  break;
```
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java:
##########
@@ -71,29 +158,101 @@ public synchronized void reviseWorkforcePlanAndRequestNewContainers(List<Scaling
     if (CollectionUtils.isEmpty(scalingDirectives)) {
       return;
     }
+
+    // Correct the actualWorkforceStaffing in case of handleContainerCompletion() getting called before onContainersAllocated()
+    Iterator<ContainerId> iterator = removedContainerIds.iterator();
+    while (iterator.hasNext()) {
+      ContainerId containerId = iterator.next();
+      ContainerInfo containerInfo = this.containerMap.remove(containerId);
+      if (containerInfo != null) {
+        WorkerProfile workerProfile = containerInfo.getWorkerProfile();
+        int currNumContainers = this.actualWorkforceStaffing.getStaffing(workerProfile.getName()).orElse(0);
+        if (currNumContainers > 0) {
+          this.actualWorkforceStaffing.reviseStaffing(workerProfile.getName(), currNumContainers - 1,
+              System.currentTimeMillis());
+          // Add a scaling directive so that workforcePlan have uptodate setPoints for the workerProfile,
+          // otherwise extra containers will be requested when calculating deltas
+          scalingDirectives.add(new ScalingDirective(workerProfile.getName(), currNumContainers - 1, System.currentTimeMillis()));
+        }
+        iterator.remove();
+      }
+    }
+
     this.workforcePlan.reviseWhenNewer(scalingDirectives);
     StaffingDeltas deltas = this.workforcePlan.calcStaffingDeltas(this.actualWorkforceStaffing);
     requestNewContainersForStaffingDeltas(deltas);
   }

   private synchronized void requestNewContainersForStaffingDeltas(StaffingDeltas deltas) {
     deltas.getPerProfileDeltas().forEach(profileDelta -> {
-      if (profileDelta.getDelta() > 0) { // scale up!
-        WorkerProfile workerProfile = profileDelta.getProfile();
-        String profileName = workerProfile.getName();
-        int currNumContainers = this.actualWorkforceStaffing.getStaffing(profileName).orElse(0);
-        int delta = profileDelta.getDelta();
+      WorkerProfile workerProfile = profileDelta.getProfile();
+      String profileName = workerProfile.getName();
+      int delta = profileDelta.getDelta();
+      int currNumContainers = this.actualWorkforceStaffing.getStaffing(profileName).orElse(0);
+      if (delta > 0) { // scale up!
         log.info("Requesting {} new containers for profile {} having currently {} containers", delta,
             WorkforceProfiles.renderName(profileName), currNumContainers);
         requestContainersForWorkerProfile(workerProfile, delta);
         // update our staffing after requesting new containers
         this.actualWorkforceStaffing.reviseStaffing(profileName, currNumContainers + delta, System.currentTimeMillis());
-      } else if (profileDelta.getDelta() < 0) { // scale down!
-        // TODO: Decide how to handle negative deltas
-        log.warn("Handling of Negative delta is not supported yet : Profile {} delta {} ",
-            profileDelta.getProfile().getName(), profileDelta.getDelta());
+      } else if (delta < 0) { // scale down!
+        log.info("Releasing {} containers for profile {} having currently {} containers", -delta,
+            WorkforceProfiles.renderName(profileName), currNumContainers);
+        releaseContainersForWorkerProfile(profileName, delta);
+        // update our staffing after releasing containers
+        int numContainersAfterRelease = Math.max(currNumContainers + delta, 0);
+        this.actualWorkforceStaffing.reviseStaffing(profileName, numContainersAfterRelease, System.currentTimeMillis());
       } // else, already at staffing plan (or at least have requested, so in-progress)
     });
   }

+  private void handleAbortedContainer(ContainerId completedContainerId, ContainerInfo completedContainerInfo) {
+    // Case 1 : Container release requested while scaling down
+    if (this.releasedContainerCache.getIfPresent(completedContainerId) != null) {
+      log.info("Container {} was released while downscaling for profile {}", completedContainerId, completedContainerInfo.getWorkerProfileName());
+      this.releasedContainerCache.invalidate(completedContainerId);
+      return;
+    }
+
+    // Case 2 : Container release was not requested, we need to request a replacement container
+    log.info("Container {} aborted for profile {}, starting to launch a replacement container", completedContainerId,
+        completedContainerInfo.getWorkerProfileName());
+    requestContainersForWorkerProfile(completedContainerInfo.getWorkerProfile(), 1);
+  }
+
+  private synchronized void handleContainerExitedWithOOM(ContainerId completedContainerId, ContainerInfo completedContainerInfo) {
+    log.info("Container {} for profile {} exited with OOM, starting to launch a replacement container",
+        completedContainerId, completedContainerInfo.getWorkerProfileName());
+
+    List<ScalingDirective> scalingDirectives = new ArrayList<>();
+
+    WorkerProfile workerProfile = completedContainerInfo.getWorkerProfile();
+    // Update the current staffing to reflect the container that exited with OOM
+    int currNumContainers = this.actualWorkforceStaffing.getStaffing(workerProfile.getName()).orElse(0);
+    if (currNumContainers > 0) {
+      this.actualWorkforceStaffing.reviseStaffing(workerProfile.getName(), currNumContainers - 1, System.currentTimeMillis());
+      // Add a scaling directive so that workforcePlan have uptodate setPoints for the workerProfile,
+      // otherwise extra containers will be requested when calculating deltas
+      scalingDirectives.add(new ScalingDirective(workerProfile.getName(), currNumContainers - 1, System.currentTimeMillis()));
+    }
+
+    // Request a replacement container
+    int currContainerMemoryMbs = workerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY);
+    int newContainerMemoryMbs = currContainerMemoryMbs * DEFAULT_REPLACEMENT_CONTAINER_MEMORY_MULTIPLIER;
Review Comment:
If `currContainerMemoryMbs` is 36 GB, then `newContainerMemoryMbs` would be 72 GB (with a multiplier of 2). However, that is higher than 64 GB (`MAX_REPLACEMENT_CONTAINER_MEMORY_MBS`), and in that case we would skip the container request entirely, even though the OOM could have been handled by launching a 64 GB container. We should launch the last replacement container with `MAX_REPLACEMENT_CONTAINER_MEMORY_MBS`.
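A minimal sketch of how the capping could look inside `handleContainerExitedWithOOM`, assuming the constants already shown in the diff; whether to skip the request or reuse the same size once already at the cap is a design choice for the PR author:
```
// Request a replacement container, doubling memory but never exceeding the allowed maximum,
// so e.g. a 36 GB container gets a 64 GB replacement instead of being skipped entirely.
int currContainerMemoryMbs = workerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY);
if (currContainerMemoryMbs >= MAX_REPLACEMENT_CONTAINER_MEMORY_MBS) {
  log.warn("Container {} already at max replacement memory {} MB, not requesting a larger replacement",
      completedContainerId, currContainerMemoryMbs);
  return;
}
int newContainerMemoryMbs = Math.min(
    currContainerMemoryMbs * DEFAULT_REPLACEMENT_CONTAINER_MEMORY_MULTIPLIER,
    MAX_REPLACEMENT_CONTAINER_MEMORY_MBS);
```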
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java:
##########
@@ -62,6 +82,73 @@ protected synchronized void requestInitialContainers() {
requestNewContainersForStaffingDeltas(deltas);
}
+  /**
+   * Handle the completion of a container. A new container will be requested to replace the one
+   * that just exited depending on the exit status.
+   * <p>
+   * A container completes in either of the following conditions:
+   * <ol>
+   *   <li> The container gets stopped by the ApplicationMaster. </li>
+   *   <li> Some error happens in the container and caused the container to exit </li>
+   *   <li> The container gets preempted by the ResourceManager </li>
+   *   <li> The container gets killed due to some reason, for example, if it runs over the allowed amount of virtual or physical memory </li>
+   * </ol>
+   * A replacement container is needed in all except the first case.
+   * </p>
+   */
+  @Override
+  protected void handleContainerCompletion(ContainerStatus containerStatus) {
Review Comment:
this method is not `synchronized`, but it removes entries from containerMap and modifies removedContainerIds, whereas `reviseWorkforcePlanAndRequestNewContainers` is synchronized and also modifies both containerMap and removedContainerIds. If a call to `handleContainerCompletion` interleaves with a call to `reviseWorkforcePlanAndRequestNewContainers`, race conditions or inconsistent state can result.
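One straightforward option, sketched below under the assumption that both methods live on the same `DynamicScalingYarnService` instance, is to make the override `synchronized` so it shares the instance lock with `reviseWorkforcePlanAndRequestNewContainers`:
```
@Override
protected synchronized void handleContainerCompletion(ContainerStatus containerStatus) {
  // Holding the instance monitor here ensures containerMap and removedContainerIds are not
  // mutated concurrently with reviseWorkforcePlanAndRequestNewContainers(), which is also synchronized.
  // ... existing completion-handling logic ...
}
```
An alternative would be concurrent collections for `containerMap`/`removedContainerIds`, but a single lock also keeps the staffing bookkeeping consistent across the two methods.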
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java:
##########
@@ -411,10 +330,24 @@ protected synchronized void requestContainersForWorkerProfile(WorkerProfile work
     requestContainers(numContainers, Resource.newInstance(containerMemoryMbs, containerCores), Optional.of(allocationRequestId));
   }

-  private void requestContainer(Optional<String> preferredNode, Optional<Resource> resourceOptional) {
-    Resource desiredResource = resourceOptional.or(Resource.newInstance(
-        this.requestedContainerMemoryMbs, this.requestedContainerCores));
-    requestContainer(preferredNode, desiredResource, Optional.absent());
+  protected synchronized void releaseContainersForWorkerProfile(String profileName, int numContainers) {
+    Iterator<Map.Entry<ContainerId, ContainerInfo>> containerMapIterator = this.containerMap.entrySet().iterator();
+    while (containerMapIterator.hasNext() && numContainers > 0) {
+      Map.Entry<ContainerId, ContainerInfo> entry = containerMapIterator.next();
+      if (entry.getValue().getWorkerProfile().getName().equals(profileName)) {
+        ContainerId containerId = entry.getKey();
+        LOGGER.info("Releasing container {} running profile {}", containerId, WorkforceProfiles.renderName(profileName));
+        // Record that this container was explicitly released so that a new one is not spawned to replace it
+        // Put the container id in the releasedContainerCache before releasing it so that handleContainerCompletion()
+        // can check for the container id and skip spawning a replacement container.
+        // Note that this is the best effort since these are asynchronous operations and a container may abort concurrently
+        // with the release call. So in some cases a replacement container may have already been spawned before
+        // the container is put into the black list.
+        this.releasedContainerCache.put(containerId, "");
+        this.amrmClientAsync.releaseAssignedContainer(containerId);
+        numContainers--;
+      }
+    }
Review Comment:
please add a log here recording how many containers were intended to be released and how many were actually released by this method
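A possible shape for that log, assuming `numContainers` is the (positive) number of releases requested and keeps being decremented in place as containers are released, so the difference gives the actual count:
```
protected synchronized void releaseContainersForWorkerProfile(String profileName, int numContainers) {
  int requestedReleases = numContainers; // remember how many releases were asked for
  Iterator<Map.Entry<ContainerId, ContainerInfo>> containerMapIterator = this.containerMap.entrySet().iterator();
  while (containerMapIterator.hasNext() && numContainers > 0) {
    // ... existing per-container release logic, decrementing numContainers for each released container ...
  }
  LOGGER.info("Requested release of {} containers for profile {}, actually released {}",
      requestedReleases, WorkforceProfiles.renderName(profileName), requestedReleases - numContainers);
}
```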
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java:
##########
@@ -62,6 +82,73 @@ protected synchronized void requestInitialContainers() {
requestNewContainersForStaffingDeltas(deltas);
}
+  /**
+   * Handle the completion of a container. A new container will be requested to replace the one
+   * that just exited depending on the exit status.
+   * <p>
+   * A container completes in either of the following conditions:
+   * <ol>
+   *   <li> The container gets stopped by the ApplicationMaster. </li>
+   *   <li> Some error happens in the container and caused the container to exit </li>
+   *   <li> The container gets preempted by the ResourceManager </li>
+   *   <li> The container gets killed due to some reason, for example, if it runs over the allowed amount of virtual or physical memory </li>
+   * </ol>
+   * A replacement container is needed in all except the first case.
+   * </p>
+   */
+  @Override
+  protected void handleContainerCompletion(ContainerStatus containerStatus) {
+    ContainerId completedContainerId = containerStatus.getContainerId();
+    ContainerInfo completedContainerInfo = this.containerMap.remove(completedContainerId);
+
+    // Because callbacks are processed asynchronously, we might encounter situations where handleContainerCompletion()
+    // is called before onContainersAllocated(), resulting in the containerId missing from the containersMap.
+    // We use removedContainerIds to remember these containers and remove them from containerMap later
+    // when we call reviseWorkforcePlanAndRequestNewContainers method
+    if (completedContainerInfo == null) {
+      log.warn("Container {} not found in containerMap. This container onContainersCompleted() likely called before onContainersAllocated()",
+          completedContainerId);
+      this.removedContainerIds.add(completedContainerId);
+      return;
+    }
+
+    log.info("Container {} running profile {} completed with exit status {}",
+        completedContainerId, completedContainerInfo.getWorkerProfileName(), containerStatus.getExitStatus());
+
+    if (StringUtils.isNotBlank(containerStatus.getDiagnostics())) {
+      log.info("Container {} running profile {} completed with diagnostics: {}",
+          completedContainerId, completedContainerInfo.getWorkerProfileName(), containerStatus.getDiagnostics());
+    }
+
+    if (this.shutdownInProgress) {
+      log.info("Ignoring container completion for container {} as shutdown is in progress", completedContainerId);
+      return;
+    }
+
+    WorkerProfile workerProfile = completedContainerInfo.getWorkerProfile();
+
+    switch (containerStatus.getExitStatus()) {
+      case(ContainerExitStatus.ABORTED):
+        handleAbortedContainer(completedContainerId, completedContainerInfo);
+        break;
+      case(ContainerExitStatus.PREEMPTED):
+        log.info("Container {} for profile {} preempted, starting to launching a replacement container",
+            completedContainerId, completedContainerInfo.getWorkerProfileName());
+        requestContainersForWorkerProfile(workerProfile, 1);
+        break;
+      case(GENERAL_OOM_EXIT_STATUS_CODE):
+      case(ContainerExitStatus.KILLED_EXCEEDED_VMEM):
+      case(ContainerExitStatus.KILLED_EXCEEDED_PMEM):
+        handleContainerExitedWithOOM(completedContainerId, completedContainerInfo);
+        break;
+      case(1): // Same as linux exit status 1 Often occurs when launch_container.sh failed
Review Comment:
use a constant, e.g. `private static final int LAUNCH_CONTAINER_FAILED_EXIT_CODE = 1;`, instead of the magic number `1`
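For illustration only, the case label would then read roughly as follows (the constant name is the one suggested above):
```
private static final int LAUNCH_CONTAINER_FAILED_EXIT_CODE = 1;
// ...
case(LAUNCH_CONTAINER_FAILED_EXIT_CODE): // same as Linux exit status 1; often occurs when launch_container.sh fails
  log.info("Exit status 1. CompletedContainerInfo = {}", completedContainerInfo);
  break;
```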
Issue Time Tracking
-------------------
Worklog Id: (was: 955661)
Time Spent: 1h 50m (was: 1h 40m)
> Implement ContainerCompletion callback in DynamicScalingYarnService
> -------------------------------------------------------------------
>
> Key: GOBBLIN-2189
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2189
> Project: Apache Gobblin
> Issue Type: Improvement
> Components: gobblin-core
> Reporter: Vivek Rai
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 1h 50m
> Remaining Estimate: 0h
>
> DynamicScalingYarnService currently doesn't handle scaling down containers, nor does it do
> anything if a container is killed abruptly or goes OOM. To handle these scenarios, the
> containerCompletion callback should be implemented to launch replacement containers, and
> scale-down handling should also be added.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)