[ 
https://issues.apache.org/jira/browse/YARN-11177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17585617#comment-17585617
 ] 

ASF GitHub Bot commented on YARN-11177:
---------------------------------------

goiri commented on code in PR #4764:
URL: https://github.com/apache/hadoop/pull/4764#discussion_r956442103


##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java:
##########
@@ -888,13 +890,124 @@ public MoveApplicationAcrossQueuesResponse 
moveApplicationAcrossQueues(
   @Override
   public GetNewReservationResponse getNewReservation(
       GetNewReservationRequest request) throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+
+    if (request == null) {
+      routerMetrics.incrGetNewReservationFailedRetrieved();
+      String errMsg = "Missing getNewReservation request.";
+      RouterServerUtil.logAndThrowException(errMsg, null);
+    }
+
+    long startTime = clock.getTime();
+    Map<SubClusterId, SubClusterInfo> subClustersActive = 
federationFacade.getSubClusters(true);
+
+    for (int i = 0; i < numSubmitRetries; ++i) {
+      SubClusterId subClusterId = getRandomActiveSubCluster(subClustersActive);
+      LOG.info("getNewReservation try #{} on SubCluster {}.", i, subClusterId);
+      ApplicationClientProtocol clientRMProxy = 
getClientRMProxyForSubCluster(subClusterId);
+      GetNewReservationResponse response = null;
+      try {
+        response = clientRMProxy.getNewReservation(request);
+        if (response != null) {
+          long stopTime = clock.getTime();
+          routerMetrics.succeededGetNewReservationRetrieved(stopTime - 
startTime);
+          return response;
+        }
+      } catch (Exception e) {
+        LOG.warn("Unable to create a new Reservation in SubCluster {}.", 
subClusterId.getId(), e);
+        subClustersActive.remove(subClusterId);
+      }
+    }
+
+    routerMetrics.incrGetNewReservationFailedRetrieved();
+    String errMsg = "Failed to create a new reservation.";
+    throw new YarnException(errMsg);
   }
 
   @Override
   public ReservationSubmissionResponse submitReservation(
       ReservationSubmissionRequest request) throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+
