[ 
https://issues.apache.org/jira/browse/GOBBLIN-2189?focusedWorklogId=955993&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-955993
 ]

ASF GitHub Bot logged work on GOBBLIN-2189:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 07/Feb/25 06:07
            Start Date: 07/Feb/25 06:07
    Worklog Time Spent: 10m 
      Work Description: khandelwal-prateek commented on code in PR #4092:
URL: https://github.com/apache/gobblin/pull/4092#discussion_r1945994951


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java:
##########
@@ -42,18 +54,27 @@
  */
 @Slf4j
 public class DynamicScalingYarnService extends YarnService {
+  private static final String DEFAULT_REPLACEMENT_CONTAINER_WORKER_PROFILE_NAME_PREFIX = "replacementWorkerProfile";
+  private static final int LAUNCH_CONTAINER_FAILED_EXIT_CODE = 1;
+  private static final int GENERAL_OOM_EXIT_STATUS_CODE = 137;
+  private static final int DEFAULT_REPLACEMENT_CONTAINER_MEMORY_MULTIPLIER = 2;
+  private static final int MAX_REPLACEMENT_CONTAINER_MEMORY_MBS = 65536; // 64GB
 
   /** this holds the current count of containers already requested for each worker profile */
   private final WorkforceStaffing actualWorkforceStaffing;
   /** this holds the current total workforce plan as per latest received scaling directives */
   private final WorkforcePlan workforcePlan;
+  private final Set<ContainerId> removedContainerIds;
+  private final AtomicLong profileNameSuffixGenerator;
 
   public DynamicScalingYarnService(Config config, String applicationName, String applicationId,
       YarnConfiguration yarnConfiguration, FileSystem fs, EventBus eventBus) throws Exception {
     super(config, applicationName, applicationId, yarnConfiguration, fs, eventBus);
 
     this.actualWorkforceStaffing = WorkforceStaffing.initialize(0);
     this.workforcePlan = new WorkforcePlan(this.config, this.config.getInt(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY));
+    this.removedContainerIds = ConcurrentHashMap.newKeySet();

Review Comment:
   Update this to use `ConcurrentLinkedQueue`: `ConcurrentHashMap.newKeySet()` 
   works well for membership checks, but not for ordered or frequent removals.
   ```
   private final Queue<ContainerId> removedContainerIds;
   removedContainerIds = new ConcurrentLinkedQueue<>();
   ```
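
For context on the trade-off raised in the comment above, here is a minimal sketch contrasting the two collections. It is illustrative only: plain `String` ids stand in for `ContainerId`, and the class and method names are hypothetical, not part of the PR. A set from `ConcurrentHashMap.newKeySet()` ignores duplicates and supports hash-based `contains`/`remove`, but has no defined iteration order; a `ConcurrentLinkedQueue` keeps insertion order and allows duplicates, while its `contains` and `remove(Object)` traverse the queue. Which behavior fits depends on how the ids are consumed, which this excerpt does not show.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Illustrative only: String ids stand in for YARN ContainerId values.
class RemovedContainerTrackingSketch {

  /** Polls the queue from its head until it is empty, returning ids in insertion order. */
  static List<String> drainInOrder(Queue<String> removedIds) {
    List<String> drained = new ArrayList<>();
    String id;
    while ((id = removedIds.poll()) != null) {
      drained.add(id);
    }
    return drained;
  }

  public static void main(String[] args) {
    // Set-based tracking: duplicates are ignored, lookups and removals are hash-based.
    Set<String> asSet = ConcurrentHashMap.newKeySet();
    asSet.add("container_01");
    asSet.add("container_02");
    asSet.add("container_01"); // no effect, the id is already present
    System.out.println("set contains container_01: " + asSet.contains("container_01"));
    System.out.println("set size: " + asSet.size());

    // Queue-based tracking: insertion order is kept and duplicates are allowed.
    Queue<String> asQueue = new ConcurrentLinkedQueue<>();
    asQueue.add("container_01");
    asQueue.add("container_02");
    asQueue.add("container_01");
    System.out.println("queue drained in order: " + drainInOrder(asQueue));
  }
}
```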





Issue Time Tracking
-------------------

    Worklog Id:     (was: 955993)
    Time Spent: 3h 20m  (was: 3h 10m)

> Implement ContainerCompletion callback in DynamicScalingYarnService
> -------------------------------------------------------------------
>
>                 Key: GOBBLIN-2189
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2189
>             Project: Apache Gobblin
>          Issue Type: Improvement
>          Components: gobblin-core
>            Reporter: Vivek Rai
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 3h 20m
>  Remaining Estimate: 0h
>
> DynamicScalingYarnService currently does not handle scaling down containers, 
> nor does it do anything when a container is killed abruptly or goes OOM. To 
> handle these cases, a containerCompletion callback should be implemented to 
> launch replacement containers, and scale-down handling should be added.
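
One way the constants added in the PR diff could fit together for the OOM case is sketched below. The constant names and values (`137`, `2`, `65536`) come from the diff; how the PR actually combines them is not visible in this message, so the logic here (multiply the failed container's memory and cap it at the maximum) is an assumption, and the class and method names are hypothetical.

```java
// A sketch under stated assumptions, not the PR's implementation.
class ReplacementMemorySketch {
  // Constant names and values as they appear in the PR diff.
  static final int GENERAL_OOM_EXIT_STATUS_CODE = 137;
  static final int DEFAULT_REPLACEMENT_CONTAINER_MEMORY_MULTIPLIER = 2;
  static final int MAX_REPLACEMENT_CONTAINER_MEMORY_MBS = 65536; // 64GB

  /** Assumed check: treat exit status 137 as an out-of-memory kill. */
  static boolean isOomExit(int exitStatus) {
    return exitStatus == GENERAL_OOM_EXIT_STATUS_CODE;
  }

  /** Assumed policy: scale the previous memory by the multiplier, capped at the maximum. */
  static int replacementMemoryMbs(int currentMemoryMbs) {
    // Use long arithmetic so the multiplication cannot overflow before capping.
    long scaled = (long) currentMemoryMbs * DEFAULT_REPLACEMENT_CONTAINER_MEMORY_MULTIPLIER;
    return (int) Math.min(scaled, MAX_REPLACEMENT_CONTAINER_MEMORY_MBS);
  }

  public static void main(String[] args) {
    int exitStatus = 137;
    int failedContainerMemoryMbs = 4096;
    if (isOomExit(exitStatus)) {
      System.out.println("replacement memory (MB): " + replacementMemoryMbs(failedContainerMemoryMbs));
    }
  }
}
```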



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
