slfan1989 commented on code in PR #4817:
URL: https://github.com/apache/hadoop/pull/4817#discussion_r961227793


##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java:
##########
@@ -1016,30 +1037,342 @@ private static byte[] getByteArray(ByteBuffer bb) {
   @Override
   public AddReservationHomeSubClusterResponse addReservationHomeSubCluster(
       AddReservationHomeSubClusterRequest request) throws YarnException {
-    throw new NotImplementedException("Code is not implemented");
+
+    // validate
+    FederationReservationHomeSubClusterStoreInputValidator.validate(request);
+    CallableStatement cstmt = null;
+
+    ReservationHomeSubCluster reservationHomeSubCluster = 
request.getReservationHomeSubCluster();
+    ReservationId reservationId = reservationHomeSubCluster.getReservationId();
+    SubClusterId subClusterId = reservationHomeSubCluster.getHomeSubCluster();
+    SubClusterId subClusterHomeId = null;
+
+    try {
+
+      // Defined the sp_addReservationHomeSubCluster procedure
+      // this procedure requires 4 parameters
+      // Input parameters
+      // 1)IN reservationId_IN varchar(128)
+      // 2)IN homeSubCluster_IN varchar(256)
+      // Output parameters
+      // 3)OUT storedHomeSubCluster_OUT varchar(256)
+      // 4)OUT rowCount_OUT int
+
+      // Call procedure
+      cstmt = getCallableStatement(CALL_SP_ADD_RESERVATION_HOME_SUBCLUSTER);
+
+      // Set the parameters for the stored procedure
+      // 1)IN reservationId_IN varchar(128)
+      cstmt.setString("reservationId_IN", reservationId.toString());
+      // 2)IN homeSubCluster_IN varchar(256)
+      cstmt.setString("homeSubCluster_IN", subClusterId.getId());
+      // 3) OUT storedHomeSubCluster_OUT varchar(256)
+      cstmt.registerOutParameter("storedHomeSubCluster_OUT", 
java.sql.Types.VARCHAR);
+      // 4) OUT rowCount_OUT int
+      cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER);
+
+      // Execute the query
+      long startTime = clock.getTime();
+      cstmt.executeUpdate();
+      long stopTime = clock.getTime();
+
+      // Get SubClusterHome
+      String subClusterHomeIdString = 
cstmt.getString("storedHomeSubCluster_OUT");
+      subClusterHomeId = SubClusterId.newInstance(subClusterHomeIdString);
+
+      // Get rowCount
+      int rowCount = cstmt.getInt("rowCount_OUT");
+
+      // For failover reason, we check the returned subClusterId.
+      // 1.If it is equal to the subClusterId we sent, the call added the new
+      // reservation into FederationStateStore.
+      // 2.If the call returns a different subClusterId
+      // it means we already tried to insert this reservation
+      // but a component (Router/StateStore/RM) failed during the submission.
+      if (subClusterId.equals(subClusterHomeId)) {
+        // if it is equal to 0
+        // it means the call did not add a new reservation into 
FederationStateStore.
+        if (rowCount == 0) {
+          LOG.info("The reservation {} was not inserted in the StateStore 
because it" +
+              " was already present in subCluster {}", reservationId, 
subClusterHomeId);
+        } else if (rowCount != 1) {
+          // if it is different from 1
+          // it means the call had a wrong behavior. Maybe the database is not 
set correctly.
+          FederationStateStoreUtils.logAndThrowStoreException(LOG,
+              "Wrong behavior during the insertion of subCluster %s according 
to reservation %s. " +
+              "The database expects to insert 1 record, but the number of " +
+              "inserted changes is greater than 1, " +
+              "please check the records of the database.",
+              subClusterId, reservationId);
+        }
+      } else {
+        // If it is different from 0,
+        // it means that there is a data situation that does not meet the 
expectations,
+        // and an exception should be thrown at this time
+        if (rowCount != 0) {
+          FederationStateStoreUtils.logAndThrowStoreException(LOG,
+              "The reservation %s does exist but was overwritten.", 
reservationId);
+        }
+        LOG.info("Reservation: {} already present with subCluster: {}.",
+            reservationId, subClusterHomeId);
+      }
+
+      // Record successful call time
+      FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - 
startTime);
+    } catch (SQLException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
+      FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
+          "Unable to insert the newly generated reservation %s to subCluster 
%s.",
+          reservationId, subClusterId);
+    } finally {
+      // Return to the pool the CallableStatement
+      FederationStateStoreUtils.returnToPool(LOG, cstmt);
+    }
+
+    return AddReservationHomeSubClusterResponse.newInstance(subClusterHomeId);
   }
 
   @Override
   public GetReservationHomeSubClusterResponse getReservationHomeSubCluster(
       GetReservationHomeSubClusterRequest request) throws YarnException {
-    throw new NotImplementedException("Code is not implemented");
+    // validate
+    FederationReservationHomeSubClusterStoreInputValidator.validate(request);
+
+    CallableStatement cstmt = null;
+    ReservationId reservationId = request.getReservationId();
+    SubClusterId subClusterId = null;
+
+    try {
+
+      // Defined the sp_getReservationHomeSubCluster procedure
+      // this procedure requires 2 parameters
+      // Input parameters
+      // 1)IN reservationId_IN varchar(128)
+      // Output parameters
+      // 2)OUT homeSubCluster_OUT varchar(256)
+
+      cstmt = getCallableStatement(CALL_SP_GET_RESERVATION_HOME_SUBCLUSTER);
+
+      // Set the parameters for the stored procedure
+      // 1)IN reservationId_IN varchar(128)
+      cstmt.setString("reservationId_IN", reservationId.toString());
+      // 2)OUT homeSubCluster_OUT varchar(256)
+      cstmt.registerOutParameter("homeSubCluster_OUT", java.sql.Types.VARCHAR);
+
+      // Execute the query
+      long startTime = clock.getTime();
+      cstmt.execute();
+      long stopTime = clock.getTime();
+
+      // Get Result
+      String subClusterHomeIdString = cstmt.getString("homeSubCluster_OUT");
+
+      if (StringUtils.isNotBlank(subClusterHomeIdString)) {
+        subClusterId = SubClusterId.newInstance(subClusterHomeIdString);
+      } else {
+        // If subClusterHomeIdString blank, we need to throw an exception
+        FederationStateStoreUtils.logAndThrowRetriableException(LOG,
+            "Reservation %s does not exist", reservationId);
+      }
+
+      LOG.info("Got the information about the specified reservation {} in 
subCluster = {}.",
+          reservationId, subClusterId);
+      FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - 
startTime);
+    } catch (SQLException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
+      FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
+          "Unable to obtain the reservation information according to %s.", 
reservationId);
+    } finally {
+      // Return to the pool the CallableStatement
+      FederationStateStoreUtils.returnToPool(LOG, cstmt);
+    }
+
+    ReservationHomeSubCluster homeSubCluster =

Review Comment:
   Thank you very much for helping to review the code, I will modify the code.  
In the case of an exception,  the exception should be thrown directly, so the 
handling should be better.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to