+    if (request == null || request.getReservationId() == null
+        || request.getReservationDefinition() == null || request.getQueue() == 
null) {
+      routerMetrics.incrSubmitReservationFailedRetrieved();
+      RouterServerUtil.logAndThrowException(
+          "Missing submitReservation request or reservationId " +
+               "or reservation definition or queue.", null);
+    }
+
+    long startTime = clock.getTime();
+    ReservationId reservationId = request.getReservationId();
+
+    long retryCount = 0;
+    boolean firstRetry = true;
+
+    while (retryCount < numSubmitRetries) {
+
+      SubClusterId subClusterId = 
policyFacade.getReservationHomeSubCluster(request);
+      LOG.info("submitReservation reservationId {} try #{} on SubCluster {}.",
+          reservationId, retryCount, subClusterId);
+
+      ReservationHomeSubCluster reservationHomeSubCluster =
+          ReservationHomeSubCluster.newInstance(reservationId, subClusterId);
+
+      // If it is the first attempt,use StateStore to add the
+      // mapping of reservationId and subClusterId.
+      // if the number of attempts is greater than 1, use StateStore to update 
the mapping.
+      if (firstRetry) {
+        try {
+          // persist the mapping of reservationId and the subClusterId which 
has
+          // been selected as its home
+          subClusterId = 
federationFacade.addReservationHomeSubCluster(reservationHomeSubCluster);
+          firstRetry = false;
+        } catch (YarnException e) {
+          routerMetrics.incrSubmitReservationFailedRetrieved();
+          RouterServerUtil.logAndThrowException(e,
+              "Unable to insert the ReservationId %s into the 
FederationStateStore.",
+                   reservationId);

Review Comment:
   indentation



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java:
##########
@@ -888,13 +890,124 @@ public MoveApplicationAcrossQueuesResponse 
moveApplicationAcrossQueues(
   @Override
   public GetNewReservationResponse getNewReservation(
       GetNewReservationRequest request) throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+
+    if (request == null) {
+      routerMetrics.incrGetNewReservationFailedRetrieved();
+      String errMsg = "Missing getNewReservation request.";
+      RouterServerUtil.logAndThrowException(errMsg, null);
+    }
+
+    long startTime = clock.getTime();
+    Map<SubClusterId, SubClusterInfo> subClustersActive = 
federationFacade.getSubClusters(true);
+
+    for (int i = 0; i < numSubmitRetries; ++i) {
+      SubClusterId subClusterId = getRandomActiveSubCluster(subClustersActive);
+      LOG.info("getNewReservation try #{} on SubCluster {}.", i, subClusterId);
+      ApplicationClientProtocol clientRMProxy = 
getClientRMProxyForSubCluster(subClusterId);
+      GetNewReservationResponse response = null;

Review Comment:
   just declare in 909



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java:
##########
@@ -1614,13 +1782,41 @@ protected SubClusterId getApplicationHomeSubCluster(
 
       } catch (Exception ex) {
         if(LOG.isDebugEnabled()){
-          LOG.debug("Can't Find ApplicationId = {} in Sub Cluster!", 
applicationId);
+          LOG.debug("Can't find applicationId = {} in Sub Cluster!", 
applicationId);
         }
       }
     }
 
     String errorMsg =
-        String.format("Can't Found applicationId = %s in any sub clusters", 
applicationId);
+        String.format("Can't find applicationId = %s in any sub clusters", 
applicationId);
+    throw new YarnException(errorMsg);
+  }
+
+  protected SubClusterId getReservationHomeSubCluster(ReservationId 
reservationId)
+      throws YarnException {
+
+    if (reservationId == null) {
+      LOG.error("ReservationId is Null, Can't find in SubCluster.");
+      return null;
+    }
+
+    SubClusterId resultSubClusterId = null;
+
+    // try looking for reservation in Home SubCluster
+    try {
+      resultSubClusterId = 
federationFacade.getReservationHomeSubCluster(reservationId);
+    } catch (YarnException ex) {
+      if(LOG.isDebugEnabled()){

Review Comment:
   Do we need the guard for this?



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java:
##########
@@ -888,13 +890,124 @@ public MoveApplicationAcrossQueuesResponse 
moveApplicationAcrossQueues(
   @Override
   public GetNewReservationResponse getNewReservation(
       GetNewReservationRequest request) throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+
+    if (request == null) {
+      routerMetrics.incrGetNewReservationFailedRetrieved();
+      String errMsg = "Missing getNewReservation request.";
+      RouterServerUtil.logAndThrowException(errMsg, null);
+    }
+
+    long startTime = clock.getTime();
+    Map<SubClusterId, SubClusterInfo> subClustersActive = 
federationFacade.getSubClusters(true);
+
+    for (int i = 0; i < numSubmitRetries; ++i) {
+      SubClusterId subClusterId = getRandomActiveSubCluster(subClustersActive);
+      LOG.info("getNewReservation try #{} on SubCluster {}.", i, subClusterId);
+      ApplicationClientProtocol clientRMProxy = 
getClientRMProxyForSubCluster(subClusterId);
+      GetNewReservationResponse response = null;
+      try {
+        response = clientRMProxy.getNewReservation(request);
+        if (response != null) {
+          long stopTime = clock.getTime();
+          routerMetrics.succeededGetNewReservationRetrieved(stopTime - 
startTime);
+          return response;
+        }
+      } catch (Exception e) {
+        LOG.warn("Unable to create a new Reservation in SubCluster {}.", 
subClusterId.getId(), e);
+        subClustersActive.remove(subClusterId);
+      }
+    }
+
+    routerMetrics.incrGetNewReservationFailedRetrieved();
+    String errMsg = "Failed to create a new reservation.";
+    throw new YarnException(errMsg);
   }
 
   @Override
   public ReservationSubmissionResponse submitReservation(
       ReservationSubmissionRequest request) throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+
+    if (request == null || request.getReservationId() == null
+        || request.getReservationDefinition() == null || request.getQueue() == 
null) {
+      routerMetrics.incrSubmitReservationFailedRetrieved();
+      RouterServerUtil.logAndThrowException(
+          "Missing submitReservation request or reservationId " +
+               "or reservation definition or queue.", null);
+    }
+
+    long startTime = clock.getTime();
+    ReservationId reservationId = request.getReservationId();
+
+    long retryCount = 0;
+    boolean firstRetry = true;
+
+    while (retryCount < numSubmitRetries) {
+
+      SubClusterId subClusterId = 
policyFacade.getReservationHomeSubCluster(request);
+      LOG.info("submitReservation reservationId {} try #{} on SubCluster {}.",
+          reservationId, retryCount, subClusterId);
+
+      ReservationHomeSubCluster reservationHomeSubCluster =
+          ReservationHomeSubCluster.newInstance(reservationId, subClusterId);
+
+      // If it is the first attempt,use StateStore to add the

Review Comment:
   This full logic looks overly complicated.





> Support getNewReservation, submitReservation, updateReservation, 
> deleteReservation API's for Federation
> -------------------------------------------------------------------------------------------------------
>
>                 Key: YARN-11177
>                 URL: https://issues.apache.org/jira/browse/YARN-11177
>             Project: Hadoop YARN
>          Issue Type: Sub-task
>            Reporter: fanshilun
>            Assignee: fanshilun
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 3.4.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: yarn-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: yarn-issues-h...@hadoop.apache.org

Reply via email to