[
https://issues.apache.org/jira/browse/GOBBLIN-2189?focusedWorklogId=955983&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-955983
]
ASF GitHub Bot logged work on GOBBLIN-2189:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 07/Feb/25 04:38
Start Date: 07/Feb/25 04:38
Worklog Time Spent: 10m
Work Description: khandelwal-prateek commented on code in PR #4092:
URL: https://github.com/apache/gobblin/pull/4092#discussion_r1945905660
##########
gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java:
##########
@@ -69,14 +69,14 @@ public void testWhenScalingDirectivesIsNulOrEmpty() throws
IOException, Interrup
/** Note : this test uses {@link DummyScalingDirectiveSource}*/
@Test
public void testWithDummyScalingDirectiveSource() throws IOException,
InterruptedException {
- // DummyScalingDirectiveSource returns 2 scaling directives in first 3
invocations and after that it returns empty list
- // so the total number of invocations after three invocations should
always be 3
+ // DummyScalingDirectiveSource returns 2 scaling directives in first 5
invocations and after that it returns empty list
+ // so the total number of invocations after three invocations should
always be 5
TestDynamicScalingYarnServiceManager testDynamicScalingYarnServiceManager
= new TestDynamicScalingYarnServiceManager(
mockGobblinTemporalApplicationMaster, new
DummyScalingDirectiveSource());
testDynamicScalingYarnServiceManager.startUp();
- Thread.sleep(5000); // 5 seconds sleep so that
GetScalingDirectivesRunnable.run() is called for 5 times
+ Thread.sleep(7000); // 5 seconds sleep so that
GetScalingDirectivesRunnable.run() is called for 7 times
testDynamicScalingYarnServiceManager.shutDown();
- Mockito.verify(mockDynamicScalingYarnService,
Mockito.times(3)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList());
+ Mockito.verify(mockDynamicScalingYarnService,
Mockito.times(5)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList());
Review Comment:
What exactly are we testing here?
##########
gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java:
##########
@@ -69,14 +69,14 @@ public void testWhenScalingDirectivesIsNulOrEmpty() throws
IOException, Interrup
/** Note : this test uses {@link DummyScalingDirectiveSource}*/
@Test
public void testWithDummyScalingDirectiveSource() throws IOException,
InterruptedException {
- // DummyScalingDirectiveSource returns 2 scaling directives in first 3
invocations and after that it returns empty list
- // so the total number of invocations after three invocations should
always be 3
+ // DummyScalingDirectiveSource returns 2 scaling directives in first 5
invocations and after that it returns empty list
+ // so the total number of invocations after three invocations should
always be 5
TestDynamicScalingYarnServiceManager testDynamicScalingYarnServiceManager
= new TestDynamicScalingYarnServiceManager(
mockGobblinTemporalApplicationMaster, new
DummyScalingDirectiveSource());
testDynamicScalingYarnServiceManager.startUp();
- Thread.sleep(5000); // 5 seconds sleep so that
GetScalingDirectivesRunnable.run() is called for 5 times
+ Thread.sleep(7000); // 5 seconds sleep so that
GetScalingDirectivesRunnable.run() is called for 7 times
Review Comment:
Any reason to update this to 7 seconds?
Also, there is a typo in the comment: it still says `5 seconds sleep`.
##########
gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java:
##########
@@ -69,14 +69,14 @@ public void testWhenScalingDirectivesIsNulOrEmpty() throws
IOException, Interrup
/** Note : this test uses {@link DummyScalingDirectiveSource}*/
@Test
public void testWithDummyScalingDirectiveSource() throws IOException,
InterruptedException {
- // DummyScalingDirectiveSource returns 2 scaling directives in first 3
invocations and after that it returns empty list
- // so the total number of invocations after three invocations should
always be 3
+ // DummyScalingDirectiveSource returns 2 scaling directives in first 5
invocations and after that it returns empty list
+ // so the total number of invocations after three invocations should
always be 5
Review Comment:
Should this read `after five invocations`?
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java:
##########
@@ -71,29 +164,103 @@ public synchronized void
reviseWorkforcePlanAndRequestNewContainers(List<Scaling
if (CollectionUtils.isEmpty(scalingDirectives)) {
return;
}
+
+ // Correct the actualWorkforceStaffing in case of
handleContainerCompletion() getting called before onContainersAllocated()
+ Iterator<ContainerId> iterator = removedContainerIds.iterator();
+ while (iterator.hasNext()) {
+ ContainerId containerId = iterator.next();
+ ContainerInfo containerInfo = this.containerMap.remove(containerId);
+ if (containerInfo != null) {
+ WorkerProfile workerProfile = containerInfo.getWorkerProfile();
+ int currNumContainers =
this.actualWorkforceStaffing.getStaffing(workerProfile.getName()).orElse(0);
+ if (currNumContainers > 0) {
+ this.actualWorkforceStaffing.reviseStaffing(workerProfile.getName(),
currNumContainers - 1,
+ System.currentTimeMillis());
+ // Add a scaling directive so that workforcePlan have uptodate
setPoints for the workerProfile,
+ // otherwise extra containers will be requested when calculating
deltas
+ scalingDirectives.add(new ScalingDirective(workerProfile.getName(),
currNumContainers - 1, System.currentTimeMillis()));
+ }
+ iterator.remove();
+ }
+ }
+
this.workforcePlan.reviseWhenNewer(scalingDirectives);
StaffingDeltas deltas =
this.workforcePlan.calcStaffingDeltas(this.actualWorkforceStaffing);
requestNewContainersForStaffingDeltas(deltas);
}
private synchronized void
requestNewContainersForStaffingDeltas(StaffingDeltas deltas) {
deltas.getPerProfileDeltas().forEach(profileDelta -> {
- if (profileDelta.getDelta() > 0) { // scale up!
- WorkerProfile workerProfile = profileDelta.getProfile();
- String profileName = workerProfile.getName();
- int currNumContainers =
this.actualWorkforceStaffing.getStaffing(profileName).orElse(0);
- int delta = profileDelta.getDelta();
+ WorkerProfile workerProfile = profileDelta.getProfile();
+ String profileName = workerProfile.getName();
+ int delta = profileDelta.getDelta();
+ int currNumContainers =
this.actualWorkforceStaffing.getStaffing(profileName).orElse(0);
+ if (delta > 0) { // scale up!
log.info("Requesting {} new containers for profile {} having currently
{} containers", delta,
WorkforceProfiles.renderName(profileName), currNumContainers);
requestContainersForWorkerProfile(workerProfile, delta);
// update our staffing after requesting new containers
this.actualWorkforceStaffing.reviseStaffing(profileName,
currNumContainers + delta, System.currentTimeMillis());
- } else if (profileDelta.getDelta() < 0) { // scale down!
- // TODO: Decide how to handle negative deltas
- log.warn("Handling of Negative delta is not supported yet : Profile {}
delta {} ",
- profileDelta.getProfile().getName(), profileDelta.getDelta());
+ } else if (delta < 0) { // scale down!
+ log.info("Releasing {} containers for profile {} having currently {}
containers", -delta,
+ WorkforceProfiles.renderName(profileName), currNumContainers);
+ releaseContainersForWorkerProfile(profileName, delta);
+ // update our staffing after releasing containers
+ int numContainersAfterRelease = Math.max(currNumContainers + delta, 0);
+ this.actualWorkforceStaffing.reviseStaffing(profileName,
numContainersAfterRelease, System.currentTimeMillis());
} // else, already at staffing plan (or at least have requested, so
in-progress)
});
}
+ private void handleAbortedContainer(ContainerId completedContainerId,
ContainerInfo completedContainerInfo) {
+ // Case 1 : Container release requested while scaling down
+ if (this.releasedContainerCache.getIfPresent(completedContainerId) !=
null) {
+ log.info("Container {} was released while downscaling for profile {}",
completedContainerId, completedContainerInfo.getWorkerProfileName());
+ this.releasedContainerCache.invalidate(completedContainerId);
+ return;
+ }
+
+ // Case 2 : Container release was not requested, we need to request a
replacement container
+ log.info("Container {} aborted for profile {}, starting to launch a
replacement container", completedContainerId,
completedContainerInfo.getWorkerProfileName());
+
requestContainersForWorkerProfile(completedContainerInfo.getWorkerProfile(), 1);
+ }
+
+ private synchronized void handleContainerExitedWithOOM(ContainerId
completedContainerId, ContainerInfo completedContainerInfo) {
+ log.info("Container {} for profile {} exited with OOM, starting to launch
a replacement container",
+ completedContainerId, completedContainerInfo.getWorkerProfileName());
+
+ List<ScalingDirective> scalingDirectives = new ArrayList<>();
+
+ WorkerProfile workerProfile = completedContainerInfo.getWorkerProfile();
+ // Update the current staffing to reflect the container that exited with
OOM
+ int currNumContainers =
this.actualWorkforceStaffing.getStaffing(workerProfile.getName()).orElse(0);
+ if (currNumContainers > 0) {
+ this.actualWorkforceStaffing.reviseStaffing(workerProfile.getName(),
currNumContainers - 1, System.currentTimeMillis());
+ // Add a scaling directive so that workforcePlan have uptodate setPoints
for the workerProfile,
+ // otherwise extra containers will be requested when calculating deltas
+ scalingDirectives.add(new ScalingDirective(workerProfile.getName(),
currNumContainers - 1, System.currentTimeMillis()));
+ }
+
+ // Request a replacement container
+ int currContainerMemoryMbs =
workerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY);
+ int newContainerMemoryMbs = currContainerMemoryMbs *
DEFAULT_REPLACEMENT_CONTAINER_MEMORY_MULTIPLIER;
+ if (currContainerMemoryMbs < MAX_REPLACEMENT_CONTAINER_MEMORY_MBS &&
newContainerMemoryMbs > MAX_REPLACEMENT_CONTAINER_MEMORY_MBS) {
+ newContainerMemoryMbs = MAX_REPLACEMENT_CONTAINER_MEMORY_MBS;
+ } else if (newContainerMemoryMbs > MAX_REPLACEMENT_CONTAINER_MEMORY_MBS) {
+ log.warn("Expected replacement container memory exceeds the maximum
allowed memory {}. Not requesting a replacement container.",
+ MAX_REPLACEMENT_CONTAINER_MEMORY_MBS);
+ return;
Review Comment:
This can be simplified to:
```
int newContainerMemoryMbs = Math.min(currContainerMemoryMbs * 2,
MAX_REPLACEMENT_CONTAINER_MEMORY_MBS);
if (newContainerMemoryMbs > MAX_REPLACEMENT_CONTAINER_MEMORY_MBS) {
log.warn("Replacement container memory exceeds max limit {}, not
requesting a replacement container.", MAX_REPLACEMENT_CONTAINER_MEMORY_MBS);
return;
}
```
Issue Time Tracking
-------------------
Worklog Id: (was: 955983)
Time Spent: 2h 20m (was: 2h 10m)
> Implement ContainerCompletion callback in DynamicScalingYarnService
> -------------------------------------------------------------------
>
> Key: GOBBLIN-2189
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2189
> Project: Apache Gobblin
> Issue Type: Improvement
> Components: gobblin-core
> Reporter: Vivek Rai
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 2h 20m
> Remaining Estimate: 0h
>
> DynamicScalingYarnService currently does not handle scaling down containers,
> nor does it do anything when a container is killed abruptly or goes OOM. To
> handle these scenarios, a containerCompletion callback should be implemented
> to launch replacement containers, and scale-down handling should be added as
> well.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)