[ https://issues.apache.org/jira/browse/YARN-11177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17583809#comment-17583809 ]
ASF GitHub Bot commented on YARN-11177: --------------------------------------- goiri commented on code in PR #4764: URL: https://github.com/apache/hadoop/pull/4764#discussion_r953004392 ########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java: ########## @@ -888,13 +890,120 @@ public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues( @Override public GetNewReservationResponse getNewReservation( GetNewReservationRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + + if (request == null) { + routerMetrics.incrGetNewReservationFailedRetrieved(); + String errMsg = "Missing getNewReservation request."; + RouterServerUtil.logAndThrowException(errMsg, null); + } + + long startTime = clock.getTime(); + Map<SubClusterId, SubClusterInfo> subClustersActive = + federationFacade.getSubClusters(true); + + for (int i = 0; i < numSubmitRetries; ++i) { + SubClusterId subClusterId = getRandomActiveSubCluster(subClustersActive); + LOG.info("getNewReservation try #{} on SubCluster {}.", i, subClusterId); + ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId); + GetNewReservationResponse response = null; + try { + response = clientRMProxy.getNewReservation(request); + } catch (Exception e) { + LOG.warn("Unable to create a new Reservation in SubCluster {}.", subClusterId.getId(), e); + subClustersActive.remove(subClusterId); + } + + if (response != null) { + long stopTime = clock.getTime(); + routerMetrics.succeededGetNewReservationRetrieved(stopTime - startTime); + return response; + } + } + + routerMetrics.incrGetNewReservationFailedRetrieved(); + String errMsg = "Failed to create a new reservation."; + throw new YarnException(errMsg); } @Override public ReservationSubmissionResponse submitReservation( ReservationSubmissionRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + + if (request == null || request.getReservationId() == null + || request.getReservationDefinition() == null || request.getQueue() == null) { + routerMetrics.incrSubmitReservationFailedRetrieved(); + RouterServerUtil.logAndThrowException( + "Missing submitReservation request or reservationId " + + "or reservation definition or queue.", null); + } + + ReservationSubmissionResponse response = null; Review Comment: Declare this where we use it. ########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java: ########## @@ -888,13 +890,120 @@ public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues( @Override public GetNewReservationResponse getNewReservation( GetNewReservationRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + + if (request == null) { + routerMetrics.incrGetNewReservationFailedRetrieved(); + String errMsg = "Missing getNewReservation request."; + RouterServerUtil.logAndThrowException(errMsg, null); + } + + long startTime = clock.getTime(); + Map<SubClusterId, SubClusterInfo> subClustersActive = + federationFacade.getSubClusters(true); + + for (int i = 0; i < numSubmitRetries; ++i) { + SubClusterId subClusterId = getRandomActiveSubCluster(subClustersActive); + LOG.info("getNewReservation try #{} on SubCluster {}.", i, subClusterId); + ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId); + GetNewReservationResponse response = null; + try { + response = clientRMProxy.getNewReservation(request); + } catch (Exception e) { + LOG.warn("Unable to create a new Reservation in SubCluster {}.", subClusterId.getId(), e); + subClustersActive.remove(subClusterId); + } + + if (response != null) { + long stopTime = clock.getTime(); + routerMetrics.succeededGetNewReservationRetrieved(stopTime - startTime); + return response; + } + } + + routerMetrics.incrGetNewReservationFailedRetrieved(); + String errMsg = "Failed to create a new reservation."; + throw new YarnException(errMsg); } @Override public ReservationSubmissionResponse submitReservation( ReservationSubmissionRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + + if (request == null || request.getReservationId() == null + || request.getReservationDefinition() == null || request.getQueue() == null) { + routerMetrics.incrSubmitReservationFailedRetrieved(); + RouterServerUtil.logAndThrowException( + "Missing submitReservation request or reservationId " + + "or reservation definition or queue.", null); + } + + ReservationSubmissionResponse response = null; + long startTime = clock.getTime(); + ReservationId reservationId = request.getReservationId(); + + for (int i = 0; i < numSubmitRetries; ++i) { + + SubClusterId subClusterId = policyFacade.getReservationHomeSubCluster(request); + + LOG.info("submitReservation reservationId {} try #{} on SubCluster {}.", + reservationId, i, subClusterId); + + ReservationHomeSubCluster reservationHomeSubCluster = + ReservationHomeSubCluster.newInstance(reservationId, subClusterId); + + if (i == 0) { + try { + // persist the mapping of reservationId and the subClusterId which has + // been selected as its home + subClusterId = federationFacade.addReservationHomeSubCluster(reservationHomeSubCluster); + } catch (YarnException e) { + routerMetrics.incrSubmitReservationFailedRetrieved(); + String message = + String.format("Unable to insert the ReservationId %s into the FederationStateStore.", + reservationId); + RouterServerUtil.logAndThrowException(message, e); + } + } else { + try { + // update the mapping of reservationId and the home subClusterId to + // the new subClusterId we have selected + federationFacade.updateReservationHomeSubCluster(reservationHomeSubCluster); + } catch (YarnException e) { + String message = + String.format("Unable to update the ReservationId %s into the FederationStateStore.", Review Comment: We had some utility for the log and throw that did some of this message formatting ########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java: ########## @@ -888,13 +890,120 @@ public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues( @Override public GetNewReservationResponse getNewReservation( GetNewReservationRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + + if (request == null) { + routerMetrics.incrGetNewReservationFailedRetrieved(); + String errMsg = "Missing getNewReservation request."; + RouterServerUtil.logAndThrowException(errMsg, null); + } + + long startTime = clock.getTime(); + Map<SubClusterId, SubClusterInfo> subClustersActive = + federationFacade.getSubClusters(true); + + for (int i = 0; i < numSubmitRetries; ++i) { + SubClusterId subClusterId = getRandomActiveSubCluster(subClustersActive); + LOG.info("getNewReservation try #{} on SubCluster {}.", i, subClusterId); + ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId); + GetNewReservationResponse response = null; + try { + response = clientRMProxy.getNewReservation(request); + } catch (Exception e) { + LOG.warn("Unable to create a new Reservation in SubCluster {}.", subClusterId.getId(), e); + subClustersActive.remove(subClusterId); + } + + if (response != null) { + long stopTime = clock.getTime(); + routerMetrics.succeededGetNewReservationRetrieved(stopTime - startTime); + return response; + } + } + + routerMetrics.incrGetNewReservationFailedRetrieved(); + String errMsg = "Failed to create a new reservation."; + throw new YarnException(errMsg); } @Override public ReservationSubmissionResponse submitReservation( ReservationSubmissionRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + + if (request == null || request.getReservationId() == null + || request.getReservationDefinition() == null || request.getQueue() == null) { + routerMetrics.incrSubmitReservationFailedRetrieved(); + RouterServerUtil.logAndThrowException( + "Missing submitReservation request or reservationId " + + "or reservation definition or queue.", null); + } + + ReservationSubmissionResponse response = null; + long startTime = clock.getTime(); + ReservationId reservationId = request.getReservationId(); + + for (int i = 0; i < numSubmitRetries; ++i) { + + SubClusterId subClusterId = policyFacade.getReservationHomeSubCluster(request); + + LOG.info("submitReservation reservationId {} try #{} on SubCluster {}.", + reservationId, i, subClusterId); + + ReservationHomeSubCluster reservationHomeSubCluster = + ReservationHomeSubCluster.newInstance(reservationId, subClusterId); + + if (i == 0) { Review Comment: There has to be a cleaner way to do this ########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java: ########## @@ -888,13 +890,120 @@ public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues( @Override public GetNewReservationResponse getNewReservation( GetNewReservationRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + + if (request == null) { + routerMetrics.incrGetNewReservationFailedRetrieved(); + String errMsg = "Missing getNewReservation request."; + RouterServerUtil.logAndThrowException(errMsg, null); + } + + long startTime = clock.getTime(); + Map<SubClusterId, SubClusterInfo> subClustersActive = + federationFacade.getSubClusters(true); + + for (int i = 0; i < numSubmitRetries; ++i) { + SubClusterId subClusterId = getRandomActiveSubCluster(subClustersActive); + LOG.info("getNewReservation try #{} on SubCluster {}.", i, subClusterId); + ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId); + GetNewReservationResponse response = null; + try { + response = clientRMProxy.getNewReservation(request); + } catch (Exception e) { + LOG.warn("Unable to create a new Reservation in SubCluster {}.", subClusterId.getId(), e); + subClustersActive.remove(subClusterId); + } + + if (response != null) { + long stopTime = clock.getTime(); Review Comment: Can we do this inside of the try? > Support getNewReservation, submitReservation, updateReservation, > deleteReservation API's for Federation > ------------------------------------------------------------------------------------------------------- > > Key: YARN-11177 > URL: https://issues.apache.org/jira/browse/YARN-11177 > Project: Hadoop YARN > Issue Type: Sub-task > Reporter: fanshilun > Assignee: fanshilun > Priority: Major > Labels: pull-request-available > Fix For: 3.4.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: yarn-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: yarn-issues-h...@hadoop.apache.org