[ 
https://issues.apache.org/jira/browse/YARN-11177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17584517#comment-17584517
 ] 

ASF GitHub Bot commented on YARN-11177:
---------------------------------------

goiri commented on code in PR #4764:
URL: https://github.com/apache/hadoop/pull/4764#discussion_r954396015


##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java:
##########
@@ -925,13 +1041,61 @@ public ReservationListResponse listReservations(
   @Override
   public ReservationUpdateResponse updateReservation(
       ReservationUpdateRequest request) throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+
+    if (request == null || request.getReservationId() == null
+            || request.getReservationDefinition() == null) {

Review Comment:
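   Indentation: the continuation line of this `if` condition is not indented consistently with the rest of the file.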



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java:
##########
@@ -888,13 +890,127 @@ public MoveApplicationAcrossQueuesResponse 
moveApplicationAcrossQueues(
   @Override
   public GetNewReservationResponse getNewReservation(
       GetNewReservationRequest request) throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+
+    if (request == null) {
+      routerMetrics.incrGetNewReservationFailedRetrieved();
+      String errMsg = "Missing getNewReservation request.";
+      RouterServerUtil.logAndThrowException(errMsg, null);
+    }
+
+    long startTime = clock.getTime();
+    Map<SubClusterId, SubClusterInfo> subClustersActive =
+        federationFacade.getSubClusters(true);
+
+    for (int i = 0; i < numSubmitRetries; ++i) {
+      SubClusterId subClusterId = getRandomActiveSubCluster(subClustersActive);
+      LOG.info("getNewReservation try #{} on SubCluster {}.", i, subClusterId);
+      ApplicationClientProtocol clientRMProxy = 
getClientRMProxyForSubCluster(subClusterId);
+      GetNewReservationResponse response = null;
+      try {
+        response = clientRMProxy.getNewReservation(request);
+        if (response != null) {
+          long stopTime = clock.getTime();
+          routerMetrics.succeededGetNewReservationRetrieved(stopTime - 
startTime);
+          return response;
+        }
+      } catch (Exception e) {
+        LOG.warn("Unable to create a new Reservation in SubCluster {}.", 
subClusterId.getId(), e);
+        subClustersActive.remove(subClusterId);
+      }
+    }
+
+    routerMetrics.incrGetNewReservationFailedRetrieved();
+    String errMsg = "Failed to create a new reservation.";
+    throw new YarnException(errMsg);
   }
 
   @Override
   public ReservationSubmissionResponse submitReservation(
       ReservationSubmissionRequest request) throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+
+    if (request == null || request.getReservationId() == null
+            || request.getReservationDefinition() == null || 
request.getQueue() == null) {
+      routerMetrics.incrSubmitReservationFailedRetrieved();
+      RouterServerUtil.logAndThrowException(
+          "Missing submitReservation request or reservationId " +
+               "or reservation definition or queue.", null);
+    }
+
+    long startTime = clock.getTime();
+    ReservationId reservationId = request.getReservationId();
+
+    long retryCount = 0;
+    boolean firstRetry = true;
+
+    while (retryCount < numSubmitRetries) {
+
+      SubClusterId subClusterId = 
policyFacade.getReservationHomeSubCluster(request);
+      LOG.info("submitReservation reservationId {} try #{} on SubCluster {}.",
+          reservationId, retryCount, subClusterId);
+
+      ReservationHomeSubCluster reservationHomeSubCluster =
+          ReservationHomeSubCluster.newInstance(reservationId, subClusterId);
+
+      // If it is the first attempt,use StateStore to add the
+      // mapping of reservationId and subClusterId.
+      // if the number of attempts is greater than 1, use StateStore to update 
the mapping.
+      if (firstRetry) {
+        try {
+          // persist the mapping of reservationId and the subClusterId which 
has
+          // been selected as its home
+          subClusterId = 
federationFacade.addReservationHomeSubCluster(reservationHomeSubCluster);
+          firstRetry = false;
+        } catch (YarnException e) {
+          routerMetrics.incrSubmitReservationFailedRetrieved();
+          RouterServerUtil.logAndThrowException(e,
+              "Unable to insert the ReservationId %s into the 
FederationStateStore.",
+                   reservationId);
+        }
+      } else {
+        try {
+          // update the mapping of reservationId and the home subClusterId to
+          // the new subClusterId we have selected
+          
federationFacade.updateReservationHomeSubCluster(reservationHomeSubCluster);
+        } catch (YarnException e) {
+          SubClusterId subClusterIdInStateStore =
+              federationFacade.getReservationHomeSubCluster(reservationId);
+          if (subClusterId == subClusterIdInStateStore) {
+            LOG.info("Reservation {} already submitted on SubCluster {}.",
+                reservationId, subClusterId);
+          } else {
+            routerMetrics.incrSubmitReservationFailedRetrieved();
+            RouterServerUtil.logAndThrowException(e,
+                "Unable to update the ReservationId %s into the 
FederationStateStore.",
+                     reservationId);

Review Comment:
   Indentation: the `reservationId` argument is not aligned with the preceding argument of this call.



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java:
##########
@@ -888,13 +890,127 @@ public MoveApplicationAcrossQueuesResponse 
moveApplicationAcrossQueues(
   @Override
   public GetNewReservationResponse getNewReservation(
       GetNewReservationRequest request) throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+
+    if (request == null) {
+      routerMetrics.incrGetNewReservationFailedRetrieved();
+      String errMsg = "Missing getNewReservation request.";
+      RouterServerUtil.logAndThrowException(errMsg, null);
+    }
+
+    long startTime = clock.getTime();
+    Map<SubClusterId, SubClusterInfo> subClustersActive =
+        federationFacade.getSubClusters(true);
+
+    for (int i = 0; i < numSubmitRetries; ++i) {
+      SubClusterId subClusterId = getRandomActiveSubCluster(subClustersActive);
+      LOG.info("getNewReservation try #{} on SubCluster {}.", i, subClusterId);
+      ApplicationClientProtocol clientRMProxy = 
getClientRMProxyForSubCluster(subClusterId);
+      GetNewReservationResponse response = null;
+      try {
+        response = clientRMProxy.getNewReservation(request);
+        if (response != null) {
+          long stopTime = clock.getTime();
+          routerMetrics.succeededGetNewReservationRetrieved(stopTime - 
startTime);
+          return response;
+        }
+      } catch (Exception e) {
+        LOG.warn("Unable to create a new Reservation in SubCluster {}.", 
subClusterId.getId(), e);
+        subClustersActive.remove(subClusterId);
+      }
+    }
+
+    routerMetrics.incrGetNewReservationFailedRetrieved();
+    String errMsg = "Failed to create a new reservation.";
+    throw new YarnException(errMsg);
   }
 
   @Override
   public ReservationSubmissionResponse submitReservation(
       ReservationSubmissionRequest request) throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+
+    if (request == null || request.getReservationId() == null
+            || request.getReservationDefinition() == null || 
request.getQueue() == null) {
+      routerMetrics.incrSubmitReservationFailedRetrieved();
+      RouterServerUtil.logAndThrowException(
+          "Missing submitReservation request or reservationId " +
+               "or reservation definition or queue.", null);
+    }
+
+    long startTime = clock.getTime();
+    ReservationId reservationId = request.getReservationId();
+
+    long retryCount = 0;
+    boolean firstRetry = true;
+
+    while (retryCount < numSubmitRetries) {
+
+      SubClusterId subClusterId = 
policyFacade.getReservationHomeSubCluster(request);
+      LOG.info("submitReservation reservationId {} try #{} on SubCluster {}.",
+          reservationId, retryCount, subClusterId);
+
+      ReservationHomeSubCluster reservationHomeSubCluster =
+          ReservationHomeSubCluster.newInstance(reservationId, subClusterId);
+
+      // If it is the first attempt,use StateStore to add the
+      // mapping of reservationId and subClusterId.
+      // if the number of attempts is greater than 1, use StateStore to update 
the mapping.
+      if (firstRetry) {
+        try {
+          // persist the mapping of reservationId and the subClusterId which 
has
+          // been selected as its home
+          subClusterId = 
federationFacade.addReservationHomeSubCluster(reservationHomeSubCluster);
+          firstRetry = false;
+        } catch (YarnException e) {
+          routerMetrics.incrSubmitReservationFailedRetrieved();
+          RouterServerUtil.logAndThrowException(e,
+              "Unable to insert the ReservationId %s into the 
FederationStateStore.",
+                   reservationId);
+        }
+      } else {
+        try {
+          // update the mapping of reservationId and the home subClusterId to
+          // the new subClusterId we have selected
+          
federationFacade.updateReservationHomeSubCluster(reservationHomeSubCluster);
+        } catch (YarnException e) {
+          SubClusterId subClusterIdInStateStore =
+              federationFacade.getReservationHomeSubCluster(reservationId);
+          if (subClusterId == subClusterIdInStateStore) {
+            LOG.info("Reservation {} already submitted on SubCluster {}.",
+                reservationId, subClusterId);
+          } else {
+            routerMetrics.incrSubmitReservationFailedRetrieved();
+            RouterServerUtil.logAndThrowException(e,
+                "Unable to update the ReservationId %s into the 
FederationStateStore.",
+                     reservationId);
+          }
+        }
+      }
+
+      // Obtain the ApplicationClientProtocol of the corresponding RM 
according to the subClusterId,
+      // and call the submitReservation method, If the request is responded to,
+      // If the request is responded, it will return directly, otherwise 
retryCount+1,
+      // continue to submit other request.
+      try {
+        ApplicationClientProtocol clientRMProxy = 
getClientRMProxyForSubCluster(subClusterId);
+        ReservationSubmissionResponse response = 
clientRMProxy.submitReservation(request);
+        if (response != null) {
+          LOG.info("Reservation {} submitted on {}.", 
request.getReservationId(), subClusterId);
+          long stopTime = clock.getTime();
+          routerMetrics.succeededSubmitReservationRetrieved(stopTime - 
startTime);
+          return response;
+        }
+      } catch (Exception e) {
+        LOG.warn("Unable to submit the reservation {} to SubCluster {} error = 
{}.",
+            reservationId, subClusterId.getId(), e.getMessage(), e);
+      }
+
+      retryCount++;
+    }
+
+    routerMetrics.incrSubmitReservationFailedRetrieved();
+    String msg = String.format("Reservation %s failed to be submitted.",
+        request.getReservationId());

Review Comment:
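   Use the local `reservationId` that was already extracted above instead of calling `request.getReservationId()` again.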



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java:
##########
@@ -888,13 +890,127 @@ public MoveApplicationAcrossQueuesResponse 
moveApplicationAcrossQueues(
   @Override
   public GetNewReservationResponse getNewReservation(
       GetNewReservationRequest request) throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+
+    if (request == null) {
+      routerMetrics.incrGetNewReservationFailedRetrieved();
+      String errMsg = "Missing getNewReservation request.";
+      RouterServerUtil.logAndThrowException(errMsg, null);
+    }
+
+    long startTime = clock.getTime();
+    Map<SubClusterId, SubClusterInfo> subClustersActive =

Review Comment:
   Could this declaration fit on a single line?



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java:
##########
@@ -1254,4 +1275,250 @@ public void testNodesToAttributes() throws Exception {
         NodeAttributeType.STRING, "nvida");
     Assert.assertTrue(nodeAttributeMap.get("0-host1").contains(gpu));
   }
+
+  @Test
+  public void testGetNewReservation() throws Exception {
+    LOG.info("Test FederationClientInterceptor : Get NewReservation request.");
+
+    // null request
+    LambdaTestUtils.intercept(YarnException.class,
+        "Missing getNewReservation request.", () -> 
interceptor.getNewReservation(null));
+
+    // normal request
+    GetNewReservationRequest request = GetNewReservationRequest.newInstance();
+    GetNewReservationResponse response = 
interceptor.getNewReservation(request);
+    Assert.assertNotNull(response);
+
+    ReservationId reservationId = response.getReservationId();
+    Assert.assertNotNull(reservationId);
+    Assert.assertTrue(reservationId.toString().contains("reservation"));
+    Assert.assertEquals(reservationId.getClusterTimestamp(), 
ResourceManager.getClusterTimeStamp());
+  }
+
+  @Test
+  public void testSubmitReservation() throws Exception {
+    LOG.info("Test FederationClientInterceptor : SubmitReservation request.");
+
+    // get new reservationId
+    GetNewReservationRequest request = GetNewReservationRequest.newInstance();
+    GetNewReservationResponse response = 
interceptor.getNewReservation(request);
+    Assert.assertNotNull(response);
+
+    // allow plan follower to synchronize, manually trigger an assignment
+    Map<SubClusterId, MockRM> mockRMs = interceptor.getMockRMs();
+    for (MockRM mockRM : mockRMs.values()) {
+      ReservationSystem reservationSystem = mockRM.getReservationSystem();
+      reservationSystem.synchronizePlan("root.decided", true);
+    }
+
+    // Submit Reservation
+    ReservationId reservationId = response.getReservationId();
+    ReservationDefinition rDefinition = createReservationDefinition(1024, 1);
+    ReservationSubmissionRequest rSubmissionRequest = 
ReservationSubmissionRequest.newInstance(
+        rDefinition, "decided", reservationId);
+
+    ReservationSubmissionResponse submissionResponse =
+        interceptor.submitReservation(rSubmissionRequest);
+    Assert.assertNotNull(submissionResponse);
+
+    SubClusterId subClusterId = 
stateStoreUtil.queryReservationHomeSC(reservationId);
+    Assert.assertNotNull(subClusterId);
+    Assert.assertTrue(subClusters.contains(subClusterId));
+  }
+
+  @Test
+  public void testSubmitReservationEmptyRequest() throws Exception {
+    LOG.info("Test FederationClientInterceptor : SubmitReservation request 
empty.");
+
+    // null request1
+    LambdaTestUtils.intercept(YarnException.class,
+        "Missing submitReservation request or reservationId or reservation 
definition or queue.",
+        () -> interceptor.submitReservation(null));
+
+    // null request2
+    LambdaTestUtils.intercept(YarnException.class,
+        "Missing submitReservation request or reservationId or reservation 
definition or queue.",
+        () -> interceptor.submitReservation(
+        ReservationSubmissionRequest.newInstance(null, null, null)));

Review Comment:
   Indentation is not correct: this line continues the lambda call on the previous line but is indented at the same level.



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java:
##########
@@ -888,13 +890,127 @@ public MoveApplicationAcrossQueuesResponse 
moveApplicationAcrossQueues(
   @Override
   public GetNewReservationResponse getNewReservation(
       GetNewReservationRequest request) throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+
+    if (request == null) {
+      routerMetrics.incrGetNewReservationFailedRetrieved();
+      String errMsg = "Missing getNewReservation request.";
+      RouterServerUtil.logAndThrowException(errMsg, null);
+    }
+
+    long startTime = clock.getTime();
+    Map<SubClusterId, SubClusterInfo> subClustersActive =
+        federationFacade.getSubClusters(true);
+
+    for (int i = 0; i < numSubmitRetries; ++i) {
+      SubClusterId subClusterId = getRandomActiveSubCluster(subClustersActive);
+      LOG.info("getNewReservation try #{} on SubCluster {}.", i, subClusterId);
+      ApplicationClientProtocol clientRMProxy = 
getClientRMProxyForSubCluster(subClusterId);
+      GetNewReservationResponse response = null;
+      try {
+        response = clientRMProxy.getNewReservation(request);
+        if (response != null) {
+          long stopTime = clock.getTime();
+          routerMetrics.succeededGetNewReservationRetrieved(stopTime - 
startTime);
+          return response;
+        }
+      } catch (Exception e) {
+        LOG.warn("Unable to create a new Reservation in SubCluster {}.", 
subClusterId.getId(), e);
+        subClustersActive.remove(subClusterId);
+      }
+    }
+
+    routerMetrics.incrGetNewReservationFailedRetrieved();
+    String errMsg = "Failed to create a new reservation.";
+    throw new YarnException(errMsg);
   }
 
   @Override
   public ReservationSubmissionResponse submitReservation(
       ReservationSubmissionRequest request) throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+
+    if (request == null || request.getReservationId() == null
+            || request.getReservationDefinition() == null || 
request.getQueue() == null) {
+      routerMetrics.incrSubmitReservationFailedRetrieved();
+      RouterServerUtil.logAndThrowException(
+          "Missing submitReservation request or reservationId " +
+               "or reservation definition or queue.", null);
+    }
+
+    long startTime = clock.getTime();
+    ReservationId reservationId = request.getReservationId();
+
+    long retryCount = 0;
+    boolean firstRetry = true;
+
+    while (retryCount < numSubmitRetries) {
+
+      SubClusterId subClusterId = 
policyFacade.getReservationHomeSubCluster(request);
+      LOG.info("submitReservation reservationId {} try #{} on SubCluster {}.",
+          reservationId, retryCount, subClusterId);
+
+      ReservationHomeSubCluster reservationHomeSubCluster =
+          ReservationHomeSubCluster.newInstance(reservationId, subClusterId);
+
+      // If it is the first attempt,use StateStore to add the
+      // mapping of reservationId and subClusterId.
+      // if the number of attempts is greater than 1, use StateStore to update 
the mapping.
+      if (firstRetry) {
+        try {
+          // persist the mapping of reservationId and the subClusterId which 
has
+          // been selected as its home
+          subClusterId = 
federationFacade.addReservationHomeSubCluster(reservationHomeSubCluster);
+          firstRetry = false;
+        } catch (YarnException e) {
+          routerMetrics.incrSubmitReservationFailedRetrieved();
+          RouterServerUtil.logAndThrowException(e,
+              "Unable to insert the ReservationId %s into the 
FederationStateStore.",
+                   reservationId);
+        }
+      } else {
+        try {
+          // update the mapping of reservationId and the home subClusterId to
+          // the new subClusterId we have selected
+          
federationFacade.updateReservationHomeSubCluster(reservationHomeSubCluster);
+        } catch (YarnException e) {
+          SubClusterId subClusterIdInStateStore =
+              federationFacade.getReservationHomeSubCluster(reservationId);
+          if (subClusterId == subClusterIdInStateStore) {
+            LOG.info("Reservation {} already submitted on SubCluster {}.",
+                reservationId, subClusterId);
+          } else {
+            routerMetrics.incrSubmitReservationFailedRetrieved();
+            RouterServerUtil.logAndThrowException(e,
+                "Unable to update the ReservationId %s into the 
FederationStateStore.",
+                     reservationId);
+          }
+        }
+      }
+
+      // Obtain the ApplicationClientProtocol of the corresponding RM 
according to the subClusterId,
+      // and call the submitReservation method, If the request is responded to,
+      // If the request is responded, it will return directly, otherwise 
retryCount+1,
+      // continue to submit other request.
+      try {
+        ApplicationClientProtocol clientRMProxy = 
getClientRMProxyForSubCluster(subClusterId);
+        ReservationSubmissionResponse response = 
clientRMProxy.submitReservation(request);
+        if (response != null) {
+          LOG.info("Reservation {} submitted on {}.", 
request.getReservationId(), subClusterId);

Review Comment:
   `reservationId` is already extracted above; use it here instead of `request.getReservationId()`.



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java:
##########
@@ -1254,4 +1275,250 @@ public void testNodesToAttributes() throws Exception {
         NodeAttributeType.STRING, "nvida");
     Assert.assertTrue(nodeAttributeMap.get("0-host1").contains(gpu));
   }
+
+  @Test
+  public void testGetNewReservation() throws Exception {
+    LOG.info("Test FederationClientInterceptor : Get NewReservation request.");
+
+    // null request
+    LambdaTestUtils.intercept(YarnException.class,
+        "Missing getNewReservation request.", () -> 
interceptor.getNewReservation(null));
+
+    // normal request
+    GetNewReservationRequest request = GetNewReservationRequest.newInstance();
+    GetNewReservationResponse response = 
interceptor.getNewReservation(request);
+    Assert.assertNotNull(response);
+
+    ReservationId reservationId = response.getReservationId();
+    Assert.assertNotNull(reservationId);
+    Assert.assertTrue(reservationId.toString().contains("reservation"));
+    Assert.assertEquals(reservationId.getClusterTimestamp(), 
ResourceManager.getClusterTimeStamp());
+  }
+
+  @Test
+  public void testSubmitReservation() throws Exception {
+    LOG.info("Test FederationClientInterceptor : SubmitReservation request.");
+
+    // get new reservationId
+    GetNewReservationRequest request = GetNewReservationRequest.newInstance();
+    GetNewReservationResponse response = 
interceptor.getNewReservation(request);
+    Assert.assertNotNull(response);
+
+    // allow plan follower to synchronize, manually trigger an assignment
+    Map<SubClusterId, MockRM> mockRMs = interceptor.getMockRMs();
+    for (MockRM mockRM : mockRMs.values()) {
+      ReservationSystem reservationSystem = mockRM.getReservationSystem();
+      reservationSystem.synchronizePlan("root.decided", true);
+    }
+
+    // Submit Reservation
+    ReservationId reservationId = response.getReservationId();
+    ReservationDefinition rDefinition = createReservationDefinition(1024, 1);
+    ReservationSubmissionRequest rSubmissionRequest = 
ReservationSubmissionRequest.newInstance(
+        rDefinition, "decided", reservationId);
+
+    ReservationSubmissionResponse submissionResponse =
+        interceptor.submitReservation(rSubmissionRequest);
+    Assert.assertNotNull(submissionResponse);
+
+    SubClusterId subClusterId = 
stateStoreUtil.queryReservationHomeSC(reservationId);
+    Assert.assertNotNull(subClusterId);
+    Assert.assertTrue(subClusters.contains(subClusterId));
+  }
+
+  @Test
+  public void testSubmitReservationEmptyRequest() throws Exception {
+    LOG.info("Test FederationClientInterceptor : SubmitReservation request 
empty.");
+
+    // null request1
+    LambdaTestUtils.intercept(YarnException.class,
+        "Missing submitReservation request or reservationId or reservation 
definition or queue.",
+        () -> interceptor.submitReservation(null));
+
+    // null request2
+    LambdaTestUtils.intercept(YarnException.class,
+        "Missing submitReservation request or reservationId or reservation 
definition or queue.",
+        () -> interceptor.submitReservation(
+        ReservationSubmissionRequest.newInstance(null, null, null)));
+
+    // null request3
+    ReservationSubmissionRequest request3 =
+        ReservationSubmissionRequest.newInstance(null, "q1", null);
+    LambdaTestUtils.intercept(YarnException.class,
+        "Missing submitReservation request or reservationId or reservation 
definition or queue.",
+        () -> interceptor.submitReservation(request3));
+
+    // null request4
+    ReservationId reservationId = ReservationId.newInstance(Time.now(), 1);
+    ReservationSubmissionRequest request4 =
+        ReservationSubmissionRequest.newInstance(null, null,  reservationId);
+    LambdaTestUtils.intercept(YarnException.class,
+        "Missing submitReservation request or reservationId or reservation 
definition or queue.",
+        () -> interceptor.submitReservation(request4));
+
+    // null request5
+    long defaultDuration = 600000;
+    long arrival = Time.now();
+    long deadline = arrival + (int)(defaultDuration * 1.1);
+
+    ReservationRequest rRequest = ReservationRequest.newInstance(
+        Resource.newInstance(1024, 1), 1, 1, defaultDuration);
+    ReservationRequest[] rRequests = new ReservationRequest[] {rRequest};
+    ReservationDefinition rDefinition = createReservationDefinition(arrival, 
deadline, rRequests,
+        ReservationRequestInterpreter.R_ALL, "u1");
+    ReservationSubmissionRequest request5 =
+        ReservationSubmissionRequest.newInstance(rDefinition, null,  
reservationId);
+    LambdaTestUtils.intercept(YarnException.class,
+        "Missing submitReservation request or reservationId or reservation 
definition or queue.",
+        () -> interceptor.submitReservation(request5));
+  }
+
+  @Test
+  public void testSubmitReservationMultipleSubmission() throws Exception {
+    LOG.info("Test FederationClientInterceptor: Submit Reservation - 
Multiple");
+
+    // get new reservationId
+    GetNewReservationRequest request = GetNewReservationRequest.newInstance();
+    GetNewReservationResponse response = 
interceptor.getNewReservation(request);
+    Assert.assertNotNull(response);
+
+    // allow plan follower to synchronize, manually trigger an assignment
+    Map<SubClusterId, MockRM> mockRMs = interceptor.getMockRMs();
+    for (MockRM mockRM : mockRMs.values()) {
+      ReservationSystem reservationSystem = mockRM.getReservationSystem();
+      reservationSystem.synchronizePlan("root.decided", true);
+    }
+
+    // First Submit Reservation
+    ReservationId reservationId = response.getReservationId();
+    ReservationDefinition rDefinition = createReservationDefinition(1024, 1);
+    ReservationSubmissionRequest rSubmissionRequest = 
ReservationSubmissionRequest.newInstance(
+        rDefinition, "decided", reservationId);
+    ReservationSubmissionResponse submissionResponse =
+        interceptor.submitReservation(rSubmissionRequest);
+    Assert.assertNotNull(submissionResponse);
+
+    SubClusterId subClusterId1 = 
stateStoreUtil.queryReservationHomeSC(reservationId);
+    Assert.assertNotNull(subClusterId1);
+    Assert.assertTrue(subClusters.contains(subClusterId1));
+
+    // First Retry
+    ReservationSubmissionResponse submissionResponse1 =
+        interceptor.submitReservation(rSubmissionRequest);
+    Assert.assertNotNull(submissionResponse1);
+    SubClusterId subClusterId2 = 
stateStoreUtil.queryReservationHomeSC(reservationId);
+    Assert.assertNotNull(subClusterId2);
+    Assert.assertEquals(subClusterId1, subClusterId2);
+  }
+
+  @Test
+  public void testUpdateReservation() throws Exception {
+    LOG.info("Test FederationClientInterceptor : UpdateReservation request.");
+
+    // get new reservationId
+    GetNewReservationRequest request = GetNewReservationRequest.newInstance();
+    GetNewReservationResponse response = 
interceptor.getNewReservation(request);
+    Assert.assertNotNull(response);
+
+    // allow plan follower to synchronize, manually trigger an assignment
+    Map<SubClusterId, MockRM> mockRMs = interceptor.getMockRMs();
+    for (MockRM mockRM : mockRMs.values()) {
+      ReservationSystem reservationSystem = mockRM.getReservationSystem();
+      reservationSystem.synchronizePlan("root.decided", true);
+    }
+
+    // Submit Reservation
+    ReservationId reservationId = response.getReservationId();
+    ReservationDefinition rDefinition = createReservationDefinition(1024, 1);
+    ReservationSubmissionRequest rSubmissionRequest = 
ReservationSubmissionRequest.newInstance(
+        rDefinition, "decided", reservationId);
+
+    ReservationSubmissionResponse submissionResponse =
+        interceptor.submitReservation(rSubmissionRequest);
+    Assert.assertNotNull(submissionResponse);
+
+    // Update Reservation
+    ReservationDefinition rDefinition2 = createReservationDefinition(2048, 1);
+    ReservationUpdateRequest updateRequest =
+        ReservationUpdateRequest.newInstance(rDefinition2, reservationId);
+    ReservationUpdateResponse updateResponse =
+        interceptor.updateReservation(updateRequest);
+    Assert.assertNotNull(updateResponse);
+
+    SubClusterId subClusterId = 
stateStoreUtil.queryReservationHomeSC(reservationId);
+    Assert.assertNotNull(subClusterId);
+  }
+
+  @Test
+  public void testDeleteReservation() throws Exception {
+    LOG.info("Test FederationClientInterceptor : DeleteReservation request.");
+
+    // get new reservationId
+    GetNewReservationRequest request = GetNewReservationRequest.newInstance();
+    GetNewReservationResponse response = 
interceptor.getNewReservation(request);
+    Assert.assertNotNull(response);
+
+    // allow plan follower to synchronize, manually trigger an assignment
+    Map<SubClusterId, MockRM> mockRMs = interceptor.getMockRMs();
+    for (MockRM mockRM : mockRMs.values()) {
+      ReservationSystem reservationSystem = mockRM.getReservationSystem();
+      reservationSystem.synchronizePlan("root.decided", true);
+    }
+
+    // Submit Reservation
+    ReservationId reservationId = response.getReservationId();
+    ReservationDefinition rDefinition = createReservationDefinition(1024, 1);
+    ReservationSubmissionRequest rSubmissionRequest = 
ReservationSubmissionRequest.newInstance(
+        rDefinition, "decided", reservationId);
+
+    ReservationSubmissionResponse submissionResponse =
+        interceptor.submitReservation(rSubmissionRequest);
+    Assert.assertNotNull(submissionResponse);
+
+    // Delete Reservation
+    ReservationDeleteRequest deleteRequest = 
ReservationDeleteRequest.newInstance(reservationId);
+    ReservationDeleteResponse deleteResponse = 
interceptor.deleteReservation(deleteRequest);
+    Assert.assertNotNull(deleteResponse);
+
+    LambdaTestUtils.intercept(YarnException.class,
+        "Reservation " + reservationId + " does not exist",
+        () -> stateStoreUtil.queryReservationHomeSC(reservationId));
+  }
+
+
+  private ReservationDefinition createReservationDefinition(int memory, int 
core) {
+    // get reservationId
+    long defaultDuration = 600000;

Review Comment:
   Make this a constant (`final`, etc.) rather than a local magic number.



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java:
##########
@@ -1254,4 +1275,250 @@ public void testNodesToAttributes() throws Exception {
         NodeAttributeType.STRING, "nvida");
     Assert.assertTrue(nodeAttributeMap.get("0-host1").contains(gpu));
   }
+
+  @Test
+  public void testGetNewReservation() throws Exception {
+    LOG.info("Test FederationClientInterceptor : Get NewReservation request.");
+
+    // null request
+    LambdaTestUtils.intercept(YarnException.class,
+        "Missing getNewReservation request.", () -> 
interceptor.getNewReservation(null));
+
+    // normal request
+    GetNewReservationRequest request = GetNewReservationRequest.newInstance();
+    GetNewReservationResponse response = 
interceptor.getNewReservation(request);
+    Assert.assertNotNull(response);
+
+    ReservationId reservationId = response.getReservationId();
+    Assert.assertNotNull(reservationId);
+    Assert.assertTrue(reservationId.toString().contains("reservation"));
+    Assert.assertEquals(reservationId.getClusterTimestamp(), 
ResourceManager.getClusterTimeStamp());
+  }
+
+  @Test
+  public void testSubmitReservation() throws Exception {
+    LOG.info("Test FederationClientInterceptor : SubmitReservation request.");
+
+    // get new reservationId
+    GetNewReservationRequest request = GetNewReservationRequest.newInstance();
+    GetNewReservationResponse response = 
interceptor.getNewReservation(request);
+    Assert.assertNotNull(response);
+
+    // allow plan follower to synchronize, manually trigger an assignment
+    Map<SubClusterId, MockRM> mockRMs = interceptor.getMockRMs();
+    for (MockRM mockRM : mockRMs.values()) {
+      ReservationSystem reservationSystem = mockRM.getReservationSystem();
+      reservationSystem.synchronizePlan("root.decided", true);
+    }
+
+    // Submit Reservation
+    ReservationId reservationId = response.getReservationId();
+    ReservationDefinition rDefinition = createReservationDefinition(1024, 1);
+    ReservationSubmissionRequest rSubmissionRequest = 
ReservationSubmissionRequest.newInstance(
+        rDefinition, "decided", reservationId);
+
+    ReservationSubmissionResponse submissionResponse =
+        interceptor.submitReservation(rSubmissionRequest);
+    Assert.assertNotNull(submissionResponse);
+
+    SubClusterId subClusterId = 
stateStoreUtil.queryReservationHomeSC(reservationId);
+    Assert.assertNotNull(subClusterId);
+    Assert.assertTrue(subClusters.contains(subClusterId));
+  }
+
+  @Test
+  public void testSubmitReservationEmptyRequest() throws Exception {
+    LOG.info("Test FederationClientInterceptor : SubmitReservation request empty.");
+
+    // null request1
+    LambdaTestUtils.intercept(YarnException.class,
+        "Missing submitReservation request or reservationId or reservation definition or queue.",
+        () -> interceptor.submitReservation(null));
+
+    // null request2
+    LambdaTestUtils.intercept(YarnException.class,
+        "Missing submitReservation request or reservationId or reservation definition or queue.",
+        () -> interceptor.submitReservation(
+        ReservationSubmissionRequest.newInstance(null, null, null)));
+
+    // null request3
+    ReservationSubmissionRequest request3 =
+        ReservationSubmissionRequest.newInstance(null, "q1", null);
+    LambdaTestUtils.intercept(YarnException.class,
+        "Missing submitReservation request or reservationId or reservation definition or queue.",
+        () -> interceptor.submitReservation(request3));
+
+    // null request4
+    ReservationId reservationId = ReservationId.newInstance(Time.now(), 1);
+    ReservationSubmissionRequest request4 =
+        ReservationSubmissionRequest.newInstance(null, null,  reservationId);
+    LambdaTestUtils.intercept(YarnException.class,
+        "Missing submitReservation request or reservationId or reservation definition or queue.",
+        () -> interceptor.submitReservation(request4));
+
+    // null request5
+    long defaultDuration = 600000;
+    long arrival = Time.now();
+    long deadline = arrival + (int)(defaultDuration * 1.1);
+
+    ReservationRequest rRequest = ReservationRequest.newInstance(
+        Resource.newInstance(1024, 1), 1, 1, defaultDuration);
+    ReservationRequest[] rRequests = new ReservationRequest[] {rRequest};
+    ReservationDefinition rDefinition = createReservationDefinition(arrival, 
deadline, rRequests,
+        ReservationRequestInterpreter.R_ALL, "u1");
+    ReservationSubmissionRequest request5 =
+        ReservationSubmissionRequest.newInstance(rDefinition, null,  
reservationId);
+    LambdaTestUtils.intercept(YarnException.class,
+        "Missing submitReservation request or reservationId or reservation definition or queue.",
+        () -> interceptor.submitReservation(request5));
+  }
+
+  @Test
+  public void testSubmitReservationMultipleSubmission() throws Exception {
+    LOG.info("Test FederationClientInterceptor: Submit Reservation - Multiple");
+
+    // get new reservationId
+    GetNewReservationRequest request = GetNewReservationRequest.newInstance();
+    GetNewReservationResponse response = 
interceptor.getNewReservation(request);
+    Assert.assertNotNull(response);
+
+    // allow plan follower to synchronize, manually trigger an assignment
+    Map<SubClusterId, MockRM> mockRMs = interceptor.getMockRMs();
+    for (MockRM mockRM : mockRMs.values()) {
+      ReservationSystem reservationSystem = mockRM.getReservationSystem();
+      reservationSystem.synchronizePlan("root.decided", true);
+    }
+
+    // First Submit Reservation
+    ReservationId reservationId = response.getReservationId();
+    ReservationDefinition rDefinition = createReservationDefinition(1024, 1);
+    ReservationSubmissionRequest rSubmissionRequest = 
ReservationSubmissionRequest.newInstance(
+        rDefinition, "decided", reservationId);
+    ReservationSubmissionResponse submissionResponse =
+        interceptor.submitReservation(rSubmissionRequest);
+    Assert.assertNotNull(submissionResponse);
+
+    SubClusterId subClusterId1 = 
stateStoreUtil.queryReservationHomeSC(reservationId);
+    Assert.assertNotNull(subClusterId1);
+    Assert.assertTrue(subClusters.contains(subClusterId1));
+
+    // First Retry
+    ReservationSubmissionResponse submissionResponse1 =
+        interceptor.submitReservation(rSubmissionRequest);
+    Assert.assertNotNull(submissionResponse1);
+    SubClusterId subClusterId2 = 
stateStoreUtil.queryReservationHomeSC(reservationId);
+    Assert.assertNotNull(subClusterId2);
+    Assert.assertEquals(subClusterId1, subClusterId2);
+  }
+
+  @Test
+  public void testUpdateReservation() throws Exception {
+    LOG.info("Test FederationClientInterceptor : UpdateReservation request.");
+
+    // get new reservationId
+    GetNewReservationRequest request = GetNewReservationRequest.newInstance();
+    GetNewReservationResponse response = 
interceptor.getNewReservation(request);
+    Assert.assertNotNull(response);
+
+    // allow plan follower to synchronize, manually trigger an assignment
+    Map<SubClusterId, MockRM> mockRMs = interceptor.getMockRMs();
+    for (MockRM mockRM : mockRMs.values()) {
+      ReservationSystem reservationSystem = mockRM.getReservationSystem();
+      reservationSystem.synchronizePlan("root.decided", true);
+    }
+
+    // Submit Reservation
+    ReservationId reservationId = response.getReservationId();
+    ReservationDefinition rDefinition = createReservationDefinition(1024, 1);
+    ReservationSubmissionRequest rSubmissionRequest = 
ReservationSubmissionRequest.newInstance(
+        rDefinition, "decided", reservationId);
+
+    ReservationSubmissionResponse submissionResponse =
+        interceptor.submitReservation(rSubmissionRequest);
+    Assert.assertNotNull(submissionResponse);
+
+    // Update Reservation
+    ReservationDefinition rDefinition2 = createReservationDefinition(2048, 1);
+    ReservationUpdateRequest updateRequest =
+        ReservationUpdateRequest.newInstance(rDefinition2, reservationId);
+    ReservationUpdateResponse updateResponse =
+        interceptor.updateReservation(updateRequest);
+    Assert.assertNotNull(updateResponse);
+
+    SubClusterId subClusterId = 
stateStoreUtil.queryReservationHomeSC(reservationId);
+    Assert.assertNotNull(subClusterId);
+  }
+
+  @Test
+  public void testDeleteReservation() throws Exception {
+    LOG.info("Test FederationClientInterceptor : DeleteReservation request.");
+
+    // get new reservationId
+    GetNewReservationRequest request = GetNewReservationRequest.newInstance();
+    GetNewReservationResponse response = 
interceptor.getNewReservation(request);
+    Assert.assertNotNull(response);
+
+    // allow plan follower to synchronize, manually trigger an assignment
+    Map<SubClusterId, MockRM> mockRMs = interceptor.getMockRMs();
+    for (MockRM mockRM : mockRMs.values()) {
+      ReservationSystem reservationSystem = mockRM.getReservationSystem();
+      reservationSystem.synchronizePlan("root.decided", true);
+    }
+
+    // Submit Reservation
+    ReservationId reservationId = response.getReservationId();
+    ReservationDefinition rDefinition = createReservationDefinition(1024, 1);
+    ReservationSubmissionRequest rSubmissionRequest = 
ReservationSubmissionRequest.newInstance(
+        rDefinition, "decided", reservationId);
+
+    ReservationSubmissionResponse submissionResponse =
+        interceptor.submitReservation(rSubmissionRequest);
+    Assert.assertNotNull(submissionResponse);
+
+    // Delete Reservation
+    ReservationDeleteRequest deleteRequest = 
ReservationDeleteRequest.newInstance(reservationId);
+    ReservationDeleteResponse deleteResponse = 
interceptor.deleteReservation(deleteRequest);
+    Assert.assertNotNull(deleteResponse);
+
+    LambdaTestUtils.intercept(YarnException.class,
+        "Reservation " + reservationId + " does not exist",
+        () -> stateStoreUtil.queryReservationHomeSC(reservationId));
+  }
+
+
+  private ReservationDefinition createReservationDefinition(int memory, int 
core) {
+    // get reservationId
+    long defaultDuration = 600000;

Review Comment:
   Prefer writing this as 10 * 60 * 1000 rather than the literal 600000.





> Support getNewReservation, submitReservation, updateReservation, 
> deleteReservation API's for Federation
> -------------------------------------------------------------------------------------------------------
>
>                 Key: YARN-11177
>                 URL: https://issues.apache.org/jira/browse/YARN-11177
>             Project: Hadoop YARN
>          Issue Type: Sub-task
>            Reporter: fanshilun
>            Assignee: fanshilun
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 3.4.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: yarn-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: yarn-issues-h...@hadoop.apache.org

Reply via email to