pradeepppc commented on code in PR #3932:
URL: https://github.com/apache/gobblin/pull/3932#discussion_r1576581493
##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java:
##########
@@ -299,27 +325,65 @@ void runInternal() {
// Remove this instance if existed in the tracking map.
instanceIdleSince.remove(participant);
}
+
+ if(instancesInInitState.contains(participant)) {
+ instanceInitStateSince.putIfAbsent(participant,
System.currentTimeMillis());
+ if (!isInstanceStuckInInitState(participant)) {
+ // release the corresponding container as the helix task is stuck
in INIT state for a long time
+ log.info("Instance {} is stuck in INIT state for a long time,
releasing the container", participant);
+ // get containerInfo of the helix participant
+ YarnService.ContainerInfo containerInfo =
yarnService.getContainerInfoGivenHelixParticipant(participant);
+ if(containerInfo != null) {
+ containersToRelease.add(containerInfo.getContainer());
+ instanceInitStateSince.remove(participant);
+ inUseInstances.remove(participant);
+ } else {
+ log.warn("ContainerInfo for participant {} is not found",
participant);
+ }
+ }
+ } else {
+ instanceInitStateSince.remove(participant);
+ }
+ }
+
+ // release the containers
+ if(!containersToRelease.isEmpty()) {
+ this.yarnService.getEventBus().post(new
ContainerReleaseRequest(containersToRelease, true));
}
+
slidingWindowReservoir.add(yarnContainerRequestBundle);
+
log.debug("There are {} containers being requested in total, tag-count
map {}, tag-resource map {}",
yarnContainerRequestBundle.getTotalContainers(),
yarnContainerRequestBundle.getHelixTagContainerCountMap(),
yarnContainerRequestBundle.getHelixTagResourceMap());
this.yarnService.requestTargetNumberOfContainers(slidingWindowReservoir.getMax(),
inUseInstances);
}
- @VisibleForTesting
/**
* Return true if the condition for tagging an instance as "unused" holds.
* The condition, by default, is that the instance has been idle for longer than
* {@link #maxIdleTimeInMinutesBeforeScalingDown} minutes: if it went back to
* active (having partitions running on it) within that window, we will not tag
* it as "unused" and it will not be a candidate for scaling down.
*/
+ @VisibleForTesting
boolean isInstanceUnused(String participant) {
  // Elapsed time since the participant was recorded as idle; unboxes the stored timestamp.
  long idleForMillis = System.currentTimeMillis() - instanceIdleSince.get(participant);
  // Idle threshold, configured in minutes and converted to milliseconds for comparison.
  long idleThresholdMillis = TimeUnit.MINUTES.toMillis(maxIdleTimeInMinutesBeforeScalingDown);
  return idleForMillis > idleThresholdMillis;
}
+
+ /**
+ * Return true if the condition for tagging an instance as stuck in INIT state holds.
+ * The condition is that more than {@link #maxIdleTimeInMinutesBeforeScalingDown} minutes
+ * have elapsed since the instance was first recorded in {@code instanceInitStateSince};
+ * until then, the instance is not tagged as stuck.
+ */
+ @VisibleForTesting
+ boolean isInstanceStuckInInitState(String participant) {
+ return System.currentTimeMillis() -
instanceInitStateSince.get(participant) >
+ TimeUnit.MINUTES.toMillis(maxIdleTimeInMinutesBeforeScalingDown);
Review Comment:
added new variable
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]