arpit09 commented on code in PR #3932:
URL: https://github.com/apache/gobblin/pull/3932#discussion_r1575621084


##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java:
##########
@@ -219,13 +226,25 @@ private String getInuseParticipantForHelixPartition(JobContext jobContext, int p
       return null;
     }
 
+
+    private String getParticipantInInitStateForHelixPartition(JobContext jobContext, int partition) {
+      if (jobContext.getPartitionState(partition).equals(TaskPartitionState.INIT)) {

Review Comment:
   Let's swap the operands to avoid an NPE if `getPartitionState` returns `null`:
   `TaskPartitionState.INIT.equals(jobContext.getPartitionState(partition))`
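A minimal sketch of why the swap helps, assuming `getPartitionState` can return `null` for a partition with no recorded state. `PartitionState` and the map-backed lookup are hypothetical stand-ins for Helix's `TaskPartitionState` and `JobContext`, not the real API:

```java
import java.util.HashMap;
import java.util.Map;

class NullSafeStateCheck {
  // Hypothetical stand-in for org.apache.helix.task.TaskPartitionState.
  enum PartitionState { INIT, RUNNING, COMPLETED }

  // Hypothetical stand-in for JobContext#getPartitionState: null when no state is recorded.
  static PartitionState getPartitionState(Map<Integer, PartitionState> states, int partition) {
    return states.get(partition);
  }

  // Original form: throws NullPointerException when the partition has no recorded state.
  static boolean isInitUnsafe(Map<Integer, PartitionState> states, int partition) {
    return getPartitionState(states, partition).equals(PartitionState.INIT);
  }

  // Swapped form: the constant is never null, and equals(null) simply returns false.
  static boolean isInitSafe(Map<Integer, PartitionState> states, int partition) {
    return PartitionState.INIT.equals(getPartitionState(states, partition));
  }

  public static void main(String[] args) {
    Map<Integer, PartitionState> states = new HashMap<>();
    states.put(0, PartitionState.INIT);
    System.out.println(isInitSafe(states, 0)); // true
    System.out.println(isInitSafe(states, 1)); // false, no NPE
    try {
      isInitUnsafe(states, 1);
    } catch (NullPointerException e) {
      System.out.println("unsafe check threw NullPointerException");
    }
  }
}
```

Since `TaskPartitionState` is an enum, comparing with `==` would be equally null-safe.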



##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java:
##########
@@ -299,27 +325,65 @@ void runInternal() {
           // Remove this instance if existed in the tracking map.
           instanceIdleSince.remove(participant);
         }
+
+        if(instancesInInitState.contains(participant)) {
+          instanceInitStateSince.putIfAbsent(participant, System.currentTimeMillis());
+          if (!isInstanceStuckInInitState(participant)) {
+            // release the corresponding container as the helix task is stuck in INIT state for a long time
+            log.info("Instance {} is stuck in INIT state for a long time, releasing the container", participant);
+            // get containerInfo of the helix participant
+            YarnService.ContainerInfo containerInfo = yarnService.getContainerInfoGivenHelixParticipant(participant);
+            if(containerInfo != null) {
+              containersToRelease.add(containerInfo.getContainer());
+              instanceInitStateSince.remove(participant);
+              inUseInstances.remove(participant);
+            } else {
+              log.warn("ContainerInfo for participant {} is not found", participant);
+            }
+          }
+        } else {
+          instanceInitStateSince.remove(participant);
+        }
+      }
+
+      // release the containers
+      if(!containersToRelease.isEmpty()) {
+        this.yarnService.getEventBus().post(new ContainerReleaseRequest(containersToRelease, true));
       }
+
       slidingWindowReservoir.add(yarnContainerRequestBundle);
 
+
       log.debug("There are {} containers being requested in total, tag-count map {}, tag-resource map {}",
           yarnContainerRequestBundle.getTotalContainers(), yarnContainerRequestBundle.getHelixTagContainerCountMap(),
           yarnContainerRequestBundle.getHelixTagResourceMap());
 
       this.yarnService.requestTargetNumberOfContainers(slidingWindowReservoir.getMax(), inUseInstances);
     }
 
-    @VisibleForTesting
     /**
      * Return true is the condition for tagging an instance as "unused" holds.
      * The condition, by default is that if an instance went back to
      * active (having partition running on it) within {@link #maxIdleTimeInMinutesBeforeScalingDown} minutes, we will
      * not tag that instance as "unused" and have that as the candidate for scaling down.
      */
+    @VisibleForTesting
     boolean isInstanceUnused(String participant){
       return System.currentTimeMillis() - instanceIdleSince.get(participant) >
           TimeUnit.MINUTES.toMillis(maxIdleTimeInMinutesBeforeScalingDown);
     }
+
+    /**
+     * Return true is the condition for tagging an instance as stuck in INIT state holds.
+     * The condition, by default is that if an instance went back to
+     * active (having partition running on it) within {@link #maxIdleTimeInMinutesBeforeScalingDown} minutes, we will
+     * not tag that instance as stuck and the container will not be scaled down.
+     */
+    @VisibleForTesting
+    boolean isInstanceStuckInInitState(String participant) {
+      return System.currentTimeMillis() - instanceInitStateSince.get(participant) >
+          TimeUnit.MINUTES.toMillis(maxIdleTimeInMinutesBeforeScalingDown);

Review Comment:
   How did we decide to wait `maxIdleTimeInMinutesBeforeScalingDown` minutes before scaling up? Can we add a separate
   variable for this case instead of reusing the scale-down one, since the two serve different purposes?
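A minimal sketch of the suggestion, keeping the INIT-state timeout independent of the scale-down idle timeout. The class `InitStateTracker`, the field `maxTimeInMinutesInInitState`, and the injectable clock are hypothetical names for illustration, not part of the PR. The sketch also guards against `instanceInitStateSince.get(participant)` returning `null`, which would otherwise throw an NPE when the `Long` is unboxed:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

class InitStateTracker {
  // Hypothetical dedicated threshold, separate from maxIdleTimeInMinutesBeforeScalingDown.
  private final long maxTimeInMinutesInInitState;
  // Clock in epoch millis; injectable so the check can be tested deterministically.
  private final LongSupplier clock;
  private final Map<String, Long> instanceInitStateSince = new HashMap<>();

  InitStateTracker(long maxTimeInMinutesInInitState, LongSupplier clock) {
    this.maxTimeInMinutesInInitState = maxTimeInMinutesInInitState;
    this.clock = clock;
  }

  // Records the first time the participant was observed with a task in INIT state.
  void markInInitState(String participant) {
    instanceInitStateSince.putIfAbsent(participant, clock.getAsLong());
  }

  // True once the participant has stayed in INIT longer than the dedicated threshold.
  // Untracked participants are never considered stuck (no unboxing of a null Long).
  boolean isInstanceStuckInInitState(String participant) {
    Long since = instanceInitStateSince.get(participant);
    return since != null
        && clock.getAsLong() - since > TimeUnit.MINUTES.toMillis(maxTimeInMinutesInInitState);
  }

  public static void main(String[] args) {
    long[] now = {0L};
    InitStateTracker tracker = new InitStateTracker(10, () -> now[0]);
    tracker.markInInitState("participant-1");
    now[0] = TimeUnit.MINUTES.toMillis(5);
    System.out.println(tracker.isInstanceStuckInInitState("participant-1")); // false
    now[0] = TimeUnit.MINUTES.toMillis(11);
    System.out.println(tracker.isInstanceStuckInInitState("participant-1")); // true
  }
}
```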



##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java:
##########
@@ -259,6 +278,11 @@ void runInternal() {
                 .map(i -> getInuseParticipantForHelixPartition(jobContext, i))
                 .filter(Objects::nonNull).collect(Collectors.toSet()));
 
+            instancesInInitState.addAll(jobContext.getPartitionSet().stream()
+                .map(i -> getParticipantInInitStateForHelixPartition(jobContext, i))

Review Comment:
   nit: Can we name the variable `helixPartition` instead of just `i`?



##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java:
##########
@@ -313,6 +313,21 @@ public void handleNewContainerRequest(NewContainerRequest newContainerRequest) {
         newContainerRequest.getResource());
   }
 
+  /**
+   *  getContainerInfoGivenHelixParticipant returns the containerInfo of the given helixParticipant if it exists else
+   *  return null
+   * @param helixParticipant
+   * @return ContainerInfo
+   */
+  public ContainerInfo getContainerInfoGivenHelixParticipant(String helixParticipant) {
+    for (ContainerInfo containerInfo : this.containerMap.values()) {
+      if (containerInfo.getHelixParticipantId().equals(helixParticipant)) {
+        return containerInfo;
+      }
+    }
+    return null;

Review Comment:
   Can we check whether returning an `Optional` helps here? It would avoid null checks and future NPEs.
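A minimal sketch of an `Optional`-returning lookup. `ContainerInfo` here is a hypothetical, simplified stand-in for `YarnService.ContainerInfo`, and the map keyed by container ID only loosely mirrors `containerMap`. Calling `equals` on the argument (assumed non-null) rather than on the stored ID also avoids an NPE for entries whose participant ID is `null`:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

class ContainerLookup {
  // Hypothetical, simplified stand-in for YarnService.ContainerInfo.
  static final class ContainerInfo {
    private final String containerId;
    private final String helixParticipantId;

    ContainerInfo(String containerId, String helixParticipantId) {
      this.containerId = containerId;
      this.helixParticipantId = helixParticipantId;
    }

    String getContainerId() {
      return containerId;
    }

    String getHelixParticipantId() {
      return helixParticipantId;
    }
  }

  // Keyed by container ID, loosely mirroring YarnService's containerMap.
  private final Map<String, ContainerInfo> containerMap = new HashMap<>();

  void add(ContainerInfo info) {
    containerMap.put(info.getContainerId(), info);
  }

  // Returns the container hosting the given participant, or Optional.empty() if none matches.
  Optional<ContainerInfo> getContainerInfoGivenHelixParticipant(String helixParticipant) {
    return containerMap.values().stream()
        .filter(info -> helixParticipant.equals(info.getHelixParticipantId()))
        .findFirst();
  }

  public static void main(String[] args) {
    ContainerLookup lookup = new ContainerLookup();
    lookup.add(new ContainerInfo("container-1", "participant-1"));
    // The caller handles the "not found" case explicitly instead of null-checking.
    lookup.getContainerInfoGivenHelixParticipant("participant-1")
        .ifPresent(info -> System.out.println("Releasing " + info.getContainerId()));
    System.out.println(lookup.getContainerInfoGivenHelixParticipant("participant-2").isPresent()); // false
  }
}
```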



##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java:
##########
@@ -313,6 +313,21 @@ public void handleNewContainerRequest(NewContainerRequest newContainerRequest) {
         newContainerRequest.getResource());
   }
 
+  /**
+   *  getContainerInfoGivenHelixParticipant returns the containerInfo of the given helixParticipant if it exists else
+   *  return null
+   * @param helixParticipant
+   * @return ContainerInfo
+   */

Review Comment:
   Please use the Apache Gobblin formatter described at https://gobblin.readthedocs.io/en/latest/developer-guide/CodingStyle/



##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java:
##########
@@ -299,27 +325,65 @@ void runInternal() {
           // Remove this instance if existed in the tracking map.
           instanceIdleSince.remove(participant);
         }
+
+        if(instancesInInitState.contains(participant)) {
+          instanceInitStateSince.putIfAbsent(participant, System.currentTimeMillis());
+          if (!isInstanceStuckInInitState(participant)) {
+            // release the corresponding container as the helix task is stuck in INIT state for a long time
+            log.info("Instance {} is stuck in INIT state for a long time, releasing the container", participant);

Review Comment:
   Let's log how long the instance has been stuck in INIT state.
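A minimal sketch, assuming `instanceInitStateSince` holds the epoch-millis timestamp recorded by `putIfAbsent`. The helper name `describe` is hypothetical; it builds the message with `String.format` so it runs without a logging dependency:

```java
import java.time.Duration;

class StuckInitStateMessage {
  // initStateSinceMillis is the value stored in instanceInitStateSince for the participant.
  static String describe(String participant, long initStateSinceMillis, long nowMillis) {
    Duration stuckFor = Duration.ofMillis(nowMillis - initStateSinceMillis);
    // toMinutes() truncates, so the millisecond value is included for precision.
    return String.format(
        "Instance %s has been stuck in INIT state for %d minutes (%d ms), releasing the container",
        participant, stuckFor.toMinutes(), stuckFor.toMillis());
  }

  public static void main(String[] args) {
    long since = 0L;
    long now = Duration.ofMinutes(12).plusSeconds(30).toMillis();
    // Instance participant-1 has been stuck in INIT state for 12 minutes (750000 ms), releasing the container
    System.out.println(describe("participant-1", since, now));
  }
}
```

In the manager itself, the same value can go straight into the existing SLF4J call as an extra `{}` argument, e.g. `System.currentTimeMillis() - instanceInitStateSince.get(participant)`.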



##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java:
##########
@@ -219,13 +226,25 @@ private String getInuseParticipantForHelixPartition(JobContext jobContext, int p
       return null;
     }
 
+
+    private String getParticipantInInitStateForHelixPartition(JobContext jobContext, int partition) {
+      if (jobContext.getPartitionState(partition).equals(TaskPartitionState.INIT)) {
+        log.info("Helix task {} is in {} state",
+            jobContext.getTaskIdForPartition(partition), jobContext.getPartitionState(partition));
+        return jobContext.getAssignedParticipant(partition);
+      }
+
+      return null;
+    }
+
     /**
      * Iterate through the workflows configured in Helix to figure out the number of required partitions
      * and request the {@link YarnService} to scale to the desired number of containers.
      */
     @VisibleForTesting
     void runInternal() {
       Set<String> inUseInstances = new HashSet<>();
+      Set<String> instancesInInitState = new HashSet<>();

Review Comment:
   Let's add a comment noting that `instancesInInitState` contains only the containers where a task is assigned,
   not all Helix containers in INIT state. We could also rename the variable to make that clear.



##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java:
##########
@@ -299,27 +325,65 @@ void runInternal() {
           // Remove this instance if existed in the tracking map.
           instanceIdleSince.remove(participant);
         }
+
+        if(instancesInInitState.contains(participant)) {
+          instanceInitStateSince.putIfAbsent(participant, System.currentTimeMillis());
+          if (!isInstanceStuckInInitState(participant)) {
+            // release the corresponding container as the helix task is stuck in INIT state for a long time
+            log.info("Instance {} is stuck in INIT state for a long time, releasing the container", participant);
+            // get containerInfo of the helix participant
+            YarnService.ContainerInfo containerInfo = yarnService.getContainerInfoGivenHelixParticipant(participant);
+            if(containerInfo != null) {
+              containersToRelease.add(containerInfo.getContainer());
+              instanceInitStateSince.remove(participant);
+              inUseInstances.remove(participant);
+            } else {
+              log.warn("ContainerInfo for participant {} is not found", participant);

Review Comment:
   If the container info for the participant is not found, should we remove it from `instanceInitStateSince` as well?
   Otherwise, when will it get removed from `instanceInitStateSince` in this case?
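A minimal sketch of one possible answer: stop tracking the participant in both branches, so a participant whose container cannot be found does not stay in `instanceInitStateSince` indefinitely. The `Function` lookup is a hypothetical stand-in for `yarnService.getContainerInfoGivenHelixParticipant`, and containers are simplified to `String` IDs:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;

class StuckInstanceReleaser {
  final Map<String, Long> instanceInitStateSince = new HashMap<>();
  final Set<String> inUseInstances = new HashSet<>();
  final List<String> containersToRelease = new ArrayList<>();

  // Handles a participant already judged stuck in INIT state.
  void handleStuckParticipant(String participant, Function<String, Optional<String>> containerLookup) {
    Optional<String> containerId = containerLookup.apply(participant);
    if (containerId.isPresent()) {
      containersToRelease.add(containerId.get());
      inUseInstances.remove(participant);
    } else {
      System.out.println("ContainerInfo for participant " + participant + " is not found");
    }
    // Stop tracking the participant in both branches so a missing container cannot leave a stale entry.
    instanceInitStateSince.remove(participant);
  }

  public static void main(String[] args) {
    StuckInstanceReleaser releaser = new StuckInstanceReleaser();
    Map<String, String> knownContainers = new HashMap<>();
    knownContainers.put("participant-1", "container-1");
    releaser.instanceInitStateSince.put("participant-1", 0L);
    releaser.instanceInitStateSince.put("participant-2", 0L);

    releaser.handleStuckParticipant("participant-1", p -> Optional.ofNullable(knownContainers.get(p)));
    releaser.handleStuckParticipant("participant-2", p -> Optional.ofNullable(knownContainers.get(p)));

    System.out.println(releaser.containersToRelease);    // [container-1]
    System.out.println(releaser.instanceInitStateSince); // {}
  }
}
```

One trade-off: if the participant is still in INIT on the next pass, `putIfAbsent` records a fresh timestamp, so its timer restarts rather than firing immediately.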



##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/event/ContainerReleaseRequest.java:
##########
@@ -21,26 +21,33 @@
 
 import org.apache.hadoop.yarn.api.records.Container;
 
+import lombok.Getter;
+
 
 /**
 * A type of event for container release requests to be used with a {@link com.google.common.eventbus.EventBus}.
 * This event is different than {@link ContainerShutdownRequest} because it releases the container through
 * the Resource Manager, while {@link ContainerShutdownRequest} shuts down a container through the
  * Node Manager
  */
+@Getter
 public class ContainerReleaseRequest {
+  /**
+   * -- GETTER --
+   *  Get the IDs of the containers to release.
+   *
+   * @return the IDs of the containers to release
+   */
   private final Collection<Container> containers;
+  private final boolean spinUpReplacementContainers;

Review Comment:
   For boolean variables, please start the name with `is`, `has`, or `should`.
   Please also add a comment explaining what it is needed for.
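A sketch of the requested naming and documentation, written without Lombok so it is self-contained. The class name `ContainerReleaseRequestSketch`, the flag name `shouldSpinUpReplacementContainers`, and the Javadoc wording are assumptions based on how the PR constructs the event (`new ContainerReleaseRequest(containersToRelease, true)`); the type parameter `C` stands in for Hadoop's `Container`:

```java
import java.util.Arrays;
import java.util.Collection;

/** Hypothetical sketch of ContainerReleaseRequest with a renamed, documented flag. */
class ContainerReleaseRequestSketch<C> {
  /** The containers to release through the Resource Manager. */
  private final Collection<C> containers;

  /**
   * Whether replacement containers should be requested after these containers are released,
   * e.g. when a container is released because its Helix task is stuck in INIT state rather than
   * because the cluster is scaling down. (Assumed semantics.)
   */
  private final boolean shouldSpinUpReplacementContainers;

  ContainerReleaseRequestSketch(Collection<C> containers, boolean shouldSpinUpReplacementContainers) {
    this.containers = containers;
    this.shouldSpinUpReplacementContainers = shouldSpinUpReplacementContainers;
  }

  Collection<C> getContainers() {
    return containers;
  }

  boolean shouldSpinUpReplacementContainers() {
    return shouldSpinUpReplacementContainers;
  }

  public static void main(String[] args) {
    ContainerReleaseRequestSketch<String> request =
        new ContainerReleaseRequestSketch<>(Arrays.asList("container-1"), true);
    System.out.println(request.getContainers() + " replace=" + request.shouldSpinUpReplacementContainers());
  }
}
```

If the class keeps Lombok's `@Getter`, note that a `boolean` field named `shouldSpinUpReplacementContainers` gets the generated getter `isShouldSpinUpReplacementContainers()`, whereas a field named `isX` gets `isX()`.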



##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java:
##########
@@ -299,27 +325,65 @@ void runInternal() {
           // Remove this instance if existed in the tracking map.
           instanceIdleSince.remove(participant);
         }
+
+        if(instancesInInitState.contains(participant)) {
+          instanceInitStateSince.putIfAbsent(participant, System.currentTimeMillis());

Review Comment:
   Did we check whether we can get the INIT time directly from jobContext or Helix? If possible, we should use that
   time rather than tracking it ourselves.



##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java:
##########
@@ -299,27 +325,65 @@ void runInternal() {
           // Remove this instance if existed in the tracking map.
           instanceIdleSince.remove(participant);
         }
+
+        if(instancesInInitState.contains(participant)) {
+          instanceInitStateSince.putIfAbsent(participant, System.currentTimeMillis());
+          if (!isInstanceStuckInInitState(participant)) {
+            // release the corresponding container as the helix task is stuck in INIT state for a long time

Review Comment:
   This condition reads as `is not stuck`, right? The `!` looks inverted relative to the comment.
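Assuming the intent stated in the comment and log message, a minimal sketch of the difference. Because `putIfAbsent` records the current time when a participant is first seen, the elapsed time is about zero on that pass, so the negated check releases the container immediately. The class and method names below are hypothetical; `isStuck` repeats the comparison from `isInstanceStuckInInitState` with the timestamps passed in explicitly:

```java
import java.util.concurrent.TimeUnit;

class InitStateReleaseDecision {
  // Same comparison as isInstanceStuckInInitState, with timestamps passed in explicitly.
  static boolean isStuck(long initStateSinceMillis, long nowMillis, long thresholdMinutes) {
    return nowMillis - initStateSinceMillis > TimeUnit.MINUTES.toMillis(thresholdMinutes);
  }

  // The branch as written in the PR: fires while the instance is NOT yet stuck.
  static boolean releaseAsWritten(long initStateSinceMillis, long nowMillis, long thresholdMinutes) {
    return !isStuck(initStateSinceMillis, nowMillis, thresholdMinutes);
  }

  // The branch matching the comment and log message: fires only once the threshold is exceeded.
  static boolean releaseAsIntended(long initStateSinceMillis, long nowMillis, long thresholdMinutes) {
    return isStuck(initStateSinceMillis, nowMillis, thresholdMinutes);
  }

  public static void main(String[] args) {
    long firstSeen = 0L;
    // Right after putIfAbsent records the timestamp, elapsed time is ~0.
    System.out.println(releaseAsWritten(firstSeen, firstSeen, 10));  // true: released on the first pass
    System.out.println(releaseAsIntended(firstSeen, firstSeen, 10)); // false
    long later = TimeUnit.MINUTES.toMillis(11);
    System.out.println(releaseAsIntended(firstSeen, later, 10));     // true
  }
}
```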



##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java:
##########
@@ -219,13 +226,25 @@ private String getInuseParticipantForHelixPartition(JobContext jobContext, int p
       return null;
     }
 
+
+    private String getParticipantInInitStateForHelixPartition(JobContext jobContext, int partition) {
+      if (jobContext.getPartitionState(partition).equals(TaskPartitionState.INIT)) {
+        log.info("Helix task {} is in {} state",
+            jobContext.getTaskIdForPartition(partition), jobContext.getPartitionState(partition));
+        return jobContext.getAssignedParticipant(partition);
+      }
+
+      return null;

Review Comment:
   Can we return an `Optional` here instead of explicitly returning `null`?
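A minimal sketch of an `Optional`-returning version and how the caller's stream could consume it. `FakeJobContext` and `PartitionState` are hypothetical, simplified stand-ins for Helix's `JobContext` and `TaskPartitionState`. The `filter(Optional::isPresent).map(Optional::get)` pair is used instead of `flatMap(Optional::stream)` in case the module must stay Java 8 compatible, since `Optional::stream` needs Java 9+:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

class InitStateParticipants {
  // Hypothetical stand-in for org.apache.helix.task.TaskPartitionState.
  enum PartitionState { INIT, RUNNING }

  // Hypothetical, simplified stand-in for Helix's JobContext.
  static final class FakeJobContext {
    final Map<Integer, PartitionState> states = new HashMap<>();
    final Map<Integer, String> assignedParticipants = new HashMap<>();

    PartitionState getPartitionState(int p) { return states.get(p); }
    String getAssignedParticipant(int p) { return assignedParticipants.get(p); }
    Set<Integer> getPartitionSet() { return states.keySet(); }
  }

  // Returns the assigned participant if the partition is in INIT state, otherwise Optional.empty().
  // ofNullable also covers an INIT partition that has no participant assigned yet.
  static Optional<String> getParticipantInInitStateForHelixPartition(FakeJobContext jobContext, int helixPartition) {
    if (PartitionState.INIT.equals(jobContext.getPartitionState(helixPartition))) {
      return Optional.ofNullable(jobContext.getAssignedParticipant(helixPartition));
    }
    return Optional.empty();
  }

  // Collects only the present values, replacing the filter(Objects::nonNull) step.
  static Set<String> participantsInInitState(FakeJobContext jobContext) {
    return jobContext.getPartitionSet().stream()
        .map(helixPartition -> getParticipantInInitStateForHelixPartition(jobContext, helixPartition))
        .filter(Optional::isPresent)
        .map(Optional::get)
        .collect(Collectors.toSet());
  }

  public static void main(String[] args) {
    FakeJobContext ctx = new FakeJobContext();
    ctx.states.put(0, PartitionState.INIT);
    ctx.assignedParticipants.put(0, "participant-1");
    ctx.states.put(1, PartitionState.RUNNING);
    ctx.assignedParticipants.put(1, "participant-2");
    ctx.states.put(2, PartitionState.INIT); // INIT but not yet assigned
    System.out.println(participantsInInitState(ctx)); // [participant-1]
  }
}
```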



-- 
This is an automated message from the Apache Git Service.