[ https://issues.apache.org/jira/browse/YARN-11177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17585617#comment-17585617 ]
ASF GitHub Bot commented on YARN-11177: --------------------------------------- goiri commented on code in PR #4764: URL: https://github.com/apache/hadoop/pull/4764#discussion_r956442103 ########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java: ########## @@ -888,13 +890,124 @@ public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues( @Override public GetNewReservationResponse getNewReservation( GetNewReservationRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + + if (request == null) { + routerMetrics.incrGetNewReservationFailedRetrieved(); + String errMsg = "Missing getNewReservation request."; + RouterServerUtil.logAndThrowException(errMsg, null); + } + + long startTime = clock.getTime(); + Map<SubClusterId, SubClusterInfo> subClustersActive = federationFacade.getSubClusters(true); + + for (int i = 0; i < numSubmitRetries; ++i) { + SubClusterId subClusterId = getRandomActiveSubCluster(subClustersActive); + LOG.info("getNewReservation try #{} on SubCluster {}.", i, subClusterId); + ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId); + GetNewReservationResponse response = null; + try { + response = clientRMProxy.getNewReservation(request); + if (response != null) { + long stopTime = clock.getTime(); + routerMetrics.succeededGetNewReservationRetrieved(stopTime - startTime); + return response; + } + } catch (Exception e) { + LOG.warn("Unable to create a new Reservation in SubCluster {}.", subClusterId.getId(), e); + subClustersActive.remove(subClusterId); + } + } + + routerMetrics.incrGetNewReservationFailedRetrieved(); + String errMsg = "Failed to create a new reservation."; + throw new YarnException(errMsg); } @Override public ReservationSubmissionResponse submitReservation( ReservationSubmissionRequest request) throws 
YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + + if (request == null || request.getReservationId() == null + || request.getReservationDefinition() == null || request.getQueue() == null) { + routerMetrics.incrSubmitReservationFailedRetrieved(); + RouterServerUtil.logAndThrowException( + "Missing submitReservation request or reservationId " + + "or reservation definition or queue.", null); + } + + long startTime = clock.getTime(); + ReservationId reservationId = request.getReservationId(); + + long retryCount = 0; + boolean firstRetry = true; + + while (retryCount < numSubmitRetries) { + + SubClusterId subClusterId = policyFacade.getReservationHomeSubCluster(request); + LOG.info("submitReservation reservationId {} try #{} on SubCluster {}.", + reservationId, retryCount, subClusterId); + + ReservationHomeSubCluster reservationHomeSubCluster = + ReservationHomeSubCluster.newInstance(reservationId, subClusterId); + + // If it is the first attempt,use StateStore to add the + // mapping of reservationId and subClusterId. + // if the number of attempts is greater than 1, use StateStore to update the mapping. 
+ if (firstRetry) { + try { + // persist the mapping of reservationId and the subClusterId which has + // been selected as its home + subClusterId = federationFacade.addReservationHomeSubCluster(reservationHomeSubCluster); + firstRetry = false; + } catch (YarnException e) { + routerMetrics.incrSubmitReservationFailedRetrieved(); + RouterServerUtil.logAndThrowException(e, + "Unable to insert the ReservationId %s into the FederationStateStore.", + reservationId); Review Comment: indentation ########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java: ########## @@ -888,13 +890,124 @@ public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues( @Override public GetNewReservationResponse getNewReservation( GetNewReservationRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + + if (request == null) { + routerMetrics.incrGetNewReservationFailedRetrieved(); + String errMsg = "Missing getNewReservation request."; + RouterServerUtil.logAndThrowException(errMsg, null); + } + + long startTime = clock.getTime(); + Map<SubClusterId, SubClusterInfo> subClustersActive = federationFacade.getSubClusters(true); + + for (int i = 0; i < numSubmitRetries; ++i) { + SubClusterId subClusterId = getRandomActiveSubCluster(subClustersActive); + LOG.info("getNewReservation try #{} on SubCluster {}.", i, subClusterId); + ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId); + GetNewReservationResponse response = null; Review Comment: just declare in 909 ########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java: ########## @@ -1614,13 +1782,41 @@ protected SubClusterId getApplicationHomeSubCluster( } catch (Exception ex) { 
if(LOG.isDebugEnabled()){ - LOG.debug("Can't Find ApplicationId = {} in Sub Cluster!", applicationId); + LOG.debug("Can't find applicationId = {} in Sub Cluster!", applicationId); } } } String errorMsg = - String.format("Can't Found applicationId = %s in any sub clusters", applicationId); + String.format("Can't find applicationId = %s in any sub clusters", applicationId); + throw new YarnException(errorMsg); + } + + protected SubClusterId getReservationHomeSubCluster(ReservationId reservationId) + throws YarnException { + + if (reservationId == null) { + LOG.error("ReservationId is Null, Can't find in SubCluster."); + return null; + } + + SubClusterId resultSubClusterId = null; + + // try looking for reservation in Home SubCluster + try { + resultSubClusterId = federationFacade.getReservationHomeSubCluster(reservationId); + } catch (YarnException ex) { + if(LOG.isDebugEnabled()){ Review Comment: Do we need the guard for this? ########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java: ########## @@ -888,13 +890,124 @@ public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues( @Override public GetNewReservationResponse getNewReservation( GetNewReservationRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + + if (request == null) { + routerMetrics.incrGetNewReservationFailedRetrieved(); + String errMsg = "Missing getNewReservation request."; + RouterServerUtil.logAndThrowException(errMsg, null); + } + + long startTime = clock.getTime(); + Map<SubClusterId, SubClusterInfo> subClustersActive = federationFacade.getSubClusters(true); + + for (int i = 0; i < numSubmitRetries; ++i) { + SubClusterId subClusterId = getRandomActiveSubCluster(subClustersActive); + LOG.info("getNewReservation try #{} on SubCluster {}.", i, subClusterId); + ApplicationClientProtocol clientRMProxy = 
getClientRMProxyForSubCluster(subClusterId); + GetNewReservationResponse response = null; + try { + response = clientRMProxy.getNewReservation(request); + if (response != null) { + long stopTime = clock.getTime(); + routerMetrics.succeededGetNewReservationRetrieved(stopTime - startTime); + return response; + } + } catch (Exception e) { + LOG.warn("Unable to create a new Reservation in SubCluster {}.", subClusterId.getId(), e); + subClustersActive.remove(subClusterId); + } + } + + routerMetrics.incrGetNewReservationFailedRetrieved(); + String errMsg = "Failed to create a new reservation."; + throw new YarnException(errMsg); } @Override public ReservationSubmissionResponse submitReservation( ReservationSubmissionRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + + if (request == null || request.getReservationId() == null + || request.getReservationDefinition() == null || request.getQueue() == null) { + routerMetrics.incrSubmitReservationFailedRetrieved(); + RouterServerUtil.logAndThrowException( + "Missing submitReservation request or reservationId " + + "or reservation definition or queue.", null); + } + + long startTime = clock.getTime(); + ReservationId reservationId = request.getReservationId(); + + long retryCount = 0; + boolean firstRetry = true; + + while (retryCount < numSubmitRetries) { + + SubClusterId subClusterId = policyFacade.getReservationHomeSubCluster(request); + LOG.info("submitReservation reservationId {} try #{} on SubCluster {}.", + reservationId, retryCount, subClusterId); + + ReservationHomeSubCluster reservationHomeSubCluster = + ReservationHomeSubCluster.newInstance(reservationId, subClusterId); + + // If it is the first attempt,use StateStore to add the Review Comment: This full logic looks overly complicated. 
> Support getNewReservation, submitReservation, updateReservation, deleteReservation API's for Federation
> -------------------------------------------------------------------------------------------------------
>
> Key: YARN-11177
> URL: https://issues.apache.org/jira/browse/YARN-11177
> Project: Hadoop YARN
> Issue Type: Sub-task
> Reporter: fanshilun
> Assignee: fanshilun
> Priority: Major
> Labels: pull-request-available
> Fix For: 3.4.0
>
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: yarn-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: yarn-issues-h...@hadoop.apache.org