goiri commented on code in PR #4817:
URL: https://github.com/apache/hadoop/pull/4817#discussion_r961180010


##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java:
##########
@@ -76,38 +107,479 @@ public void testSqlConnectionsCreatedCount() throws 
YarnException {
         FederationStateStoreClientMetrics.getNumConnections());
   }
 
-  @Test(expected = NotImplementedException.class)
   public void testAddReservationHomeSubCluster() throws Exception {

Review Comment:
   Can we do this in this PR?
   We can check the unit test results for all of this.



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java:
##########
@@ -1016,30 +1037,342 @@ private static byte[] getByteArray(ByteBuffer bb) {
   @Override
   public AddReservationHomeSubClusterResponse addReservationHomeSubCluster(
       AddReservationHomeSubClusterRequest request) throws YarnException {
-    throw new NotImplementedException("Code is not implemented");
+
+    // validate
+    FederationReservationHomeSubClusterStoreInputValidator.validate(request);
+    CallableStatement cstmt = null;
+
+    ReservationHomeSubCluster reservationHomeSubCluster = 
request.getReservationHomeSubCluster();
+    ReservationId reservationId = reservationHomeSubCluster.getReservationId();
+    SubClusterId subClusterId = reservationHomeSubCluster.getHomeSubCluster();
+    SubClusterId subClusterHomeId = null;
+
+    try {
+
+      // Defined the sp_addReservationHomeSubCluster procedure
+      // this procedure requires 4 parameters
+      // Input parameters
+      // 1)IN reservationId_IN varchar(128)
+      // 2)IN homeSubCluster_IN varchar(256)
+      // Output parameters
+      // 3)OUT storedHomeSubCluster_OUT varchar(256)
+      // 4)OUT rowCount_OUT int
+
+      // Call procedure
+      cstmt = getCallableStatement(CALL_SP_ADD_RESERVATION_HOME_SUBCLUSTER);
+
+      // Set the parameters for the stored procedure
+      // 1)IN reservationId_IN varchar(128)
+      cstmt.setString("reservationId_IN", reservationId.toString());
+      // 2)IN homeSubCluster_IN varchar(256)
+      cstmt.setString("homeSubCluster_IN", subClusterId.getId());
+      // 3) OUT storedHomeSubCluster_OUT varchar(256)
+      cstmt.registerOutParameter("storedHomeSubCluster_OUT", 
java.sql.Types.VARCHAR);
+      // 4) OUT rowCount_OUT int
+      cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER);
+
+      // Execute the query
+      long startTime = clock.getTime();
+      cstmt.executeUpdate();
+      long stopTime = clock.getTime();
+
+      // Get SubClusterHome
+      String subClusterHomeIdString = 
cstmt.getString("storedHomeSubCluster_OUT");
+      subClusterHomeId = SubClusterId.newInstance(subClusterHomeIdString);
+
+      // Get rowCount
+      int rowCount = cstmt.getInt("rowCount_OUT");
+
+      // For failover reason, we check the returned subClusterId.
+      // 1.If it is equal to the subClusterId we sent, the call added the new
+      // reservation into FederationStateStore.
+      // 2.If the call returns a different subClusterId
+      // it means we already tried to insert this reservation
+      // but a component (Router/StateStore/RM) failed during the submission.
+      if (subClusterId.equals(subClusterHomeId)) {
+        // if it is equal to 0
+        // it means the call did not add a new reservation into 
FederationStateStore.
+        if (rowCount == 0) {
+          LOG.info("The reservation {} was not inserted in the StateStore 
because it" +
+              " was already present in subCluster {}", reservationId, 
subClusterHomeId);
+        } else if (rowCount != 1) {
+          // if it is different from 1
+          // it means the call had a wrong behavior. Maybe the database is not 
set correctly.
+          FederationStateStoreUtils.logAndThrowStoreException(LOG,
+              "Wrong behavior during the insertion of subCluster %s according 
to reservation %s. " +
+              "The database expects to insert 1 record, but the number of " +
+              "inserted changes is greater than 1, " +
+              "please check the records of the database.",
+              subClusterId, reservationId);
+        }
+      } else {
+        // If it is different from 0,
+        // it means that there is a data situation that does not meet the 
expectations,
+        // and an exception should be thrown at this time
+        if (rowCount != 0) {
+          FederationStateStoreUtils.logAndThrowStoreException(LOG,
+              "The reservation %s does exist but was overwritten.", 
reservationId);
+        }
+        LOG.info("Reservation: {} already present with subCluster: {}.",
+            reservationId, subClusterHomeId);
+      }
+
+      // Record successful call time
+      FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - 
startTime);
+    } catch (SQLException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
+      FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
+          "Unable to insert the newly generated reservation %s to subCluster 
%s.",
+          reservationId, subClusterId);
+    } finally {
+      // Return to the pool the CallableStatement
+      FederationStateStoreUtils.returnToPool(LOG, cstmt);
+    }
+
+    return AddReservationHomeSubClusterResponse.newInstance(subClusterHomeId);
   }
 
   @Override
   public GetReservationHomeSubClusterResponse getReservationHomeSubCluster(
       GetReservationHomeSubClusterRequest request) throws YarnException {
-    throw new NotImplementedException("Code is not implemented");
+    // validate
+    FederationReservationHomeSubClusterStoreInputValidator.validate(request);
+
+    CallableStatement cstmt = null;
+    ReservationId reservationId = request.getReservationId();
+    SubClusterId subClusterId = null;
+
+    try {
+
+      // Defined the sp_getReservationHomeSubCluster procedure
+      // this procedure requires 2 parameters
+      // Input parameters
+      // 1)IN reservationId_IN varchar(128)
+      // Output parameters
+      // 2)OUT homeSubCluster_OUT varchar(256)
+
+      cstmt = getCallableStatement(CALL_SP_GET_RESERVATION_HOME_SUBCLUSTER);
+
+      // Set the parameters for the stored procedure
+      // 1)IN reservationId_IN varchar(128)
+      cstmt.setString("reservationId_IN", reservationId.toString());
+      // 2)OUT homeSubCluster_OUT varchar(256)
+      cstmt.registerOutParameter("homeSubCluster_OUT", java.sql.Types.VARCHAR);
+
+      // Execute the query
+      long startTime = clock.getTime();
+      cstmt.execute();
+      long stopTime = clock.getTime();
+
+      // Get Result
+      String subClusterHomeIdString = cstmt.getString("homeSubCluster_OUT");
+
+      if (StringUtils.isNotBlank(subClusterHomeIdString)) {
+        subClusterId = SubClusterId.newInstance(subClusterHomeIdString);
+      } else {
+        // If subClusterHomeIdString blank, we need to throw an exception
+        FederationStateStoreUtils.logAndThrowRetriableException(LOG,
+            "Reservation %s does not exist", reservationId);
+      }
+
+      LOG.info("Got the information about the specified reservation {} in 
subCluster = {}.",
+          reservationId, subClusterId);
+      FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - 
startTime);
+    } catch (SQLException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
+      FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
+          "Unable to obtain the reservation information according to %s.", 
reservationId);
+    } finally {
+      // Return to the pool the CallableStatement
+      FederationStateStoreUtils.returnToPool(LOG, cstmt);
+    }
+
+    ReservationHomeSubCluster homeSubCluster =
+        ReservationHomeSubCluster.newInstance(reservationId, subClusterId);
+    return GetReservationHomeSubClusterResponse.newInstance(homeSubCluster);
   }
 
   @Override
   public GetReservationsHomeSubClusterResponse getReservationsHomeSubCluster(
       GetReservationsHomeSubClusterRequest request) throws YarnException {
-    throw new NotImplementedException("Code is not implemented");
+    CallableStatement cstmt = null;
+    ResultSet rs = null;
+    List<ReservationHomeSubCluster> reservationsHomeSubClusters = new 
ArrayList<>();
+
+    try {
+
+      // Defined the sp_getReservationsHomeSubCluster procedure
+      // This procedure requires no input parameters, but will have 2 output 
parameters
+      // Output parameters
+      // 1)OUT reservationId
+      // 2)OUT homeSubCluster
+
+      cstmt = getCallableStatement(CALL_SP_GET_RESERVATIONS_HOME_SUBCLUSTER);
+
+      // Execute the query
+      long startTime = clock.getTime();
+      rs = cstmt.executeQuery();
+      long stopTime = clock.getTime();
+
+      while (rs.next()) {

Review Comment:
   I'm not very familiar with this API; does each call to next() already advance the cursor to the next row?



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java:
##########
@@ -1016,30 +1037,342 @@ private static byte[] getByteArray(ByteBuffer bb) {
   @Override
   public AddReservationHomeSubClusterResponse addReservationHomeSubCluster(
       AddReservationHomeSubClusterRequest request) throws YarnException {
-    throw new NotImplementedException("Code is not implemented");
+
+    // validate
+    FederationReservationHomeSubClusterStoreInputValidator.validate(request);
+    CallableStatement cstmt = null;
+
+    ReservationHomeSubCluster reservationHomeSubCluster = 
request.getReservationHomeSubCluster();
+    ReservationId reservationId = reservationHomeSubCluster.getReservationId();
+    SubClusterId subClusterId = reservationHomeSubCluster.getHomeSubCluster();
+    SubClusterId subClusterHomeId = null;
+
+    try {
+
+      // Defined the sp_addReservationHomeSubCluster procedure
+      // this procedure requires 4 parameters
+      // Input parameters
+      // 1)IN reservationId_IN varchar(128)
+      // 2)IN homeSubCluster_IN varchar(256)
+      // Output parameters
+      // 3)OUT storedHomeSubCluster_OUT varchar(256)
+      // 4)OUT rowCount_OUT int
+
+      // Call procedure
+      cstmt = getCallableStatement(CALL_SP_ADD_RESERVATION_HOME_SUBCLUSTER);
+
+      // Set the parameters for the stored procedure
+      // 1)IN reservationId_IN varchar(128)
+      cstmt.setString("reservationId_IN", reservationId.toString());
+      // 2)IN homeSubCluster_IN varchar(256)
+      cstmt.setString("homeSubCluster_IN", subClusterId.getId());
+      // 3) OUT storedHomeSubCluster_OUT varchar(256)
+      cstmt.registerOutParameter("storedHomeSubCluster_OUT", 
java.sql.Types.VARCHAR);
+      // 4) OUT rowCount_OUT int
+      cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER);
+
+      // Execute the query
+      long startTime = clock.getTime();
+      cstmt.executeUpdate();
+      long stopTime = clock.getTime();
+
+      // Get SubClusterHome
+      String subClusterHomeIdString = 
cstmt.getString("storedHomeSubCluster_OUT");
+      subClusterHomeId = SubClusterId.newInstance(subClusterHomeIdString);
+
+      // Get rowCount
+      int rowCount = cstmt.getInt("rowCount_OUT");
+
+      // For failover reason, we check the returned subClusterId.
+      // 1.If it is equal to the subClusterId we sent, the call added the new
+      // reservation into FederationStateStore.
+      // 2.If the call returns a different subClusterId
+      // it means we already tried to insert this reservation
+      // but a component (Router/StateStore/RM) failed during the submission.
+      if (subClusterId.equals(subClusterHomeId)) {
+        // if it is equal to 0
+        // it means the call did not add a new reservation into 
FederationStateStore.
+        if (rowCount == 0) {
+          LOG.info("The reservation {} was not inserted in the StateStore 
because it" +
+              " was already present in subCluster {}", reservationId, 
subClusterHomeId);
+        } else if (rowCount != 1) {
+          // if it is different from 1
+          // it means the call had a wrong behavior. Maybe the database is not 
set correctly.
+          FederationStateStoreUtils.logAndThrowStoreException(LOG,
+              "Wrong behavior during the insertion of subCluster %s according 
to reservation %s. " +
+              "The database expects to insert 1 record, but the number of " +
+              "inserted changes is greater than 1, " +
+              "please check the records of the database.",
+              subClusterId, reservationId);
+        }
+      } else {
+        // If it is different from 0,
+        // it means that there is a data situation that does not meet the 
expectations,
+        // and an exception should be thrown at this time
+        if (rowCount != 0) {
+          FederationStateStoreUtils.logAndThrowStoreException(LOG,
+              "The reservation %s does exist but was overwritten.", 
reservationId);
+        }
+        LOG.info("Reservation: {} already present with subCluster: {}.",
+            reservationId, subClusterHomeId);
+      }
+
+      // Record successful call time
+      FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - 
startTime);
+    } catch (SQLException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
+      FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
+          "Unable to insert the newly generated reservation %s to subCluster 
%s.",
+          reservationId, subClusterId);
+    } finally {
+      // Return to the pool the CallableStatement
+      FederationStateStoreUtils.returnToPool(LOG, cstmt);
+    }
+
+    return AddReservationHomeSubClusterResponse.newInstance(subClusterHomeId);
   }
 
   @Override
   public GetReservationHomeSubClusterResponse getReservationHomeSubCluster(
       GetReservationHomeSubClusterRequest request) throws YarnException {
-    throw new NotImplementedException("Code is not implemented");
+    // validate
+    FederationReservationHomeSubClusterStoreInputValidator.validate(request);
+
+    CallableStatement cstmt = null;
+    ReservationId reservationId = request.getReservationId();
+    SubClusterId subClusterId = null;
+
+    try {
+
+      // Defined the sp_getReservationHomeSubCluster procedure
+      // this procedure requires 2 parameters
+      // Input parameters
+      // 1)IN reservationId_IN varchar(128)
+      // Output parameters
+      // 2)OUT homeSubCluster_OUT varchar(256)
+
+      cstmt = getCallableStatement(CALL_SP_GET_RESERVATION_HOME_SUBCLUSTER);
+
+      // Set the parameters for the stored procedure
+      // 1)IN reservationId_IN varchar(128)
+      cstmt.setString("reservationId_IN", reservationId.toString());
+      // 2)OUT homeSubCluster_OUT varchar(256)
+      cstmt.registerOutParameter("homeSubCluster_OUT", java.sql.Types.VARCHAR);
+
+      // Execute the query
+      long startTime = clock.getTime();
+      cstmt.execute();
+      long stopTime = clock.getTime();
+
+      // Get Result
+      String subClusterHomeIdString = cstmt.getString("homeSubCluster_OUT");
+
+      if (StringUtils.isNotBlank(subClusterHomeIdString)) {
+        subClusterId = SubClusterId.newInstance(subClusterHomeIdString);
+      } else {
+        // If subClusterHomeIdString blank, we need to throw an exception
+        FederationStateStoreUtils.logAndThrowRetriableException(LOG,
+            "Reservation %s does not exist", reservationId);
+      }
+
+      LOG.info("Got the information about the specified reservation {} in 
subCluster = {}.",
+          reservationId, subClusterId);
+      FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - 
startTime);
+    } catch (SQLException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
+      FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
+          "Unable to obtain the reservation information according to %s.", 
reservationId);
+    } finally {
+      // Return to the pool the CallableStatement
+      FederationStateStoreUtils.returnToPool(LOG, cstmt);
+    }
+
+    ReservationHomeSubCluster homeSubCluster =

Review Comment:
   If we get an exception, do we return null values?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to