[ https://issues.apache.org/jira/browse/YARN-11177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17584517#comment-17584517 ]
ASF GitHub Bot commented on YARN-11177: --------------------------------------- goiri commented on code in PR #4764: URL: https://github.com/apache/hadoop/pull/4764#discussion_r954396015 ########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java: ########## @@ -925,13 +1041,61 @@ public ReservationListResponse listReservations( @Override public ReservationUpdateResponse updateReservation( ReservationUpdateRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + + if (request == null || request.getReservationId() == null + || request.getReservationDefinition() == null) { Review Comment: Indentation ########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java: ########## @@ -888,13 +890,127 @@ public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues( @Override public GetNewReservationResponse getNewReservation( GetNewReservationRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + + if (request == null) { + routerMetrics.incrGetNewReservationFailedRetrieved(); + String errMsg = "Missing getNewReservation request."; + RouterServerUtil.logAndThrowException(errMsg, null); + } + + long startTime = clock.getTime(); + Map<SubClusterId, SubClusterInfo> subClustersActive = + federationFacade.getSubClusters(true); + + for (int i = 0; i < numSubmitRetries; ++i) { + SubClusterId subClusterId = getRandomActiveSubCluster(subClustersActive); + LOG.info("getNewReservation try #{} on SubCluster {}.", i, subClusterId); + ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId); + GetNewReservationResponse response = null; + try { + response = 
clientRMProxy.getNewReservation(request); + if (response != null) { + long stopTime = clock.getTime(); + routerMetrics.succeededGetNewReservationRetrieved(stopTime - startTime); + return response; + } + } catch (Exception e) { + LOG.warn("Unable to create a new Reservation in SubCluster {}.", subClusterId.getId(), e); + subClustersActive.remove(subClusterId); + } + } + + routerMetrics.incrGetNewReservationFailedRetrieved(); + String errMsg = "Failed to create a new reservation."; + throw new YarnException(errMsg); } @Override public ReservationSubmissionResponse submitReservation( ReservationSubmissionRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + + if (request == null || request.getReservationId() == null + || request.getReservationDefinition() == null || request.getQueue() == null) { + routerMetrics.incrSubmitReservationFailedRetrieved(); + RouterServerUtil.logAndThrowException( + "Missing submitReservation request or reservationId " + + "or reservation definition or queue.", null); + } + + long startTime = clock.getTime(); + ReservationId reservationId = request.getReservationId(); + + long retryCount = 0; + boolean firstRetry = true; + + while (retryCount < numSubmitRetries) { + + SubClusterId subClusterId = policyFacade.getReservationHomeSubCluster(request); + LOG.info("submitReservation reservationId {} try #{} on SubCluster {}.", + reservationId, retryCount, subClusterId); + + ReservationHomeSubCluster reservationHomeSubCluster = + ReservationHomeSubCluster.newInstance(reservationId, subClusterId); + + // If it is the first attempt,use StateStore to add the + // mapping of reservationId and subClusterId. + // if the number of attempts is greater than 1, use StateStore to update the mapping. 
+ if (firstRetry) { + try { + // persist the mapping of reservationId and the subClusterId which has + // been selected as its home + subClusterId = federationFacade.addReservationHomeSubCluster(reservationHomeSubCluster); + firstRetry = false; + } catch (YarnException e) { + routerMetrics.incrSubmitReservationFailedRetrieved(); + RouterServerUtil.logAndThrowException(e, + "Unable to insert the ReservationId %s into the FederationStateStore.", + reservationId); + } + } else { + try { + // update the mapping of reservationId and the home subClusterId to + // the new subClusterId we have selected + federationFacade.updateReservationHomeSubCluster(reservationHomeSubCluster); + } catch (YarnException e) { + SubClusterId subClusterIdInStateStore = + federationFacade.getReservationHomeSubCluster(reservationId); + if (subClusterId == subClusterIdInStateStore) { + LOG.info("Reservation {} already submitted on SubCluster {}.", + reservationId, subClusterId); + } else { + routerMetrics.incrSubmitReservationFailedRetrieved(); + RouterServerUtil.logAndThrowException(e, + "Unable to update the ReservationId %s into the FederationStateStore.", + reservationId); Review Comment: Indentation ########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java: ########## @@ -888,13 +890,127 @@ public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues( @Override public GetNewReservationResponse getNewReservation( GetNewReservationRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + + if (request == null) { + routerMetrics.incrGetNewReservationFailedRetrieved(); + String errMsg = "Missing getNewReservation request."; + RouterServerUtil.logAndThrowException(errMsg, null); + } + + long startTime = clock.getTime(); + Map<SubClusterId, SubClusterInfo> subClustersActive = + 
federationFacade.getSubClusters(true); + + for (int i = 0; i < numSubmitRetries; ++i) { + SubClusterId subClusterId = getRandomActiveSubCluster(subClustersActive); + LOG.info("getNewReservation try #{} on SubCluster {}.", i, subClusterId); + ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId); + GetNewReservationResponse response = null; + try { + response = clientRMProxy.getNewReservation(request); + if (response != null) { + long stopTime = clock.getTime(); + routerMetrics.succeededGetNewReservationRetrieved(stopTime - startTime); + return response; + } + } catch (Exception e) { + LOG.warn("Unable to create a new Reservation in SubCluster {}.", subClusterId.getId(), e); + subClustersActive.remove(subClusterId); + } + } + + routerMetrics.incrGetNewReservationFailedRetrieved(); + String errMsg = "Failed to create a new reservation."; + throw new YarnException(errMsg); } @Override public ReservationSubmissionResponse submitReservation( ReservationSubmissionRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + + if (request == null || request.getReservationId() == null + || request.getReservationDefinition() == null || request.getQueue() == null) { + routerMetrics.incrSubmitReservationFailedRetrieved(); + RouterServerUtil.logAndThrowException( + "Missing submitReservation request or reservationId " + + "or reservation definition or queue.", null); + } + + long startTime = clock.getTime(); + ReservationId reservationId = request.getReservationId(); + + long retryCount = 0; + boolean firstRetry = true; + + while (retryCount < numSubmitRetries) { + + SubClusterId subClusterId = policyFacade.getReservationHomeSubCluster(request); + LOG.info("submitReservation reservationId {} try #{} on SubCluster {}.", + reservationId, retryCount, subClusterId); + + ReservationHomeSubCluster reservationHomeSubCluster = + ReservationHomeSubCluster.newInstance(reservationId, subClusterId); + 
+ // If it is the first attempt,use StateStore to add the + // mapping of reservationId and subClusterId. + // if the number of attempts is greater than 1, use StateStore to update the mapping. + if (firstRetry) { + try { + // persist the mapping of reservationId and the subClusterId which has + // been selected as its home + subClusterId = federationFacade.addReservationHomeSubCluster(reservationHomeSubCluster); + firstRetry = false; + } catch (YarnException e) { + routerMetrics.incrSubmitReservationFailedRetrieved(); + RouterServerUtil.logAndThrowException(e, + "Unable to insert the ReservationId %s into the FederationStateStore.", + reservationId); + } + } else { + try { + // update the mapping of reservationId and the home subClusterId to + // the new subClusterId we have selected + federationFacade.updateReservationHomeSubCluster(reservationHomeSubCluster); + } catch (YarnException e) { + SubClusterId subClusterIdInStateStore = + federationFacade.getReservationHomeSubCluster(reservationId); + if (subClusterId == subClusterIdInStateStore) { + LOG.info("Reservation {} already submitted on SubCluster {}.", + reservationId, subClusterId); + } else { + routerMetrics.incrSubmitReservationFailedRetrieved(); + RouterServerUtil.logAndThrowException(e, + "Unable to update the ReservationId %s into the FederationStateStore.", + reservationId); + } + } + } + + // Obtain the ApplicationClientProtocol of the corresponding RM according to the subClusterId, + // and call the submitReservation method, If the request is responded to, + // If the request is responded, it will return directly, otherwise retryCount+1, + // continue to submit other request. 
+ try { + ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId); + ReservationSubmissionResponse response = clientRMProxy.submitReservation(request); + if (response != null) { + LOG.info("Reservation {} submitted on {}.", request.getReservationId(), subClusterId); + long stopTime = clock.getTime(); + routerMetrics.succeededSubmitReservationRetrieved(stopTime - startTime); + return response; + } + } catch (Exception e) { + LOG.warn("Unable to submit the reservation {} to SubCluster {} error = {}.", + reservationId, subClusterId.getId(), e.getMessage(), e); + } + + retryCount++; + } + + routerMetrics.incrSubmitReservationFailedRetrieved(); + String msg = String.format("Reservation %s failed to be submitted.", + request.getReservationId()); Review Comment: Use the already-extracted reservationId variable here instead of calling request.getReservationId() again. ########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java: ########## @@ -888,13 +890,127 @@ public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues( @Override public GetNewReservationResponse getNewReservation( GetNewReservationRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + + if (request == null) { + routerMetrics.incrGetNewReservationFailedRetrieved(); + String errMsg = "Missing getNewReservation request."; + RouterServerUtil.logAndThrowException(errMsg, null); + } + + long startTime = clock.getTime(); + Map<SubClusterId, SubClusterInfo> subClustersActive = Review Comment: Can this declaration fit on a single line? 
########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java: ########## @@ -1254,4 +1275,250 @@ public void testNodesToAttributes() throws Exception { NodeAttributeType.STRING, "nvida"); Assert.assertTrue(nodeAttributeMap.get("0-host1").contains(gpu)); } + + @Test + public void testGetNewReservation() throws Exception { + LOG.info("Test FederationClientInterceptor : Get NewReservation request."); + + // null request + LambdaTestUtils.intercept(YarnException.class, + "Missing getNewReservation request.", () -> interceptor.getNewReservation(null)); + + // normal request + GetNewReservationRequest request = GetNewReservationRequest.newInstance(); + GetNewReservationResponse response = interceptor.getNewReservation(request); + Assert.assertNotNull(response); + + ReservationId reservationId = response.getReservationId(); + Assert.assertNotNull(reservationId); + Assert.assertTrue(reservationId.toString().contains("reservation")); + Assert.assertEquals(reservationId.getClusterTimestamp(), ResourceManager.getClusterTimeStamp()); + } + + @Test + public void testSubmitReservation() throws Exception { + LOG.info("Test FederationClientInterceptor : SubmitReservation request."); + + // get new reservationId + GetNewReservationRequest request = GetNewReservationRequest.newInstance(); + GetNewReservationResponse response = interceptor.getNewReservation(request); + Assert.assertNotNull(response); + + // allow plan follower to synchronize, manually trigger an assignment + Map<SubClusterId, MockRM> mockRMs = interceptor.getMockRMs(); + for (MockRM mockRM : mockRMs.values()) { + ReservationSystem reservationSystem = mockRM.getReservationSystem(); + reservationSystem.synchronizePlan("root.decided", true); + } + + // Submit Reservation + ReservationId reservationId = response.getReservationId(); + ReservationDefinition rDefinition = 
createReservationDefinition(1024, 1); + ReservationSubmissionRequest rSubmissionRequest = ReservationSubmissionRequest.newInstance( + rDefinition, "decided", reservationId); + + ReservationSubmissionResponse submissionResponse = + interceptor.submitReservation(rSubmissionRequest); + Assert.assertNotNull(submissionResponse); + + SubClusterId subClusterId = stateStoreUtil.queryReservationHomeSC(reservationId); + Assert.assertNotNull(subClusterId); + Assert.assertTrue(subClusters.contains(subClusterId)); + } + + @Test + public void testSubmitReservationEmptyRequest() throws Exception { + LOG.info("Test FederationClientInterceptor : SubmitReservation request empty."); + + // null request1 + LambdaTestUtils.intercept(YarnException.class, + "Missing submitReservation request or reservationId or reservation definition or queue.", + () -> interceptor.submitReservation(null)); + + // null request2 + LambdaTestUtils.intercept(YarnException.class, + "Missing submitReservation request or reservationId or reservation definition or queue.", + () -> interceptor.submitReservation( + ReservationSubmissionRequest.newInstance(null, null, null))); Review Comment: indentation is not correct ########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java: ########## @@ -888,13 +890,127 @@ public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues( @Override public GetNewReservationResponse getNewReservation( GetNewReservationRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + + if (request == null) { + routerMetrics.incrGetNewReservationFailedRetrieved(); + String errMsg = "Missing getNewReservation request."; + RouterServerUtil.logAndThrowException(errMsg, null); + } + + long startTime = clock.getTime(); + Map<SubClusterId, SubClusterInfo> subClustersActive = + 
federationFacade.getSubClusters(true); + + for (int i = 0; i < numSubmitRetries; ++i) { + SubClusterId subClusterId = getRandomActiveSubCluster(subClustersActive); + LOG.info("getNewReservation try #{} on SubCluster {}.", i, subClusterId); + ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId); + GetNewReservationResponse response = null; + try { + response = clientRMProxy.getNewReservation(request); + if (response != null) { + long stopTime = clock.getTime(); + routerMetrics.succeededGetNewReservationRetrieved(stopTime - startTime); + return response; + } + } catch (Exception e) { + LOG.warn("Unable to create a new Reservation in SubCluster {}.", subClusterId.getId(), e); + subClustersActive.remove(subClusterId); + } + } + + routerMetrics.incrGetNewReservationFailedRetrieved(); + String errMsg = "Failed to create a new reservation."; + throw new YarnException(errMsg); } @Override public ReservationSubmissionResponse submitReservation( ReservationSubmissionRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + + if (request == null || request.getReservationId() == null + || request.getReservationDefinition() == null || request.getQueue() == null) { + routerMetrics.incrSubmitReservationFailedRetrieved(); + RouterServerUtil.logAndThrowException( + "Missing submitReservation request or reservationId " + + "or reservation definition or queue.", null); + } + + long startTime = clock.getTime(); + ReservationId reservationId = request.getReservationId(); + + long retryCount = 0; + boolean firstRetry = true; + + while (retryCount < numSubmitRetries) { + + SubClusterId subClusterId = policyFacade.getReservationHomeSubCluster(request); + LOG.info("submitReservation reservationId {} try #{} on SubCluster {}.", + reservationId, retryCount, subClusterId); + + ReservationHomeSubCluster reservationHomeSubCluster = + ReservationHomeSubCluster.newInstance(reservationId, subClusterId); + 
+ // If it is the first attempt,use StateStore to add the + // mapping of reservationId and subClusterId. + // if the number of attempts is greater than 1, use StateStore to update the mapping. + if (firstRetry) { + try { + // persist the mapping of reservationId and the subClusterId which has + // been selected as its home + subClusterId = federationFacade.addReservationHomeSubCluster(reservationHomeSubCluster); + firstRetry = false; + } catch (YarnException e) { + routerMetrics.incrSubmitReservationFailedRetrieved(); + RouterServerUtil.logAndThrowException(e, + "Unable to insert the ReservationId %s into the FederationStateStore.", + reservationId); + } + } else { + try { + // update the mapping of reservationId and the home subClusterId to + // the new subClusterId we have selected + federationFacade.updateReservationHomeSubCluster(reservationHomeSubCluster); + } catch (YarnException e) { + SubClusterId subClusterIdInStateStore = + federationFacade.getReservationHomeSubCluster(reservationId); + if (subClusterId == subClusterIdInStateStore) { + LOG.info("Reservation {} already submitted on SubCluster {}.", + reservationId, subClusterId); + } else { + routerMetrics.incrSubmitReservationFailedRetrieved(); + RouterServerUtil.logAndThrowException(e, + "Unable to update the ReservationId %s into the FederationStateStore.", + reservationId); + } + } + } + + // Obtain the ApplicationClientProtocol of the corresponding RM according to the subClusterId, + // and call the submitReservation method, If the request is responded to, + // If the request is responded, it will return directly, otherwise retryCount+1, + // continue to submit other request. 
+ try { + ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId); + ReservationSubmissionResponse response = clientRMProxy.submitReservation(request); + if (response != null) { + LOG.info("Reservation {} submitted on {}.", request.getReservationId(), subClusterId); Review Comment: reservationId is extracted already ########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java: ########## @@ -1254,4 +1275,250 @@ public void testNodesToAttributes() throws Exception { NodeAttributeType.STRING, "nvida"); Assert.assertTrue(nodeAttributeMap.get("0-host1").contains(gpu)); } + + @Test + public void testGetNewReservation() throws Exception { + LOG.info("Test FederationClientInterceptor : Get NewReservation request."); + + // null request + LambdaTestUtils.intercept(YarnException.class, + "Missing getNewReservation request.", () -> interceptor.getNewReservation(null)); + + // normal request + GetNewReservationRequest request = GetNewReservationRequest.newInstance(); + GetNewReservationResponse response = interceptor.getNewReservation(request); + Assert.assertNotNull(response); + + ReservationId reservationId = response.getReservationId(); + Assert.assertNotNull(reservationId); + Assert.assertTrue(reservationId.toString().contains("reservation")); + Assert.assertEquals(reservationId.getClusterTimestamp(), ResourceManager.getClusterTimeStamp()); + } + + @Test + public void testSubmitReservation() throws Exception { + LOG.info("Test FederationClientInterceptor : SubmitReservation request."); + + // get new reservationId + GetNewReservationRequest request = GetNewReservationRequest.newInstance(); + GetNewReservationResponse response = interceptor.getNewReservation(request); + Assert.assertNotNull(response); + + // allow plan follower to synchronize, manually trigger an assignment + Map<SubClusterId, MockRM> mockRMs = 
interceptor.getMockRMs(); + for (MockRM mockRM : mockRMs.values()) { + ReservationSystem reservationSystem = mockRM.getReservationSystem(); + reservationSystem.synchronizePlan("root.decided", true); + } + + // Submit Reservation + ReservationId reservationId = response.getReservationId(); + ReservationDefinition rDefinition = createReservationDefinition(1024, 1); + ReservationSubmissionRequest rSubmissionRequest = ReservationSubmissionRequest.newInstance( + rDefinition, "decided", reservationId); + + ReservationSubmissionResponse submissionResponse = + interceptor.submitReservation(rSubmissionRequest); + Assert.assertNotNull(submissionResponse); + + SubClusterId subClusterId = stateStoreUtil.queryReservationHomeSC(reservationId); + Assert.assertNotNull(subClusterId); + Assert.assertTrue(subClusters.contains(subClusterId)); + } + + @Test + public void testSubmitReservationEmptyRequest() throws Exception { + LOG.info("Test FederationClientInterceptor : SubmitReservation request empty."); + + // null request1 + LambdaTestUtils.intercept(YarnException.class, + "Missing submitReservation request or reservationId or reservation definition or queue.", + () -> interceptor.submitReservation(null)); + + // null request2 + LambdaTestUtils.intercept(YarnException.class, + "Missing submitReservation request or reservationId or reservation definition or queue.", + () -> interceptor.submitReservation( + ReservationSubmissionRequest.newInstance(null, null, null))); + + // null request3 + ReservationSubmissionRequest request3 = + ReservationSubmissionRequest.newInstance(null, "q1", null); + LambdaTestUtils.intercept(YarnException.class, + "Missing submitReservation request or reservationId or reservation definition or queue.", + () -> interceptor.submitReservation(request3)); + + // null request4 + ReservationId reservationId = ReservationId.newInstance(Time.now(), 1); + ReservationSubmissionRequest request4 = + ReservationSubmissionRequest.newInstance(null, null, reservationId); + 
LambdaTestUtils.intercept(YarnException.class, + "Missing submitReservation request or reservationId or reservation definition or queue.", + () -> interceptor.submitReservation(request4)); + + // null request5 + long defaultDuration = 600000; + long arrival = Time.now(); + long deadline = arrival + (int)(defaultDuration * 1.1); + + ReservationRequest rRequest = ReservationRequest.newInstance( + Resource.newInstance(1024, 1), 1, 1, defaultDuration); + ReservationRequest[] rRequests = new ReservationRequest[] {rRequest}; + ReservationDefinition rDefinition = createReservationDefinition(arrival, deadline, rRequests, + ReservationRequestInterpreter.R_ALL, "u1"); + ReservationSubmissionRequest request5 = + ReservationSubmissionRequest.newInstance(rDefinition, null, reservationId); + LambdaTestUtils.intercept(YarnException.class, + "Missing submitReservation request or reservationId or reservation definition or queue.", + () -> interceptor.submitReservation(request5)); + } + + @Test + public void testSubmitReservationMultipleSubmission() throws Exception { + LOG.info("Test FederationClientInterceptor: Submit Reservation - Multiple"); + + // get new reservationId + GetNewReservationRequest request = GetNewReservationRequest.newInstance(); + GetNewReservationResponse response = interceptor.getNewReservation(request); + Assert.assertNotNull(response); + + // allow plan follower to synchronize, manually trigger an assignment + Map<SubClusterId, MockRM> mockRMs = interceptor.getMockRMs(); + for (MockRM mockRM : mockRMs.values()) { + ReservationSystem reservationSystem = mockRM.getReservationSystem(); + reservationSystem.synchronizePlan("root.decided", true); + } + + // First Submit Reservation + ReservationId reservationId = response.getReservationId(); + ReservationDefinition rDefinition = createReservationDefinition(1024, 1); + ReservationSubmissionRequest rSubmissionRequest = ReservationSubmissionRequest.newInstance( + rDefinition, "decided", reservationId); + 
ReservationSubmissionResponse submissionResponse = + interceptor.submitReservation(rSubmissionRequest); + Assert.assertNotNull(submissionResponse); + + SubClusterId subClusterId1 = stateStoreUtil.queryReservationHomeSC(reservationId); + Assert.assertNotNull(subClusterId1); + Assert.assertTrue(subClusters.contains(subClusterId1)); + + // First Retry + ReservationSubmissionResponse submissionResponse1 = + interceptor.submitReservation(rSubmissionRequest); + Assert.assertNotNull(submissionResponse1); + SubClusterId subClusterId2 = stateStoreUtil.queryReservationHomeSC(reservationId); + Assert.assertNotNull(subClusterId2); + Assert.assertEquals(subClusterId1, subClusterId2); + } + + @Test + public void testUpdateReservation() throws Exception { + LOG.info("Test FederationClientInterceptor : UpdateReservation request."); + + // get new reservationId + GetNewReservationRequest request = GetNewReservationRequest.newInstance(); + GetNewReservationResponse response = interceptor.getNewReservation(request); + Assert.assertNotNull(response); + + // allow plan follower to synchronize, manually trigger an assignment + Map<SubClusterId, MockRM> mockRMs = interceptor.getMockRMs(); + for (MockRM mockRM : mockRMs.values()) { + ReservationSystem reservationSystem = mockRM.getReservationSystem(); + reservationSystem.synchronizePlan("root.decided", true); + } + + // Submit Reservation + ReservationId reservationId = response.getReservationId(); + ReservationDefinition rDefinition = createReservationDefinition(1024, 1); + ReservationSubmissionRequest rSubmissionRequest = ReservationSubmissionRequest.newInstance( + rDefinition, "decided", reservationId); + + ReservationSubmissionResponse submissionResponse = + interceptor.submitReservation(rSubmissionRequest); + Assert.assertNotNull(submissionResponse); + + // Update Reservation + ReservationDefinition rDefinition2 = createReservationDefinition(2048, 1); + ReservationUpdateRequest updateRequest = + 
ReservationUpdateRequest.newInstance(rDefinition2, reservationId); + ReservationUpdateResponse updateResponse = + interceptor.updateReservation(updateRequest); + Assert.assertNotNull(updateResponse); + + SubClusterId subClusterId = stateStoreUtil.queryReservationHomeSC(reservationId); + Assert.assertNotNull(subClusterId); + } + + @Test + public void testDeleteReservation() throws Exception { + LOG.info("Test FederationClientInterceptor : DeleteReservation request."); + + // get new reservationId + GetNewReservationRequest request = GetNewReservationRequest.newInstance(); + GetNewReservationResponse response = interceptor.getNewReservation(request); + Assert.assertNotNull(response); + + // allow plan follower to synchronize, manually trigger an assignment + Map<SubClusterId, MockRM> mockRMs = interceptor.getMockRMs(); + for (MockRM mockRM : mockRMs.values()) { + ReservationSystem reservationSystem = mockRM.getReservationSystem(); + reservationSystem.synchronizePlan("root.decided", true); + } + + // Submit Reservation + ReservationId reservationId = response.getReservationId(); + ReservationDefinition rDefinition = createReservationDefinition(1024, 1); + ReservationSubmissionRequest rSubmissionRequest = ReservationSubmissionRequest.newInstance( + rDefinition, "decided", reservationId); + + ReservationSubmissionResponse submissionResponse = + interceptor.submitReservation(rSubmissionRequest); + Assert.assertNotNull(submissionResponse); + + // Delete Reservation + ReservationDeleteRequest deleteRequest = ReservationDeleteRequest.newInstance(reservationId); + ReservationDeleteResponse deleteResponse = interceptor.deleteReservation(deleteRequest); + Assert.assertNotNull(deleteResponse); + + LambdaTestUtils.intercept(YarnException.class, + "Reservation " + reservationId + " does not exist", + () -> stateStoreUtil.queryReservationHomeSC(reservationId)); + } + + + private ReservationDefinition createReservationDefinition(int memory, int core) { + // get reservationId + long 
defaultDuration = 600000; Review Comment: Make this a constant (e.g. a static final field) rather than a local literal. ########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java: ########## @@ -1254,4 +1275,250 @@ public void testNodesToAttributes() throws Exception { NodeAttributeType.STRING, "nvida"); Assert.assertTrue(nodeAttributeMap.get("0-host1").contains(gpu)); } + + @Test + public void testGetNewReservation() throws Exception { + LOG.info("Test FederationClientInterceptor : Get NewReservation request."); + + // null request + LambdaTestUtils.intercept(YarnException.class, + "Missing getNewReservation request.", () -> interceptor.getNewReservation(null)); + + // normal request + GetNewReservationRequest request = GetNewReservationRequest.newInstance(); + GetNewReservationResponse response = interceptor.getNewReservation(request); + Assert.assertNotNull(response); + + ReservationId reservationId = response.getReservationId(); + Assert.assertNotNull(reservationId); + Assert.assertTrue(reservationId.toString().contains("reservation")); + Assert.assertEquals(reservationId.getClusterTimestamp(), ResourceManager.getClusterTimeStamp()); + } + + @Test + public void testSubmitReservation() throws Exception { + LOG.info("Test FederationClientInterceptor : SubmitReservation request."); + + // get new reservationId + GetNewReservationRequest request = GetNewReservationRequest.newInstance(); + GetNewReservationResponse response = interceptor.getNewReservation(request); + Assert.assertNotNull(response); + + // allow plan follower to synchronize, manually trigger an assignment + Map<SubClusterId, MockRM> mockRMs = interceptor.getMockRMs(); + for (MockRM mockRM : mockRMs.values()) { + ReservationSystem reservationSystem = mockRM.getReservationSystem(); + reservationSystem.synchronizePlan("root.decided", true); + } + + // Submit Reservation + ReservationId reservationId = response.getReservationId(); + 
ReservationDefinition rDefinition = createReservationDefinition(1024, 1); + ReservationSubmissionRequest rSubmissionRequest = ReservationSubmissionRequest.newInstance( + rDefinition, "decided", reservationId); + + ReservationSubmissionResponse submissionResponse = + interceptor.submitReservation(rSubmissionRequest); + Assert.assertNotNull(submissionResponse); + + SubClusterId subClusterId = stateStoreUtil.queryReservationHomeSC(reservationId); + Assert.assertNotNull(subClusterId); + Assert.assertTrue(subClusters.contains(subClusterId)); + } + + @Test + public void testSubmitReservationEmptyRequest() throws Exception { + LOG.info("Test FederationClientInterceptor : SubmitReservation request empty."); + + // null request1 + LambdaTestUtils.intercept(YarnException.class, + "Missing submitReservation request or reservationId or reservation definition or queue.", + () -> interceptor.submitReservation(null)); + + // null request2 + LambdaTestUtils.intercept(YarnException.class, + "Missing submitReservation request or reservationId or reservation definition or queue.", + () -> interceptor.submitReservation( + ReservationSubmissionRequest.newInstance(null, null, null))); + + // null request3 + ReservationSubmissionRequest request3 = + ReservationSubmissionRequest.newInstance(null, "q1", null); + LambdaTestUtils.intercept(YarnException.class, + "Missing submitReservation request or reservationId or reservation definition or queue.", + () -> interceptor.submitReservation(request3)); + + // null request4 + ReservationId reservationId = ReservationId.newInstance(Time.now(), 1); + ReservationSubmissionRequest request4 = + ReservationSubmissionRequest.newInstance(null, null, reservationId); + LambdaTestUtils.intercept(YarnException.class, + "Missing submitReservation request or reservationId or reservation definition or queue.", + () -> interceptor.submitReservation(request4)); + + // null request5 + long defaultDuration = 600000; + long arrival = Time.now(); + long deadline = 
arrival + (int)(defaultDuration * 1.1); + + ReservationRequest rRequest = ReservationRequest.newInstance( + Resource.newInstance(1024, 1), 1, 1, defaultDuration); + ReservationRequest[] rRequests = new ReservationRequest[] {rRequest}; + ReservationDefinition rDefinition = createReservationDefinition(arrival, deadline, rRequests, + ReservationRequestInterpreter.R_ALL, "u1"); + ReservationSubmissionRequest request5 = + ReservationSubmissionRequest.newInstance(rDefinition, null, reservationId); + LambdaTestUtils.intercept(YarnException.class, + "Missing submitReservation request or reservationId or reservation definition or queue.", + () -> interceptor.submitReservation(request5)); + } + + @Test + public void testSubmitReservationMultipleSubmission() throws Exception { + LOG.info("Test FederationClientInterceptor: Submit Reservation - Multiple"); + + // get new reservationId + GetNewReservationRequest request = GetNewReservationRequest.newInstance(); + GetNewReservationResponse response = interceptor.getNewReservation(request); + Assert.assertNotNull(response); + + // allow plan follower to synchronize, manually trigger an assignment + Map<SubClusterId, MockRM> mockRMs = interceptor.getMockRMs(); + for (MockRM mockRM : mockRMs.values()) { + ReservationSystem reservationSystem = mockRM.getReservationSystem(); + reservationSystem.synchronizePlan("root.decided", true); + } + + // First Submit Reservation + ReservationId reservationId = response.getReservationId(); + ReservationDefinition rDefinition = createReservationDefinition(1024, 1); + ReservationSubmissionRequest rSubmissionRequest = ReservationSubmissionRequest.newInstance( + rDefinition, "decided", reservationId); + ReservationSubmissionResponse submissionResponse = + interceptor.submitReservation(rSubmissionRequest); + Assert.assertNotNull(submissionResponse); + + SubClusterId subClusterId1 = stateStoreUtil.queryReservationHomeSC(reservationId); + Assert.assertNotNull(subClusterId1); + 
Assert.assertTrue(subClusters.contains(subClusterId1)); + + // First Retry + ReservationSubmissionResponse submissionResponse1 = + interceptor.submitReservation(rSubmissionRequest); + Assert.assertNotNull(submissionResponse1); + SubClusterId subClusterId2 = stateStoreUtil.queryReservationHomeSC(reservationId); + Assert.assertNotNull(subClusterId2); + Assert.assertEquals(subClusterId1, subClusterId2); + } + + @Test + public void testUpdateReservation() throws Exception { + LOG.info("Test FederationClientInterceptor : UpdateReservation request."); + + // get new reservationId + GetNewReservationRequest request = GetNewReservationRequest.newInstance(); + GetNewReservationResponse response = interceptor.getNewReservation(request); + Assert.assertNotNull(response); + + // allow plan follower to synchronize, manually trigger an assignment + Map<SubClusterId, MockRM> mockRMs = interceptor.getMockRMs(); + for (MockRM mockRM : mockRMs.values()) { + ReservationSystem reservationSystem = mockRM.getReservationSystem(); + reservationSystem.synchronizePlan("root.decided", true); + } + + // Submit Reservation + ReservationId reservationId = response.getReservationId(); + ReservationDefinition rDefinition = createReservationDefinition(1024, 1); + ReservationSubmissionRequest rSubmissionRequest = ReservationSubmissionRequest.newInstance( + rDefinition, "decided", reservationId); + + ReservationSubmissionResponse submissionResponse = + interceptor.submitReservation(rSubmissionRequest); + Assert.assertNotNull(submissionResponse); + + // Update Reservation + ReservationDefinition rDefinition2 = createReservationDefinition(2048, 1); + ReservationUpdateRequest updateRequest = + ReservationUpdateRequest.newInstance(rDefinition2, reservationId); + ReservationUpdateResponse updateResponse = + interceptor.updateReservation(updateRequest); + Assert.assertNotNull(updateResponse); + + SubClusterId subClusterId = stateStoreUtil.queryReservationHomeSC(reservationId); + 
Assert.assertNotNull(subClusterId); + } + + @Test + public void testDeleteReservation() throws Exception { + LOG.info("Test FederationClientInterceptor : DeleteReservation request."); + + // get new reservationId + GetNewReservationRequest request = GetNewReservationRequest.newInstance(); + GetNewReservationResponse response = interceptor.getNewReservation(request); + Assert.assertNotNull(response); + + // allow plan follower to synchronize, manually trigger an assignment + Map<SubClusterId, MockRM> mockRMs = interceptor.getMockRMs(); + for (MockRM mockRM : mockRMs.values()) { + ReservationSystem reservationSystem = mockRM.getReservationSystem(); + reservationSystem.synchronizePlan("root.decided", true); + } + + // Submit Reservation + ReservationId reservationId = response.getReservationId(); + ReservationDefinition rDefinition = createReservationDefinition(1024, 1); + ReservationSubmissionRequest rSubmissionRequest = ReservationSubmissionRequest.newInstance( + rDefinition, "decided", reservationId); + + ReservationSubmissionResponse submissionResponse = + interceptor.submitReservation(rSubmissionRequest); + Assert.assertNotNull(submissionResponse); + + // Delete Reservation + ReservationDeleteRequest deleteRequest = ReservationDeleteRequest.newInstance(reservationId); + ReservationDeleteResponse deleteResponse = interceptor.deleteReservation(deleteRequest); + Assert.assertNotNull(deleteResponse); + + LambdaTestUtils.intercept(YarnException.class, + "Reservation " + reservationId + " does not exist", + () -> stateStoreUtil.queryReservationHomeSC(reservationId)); + } + + + private ReservationDefinition createReservationDefinition(int memory, int core) { + // get reservationId + long defaultDuration = 600000; Review Comment: 10 * 60 * 1000 > Support getNewReservation, submitReservation, updateReservation, > deleteReservation API's for Federation > ------------------------------------------------------------------------------------------------------- > > Key: 
YARN-11177 > URL: https://issues.apache.org/jira/browse/YARN-11177 > Project: Hadoop YARN > Issue Type: Sub-task > Reporter: fanshilun > Assignee: fanshilun > Priority: Major > Labels: pull-request-available > Fix For: 3.4.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: yarn-issues-unsubscribe@hadoop.apache.org For additional commands, e-mail: yarn-issues-help@hadoop.apache.org