[ 
https://issues.apache.org/jira/browse/YARN-11349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17643941#comment-17643941
 ] 

ASF GitHub Bot commented on YARN-11349:
---------------------------------------

goiri commented on code in PR #5169:
URL: https://github.com/apache/hadoop/pull/5169#discussion_r1041175764


##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/RowCountHandler.java:
##########
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.utils;
+
+import org.apache.hadoop.util.StringUtils;
+
+import java.sql.SQLException;
+
+/**
+ * RowCount Handler.
+ * Used to parse out the rowCount information of the output parameter.
+ */
+public class RowCountHandler implements ResultSetHandler<Integer> {

Review Comment:
   This class is SQL specific right? we should have it in the name or in a sql 
package.



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java:
##########
@@ -558,38 +557,37 @@ public void 
testDeleteReservationHomeSubClusterAbnormalSituation() throws Except
         () -> stateStore.deleteReservationHomeSubCluster(delRequest));
   }
 
-  @Test(expected = NotImplementedException.class)
+  @Test
   public void testStoreNewMasterKey() throws Exception {

Review Comment:
   I think by default this gets executed right? It already has the Test 
annotation on the parent.
   We can get rid of this.
   (Check in the next run that this test actually runs.)



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java:
##########
@@ -1353,45 +1384,454 @@ public Connection getConn() {
     return conn;
   }
 
+  /**
+   * SQLFederationStateStore Supports Store New MasterKey.
+   *
+   * @param request The request contains RouterMasterKey, which is an 
abstraction for DelegationKey.
+   * @return routerMasterKeyResponse, the response contains the 
RouterMasterKey.
+   * @throws YarnException if the call to the state store is unsuccessful.
+   * @throws IOException An IO Error occurred.
+   */
   @Override
   public RouterMasterKeyResponse storeNewMasterKey(RouterMasterKeyRequest 
request)
       throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+
+    // Step1: Verify parameters to ensure that key fields are not empty.
+    FederationRouterRMTokenInputValidator.validate(request);
+
+    // Step2: Parse the parameters and serialize the DelegationKey as a string.
+    DelegationKey delegationKey = convertMasterKeyToDelegationKey(request);
+    int keyId = delegationKey.getKeyId();
+    String delegationKeyStr = 
FederationStateStoreUtils.encodeWritable(delegationKey);
+
+    // Step3. store data in database.
+    try {
+
+      FederationSQLOutParameter<Integer> rowCountOUT =
+          new FederationSQLOutParameter<>("rowCount_OUT", 
java.sql.Types.INTEGER, Integer.class);
+
+      // Execute the query
+      long startTime = clock.getTime();
+      Integer rowCount = getRowCountByProcedureSQL(CALL_SP_ADD_MASTERKEY, 
keyId,
+          delegationKeyStr, rowCountOUT);
+      long stopTime = clock.getTime();
+
+      // We hope that 1 record can be written to the database.
+      // If the number of records is not 1, it means that the data was written 
incorrectly.
+      if (rowCount != 1) {
+        FederationStateStoreUtils.logAndThrowStoreException(LOG,
+            "Wrong behavior during the insertion of masterKey, keyId = %s. " +
+            "please check the records of the database.", 
String.valueOf(keyId));
+      }
+      FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - 
startTime);
+    } catch (SQLException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
+      FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
+          "Unable to insert the newly masterKey, keyId = %s.", 
String.valueOf(keyId));
+    }
+
+    // Step4. Query Data from the database and return the result.
+    return getMasterKeyByDelegationKey(request);
   }
 
+  /**
+   * SQLFederationStateStore Supports Remove MasterKey.
+   *
+   * Defined the sp_deleteMasterKey procedure.
+   * This procedure requires 1 input parameters, 1 output parameters.
+   * Input parameters
+   * 1. IN keyId_IN int
+   * Output parameters
+   * 2. OUT rowCount_OUT int
+   *
+   * @param request The request contains RouterMasterKey, which is an 
abstraction for DelegationKey
+   * @return routerMasterKeyResponse, the response contains the 
RouterMasterKey.
+   * @throws YarnException if the call to the state store is unsuccessful.
+   * @throws IOException An IO Error occurred.
+   */
   @Override
   public RouterMasterKeyResponse removeStoredMasterKey(RouterMasterKeyRequest 
request)
       throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+
+    // Step1: Verify parameters to ensure that key fields are not empty.
+    FederationRouterRMTokenInputValidator.validate(request);
+
+    // Step2: Parse parameters and get KeyId.
+    RouterMasterKey paramMasterKey = request.getRouterMasterKey();
+    int paramKeyId = paramMasterKey.getKeyId();
+
+    // Step3. Clear data from database.
+    try {
+
+      // Execute the query
+      long startTime = clock.getTime();
+      FederationSQLOutParameter<Integer> rowCountOUT =
+          new FederationSQLOutParameter<>("rowCount_OUT", 
java.sql.Types.INTEGER, Integer.class);
+      Integer rowCount = getRowCountByProcedureSQL(CALL_SP_DELETE_MASTERKEY,
+          paramKeyId, rowCountOUT);
+      long stopTime = clock.getTime();
+
+      // if it is equal to 0 it means the call
+      // did not delete the reservation from FederationStateStore
+      if (rowCount == 0) {
+        FederationStateStoreUtils.logAndThrowStoreException(LOG,
+            "masterKeyId = %s does not exist.", String.valueOf(paramKeyId));
+      } else if (rowCount != 1) {
+        // if it is different from 1 it means the call
+        // had a wrong behavior. Maybe the database is not set correctly.
+        FederationStateStoreUtils.logAndThrowStoreException(LOG,
+            "Wrong behavior during deleting the keyId %s. " +
+            "The database is expected to delete 1 record, " +
+            "but the number of deleted records returned by the database is 
greater than 1, " +
+            "indicating that a duplicate masterKey occurred during the 
deletion process.",
+            paramKeyId);
+      }
+
+      LOG.info("Delete from the StateStore the keyId: {}.", paramKeyId);
+      FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - 
startTime);
+      return RouterMasterKeyResponse.newInstance(paramMasterKey);
+    } catch (SQLException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
+      FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
+          "Unable to delete the keyId %s.", paramKeyId);
+    }
+
+    throw new YarnException("Unable to delete the masterKey, keyId = " + 
paramKeyId);
   }
 
+  /**
+   * SQLFederationStateStore Supports Remove MasterKey.
+   *
+   * Defined the sp_getMasterKey procedure.
+   * this procedure requires 2 parameters.
+   * Input parameters:
+   * 1. IN keyId_IN int
+   * Output parameters:
+   * 2. OUT masterKey_OUT varchar(1024)
+   *
+   * @param request The request contains RouterMasterKey, which is an 
abstraction for DelegationKey
+   * @return routerMasterKeyResponse, the response contains the 
RouterMasterKey.
+   * @throws YarnException if the call to the state store is unsuccessful.
+   * @throws IOException An IO Error occurred.
+   */
   @Override
   public RouterMasterKeyResponse 
getMasterKeyByDelegationKey(RouterMasterKeyRequest request)
       throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+    // Step1: Verify parameters to ensure that key fields are not empty.
+    FederationRouterRMTokenInputValidator.validate(request);
+
+    // Step2: Parse parameters and get KeyId.
+    RouterMasterKey paramMasterKey = request.getRouterMasterKey();
+    int paramKeyId = paramMasterKey.getKeyId();
+
+    // Step3: Call the stored procedure to get the result.
+    try {
+
+      FederationQueryRunner runner = new FederationQueryRunner();
+      FederationSQLOutParameter<String> masterKeyOUT =
+          new FederationSQLOutParameter<>("masterKey_OUT", 
java.sql.Types.VARCHAR, String.class);
+
+      // Execute the query
+      long startTime = clock.getTime();
+      RouterMasterKey routerMasterKey = runner.execute(
+          conn, CALL_SP_GET_MASTERKEY, new RouterMasterKeyHandler(), 
paramKeyId, masterKeyOUT);
+      long stopTime = clock.getTime();
+
+      LOG.info("Got the information about the specified masterKey = {} 
according to keyId = {}.",
+          routerMasterKey, paramKeyId);
+
+      FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - 
startTime);
+
+      // Return query result.
+      return RouterMasterKeyResponse.newInstance(routerMasterKey);
+
+    } catch (SQLException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
+      FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
+          "Unable to obtain the masterKey information according to %s.",
+          String.valueOf(paramKeyId));
+    }
+
+    // Throw exception information
+    throw new YarnException(
+        "Unable to obtain the masterKey information according to " + 
paramKeyId);
   }
 
+  /**
+   * SQLFederationStateStore Supports Store RMDelegationTokenIdentifier.
+   *
+   * Defined the sp_addDelegationToken procedure.
+   * This procedure requires 4 input parameters, 1 output parameters.
+   * Input parameters:
+   * 1. IN sequenceNum_IN int
+   * 2. IN tokenIdent_IN varchar(1024)
+   * 3. IN token_IN varchar(1024)
+   * 4. IN renewDate_IN bigint
+   * Output parameters:
+   * 5. OUT rowCount_OUT int
+   *
+   * @param request The request contains RouterRMToken 
(RMDelegationTokenIdentifier and renewDate)
+   * @return routerRMTokenResponse, the response contains the RouterStoreToken.
+   * @throws YarnException if the call to the state store is unsuccessful.
+   * @throws IOException An IO Error occurred.
+   */
   @Override
   public RouterRMTokenResponse storeNewToken(RouterRMTokenRequest request)
       throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+
+    // Step1: Verify parameters to ensure that key fields are not empty.
+    FederationRouterRMTokenInputValidator.validate(request);
+
+    // Step2. store data in database.
+    try {
+      long duration = addOrUpdateToken(request, true);
+      FederationStateStoreClientMetrics.succeededStateStoreCall(duration);
+    } catch (SQLException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
+      throw new YarnException(e);
+    }
+
+    // Step3. Query Data from the database and return the result.
+    return getTokenByRouterStoreToken(request);
   }
 
+  /**
+   * SQLFederationStateStore Supports Update RMDelegationTokenIdentifier.
+   *
+   * Defined the sp_updateDelegationToken procedure.
+   * This procedure requires 4 input parameters, 1 output parameters.
+   * Input parameters:
+   * 1. IN sequenceNum_IN int
+   * 2. IN tokenIdent_IN varchar(1024)
+   * 3. IN token_IN varchar(1024)
+   * 4. IN renewDate_IN bigint
+   * Output parameters:
+   * 5. OUT rowCount_OUT int
+   *
+   * @param request The request contains RouterRMToken 
(RMDelegationTokenIdentifier and renewDate)
+   * @return routerRMTokenResponse, the response contains the RouterStoreToken.
+   * @throws YarnException if the call to the state store is unsuccessful.
+   * @throws IOException An IO Error occurred.
+   */
   @Override
   public RouterRMTokenResponse updateStoredToken(RouterRMTokenRequest request)
       throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+
+    // Step1: Verify parameters to ensure that key fields are not empty.
+    FederationRouterRMTokenInputValidator.validate(request);
+
+    // Step2. update data in database.
+    try {
+      long duration = addOrUpdateToken(request, false);
+      FederationStateStoreClientMetrics.succeededStateStoreCall(duration);
+    } catch (SQLException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
+      throw new YarnException(e);
+    }
+
+    // Step3. Query Data from the database and return the result.
+    return getTokenByRouterStoreToken(request);
+  }
+
+  /**
+   * Add Or Update RMDelegationTokenIdentifier.
+   *
+   * @param request The request contains RouterRMToken 
(RMDelegationTokenIdentifier and renewDate)
+   * @param isAdd   true, addData; false, updateData.
+   * @return method operation time.
+   * @throws IOException   An IO Error occurred.
+   * @throws SQLException  An SQL Error occurred.
+   * @throws YarnException if the call to the state store is unsuccessful.
+   */
+  private long addOrUpdateToken(RouterRMTokenRequest request, boolean isAdd)
+      throws IOException, SQLException, YarnException {
+
+    // Parse parameters and get KeyId.
+    RouterStoreToken routerStoreToken = request.getRouterStoreToken();
+    YARNDelegationTokenIdentifier identifier = 
routerStoreToken.getTokenIdentifier();
+    String tokenIdentifier = 
FederationStateStoreUtils.encodeWritable(identifier);
+    String tokenInfo = routerStoreToken.getTokenInfo();
+    long renewDate = routerStoreToken.getRenewDate();
+    int sequenceNum = identifier.getSequenceNumber();
+
+    FederationQueryRunner runner = new FederationQueryRunner();
+    FederationSQLOutParameter<Integer> rowCountOUT =
+        new FederationSQLOutParameter<>("rowCount_OUT", 
java.sql.Types.INTEGER, Integer.class);
+
+    // Execute the query
+    long startTime = clock.getTime();
+    String procedure = isAdd ? CALL_SP_ADD_DELEGATIONTOKEN : 
CALL_SP_UPDATE_DELEGATIONTOKEN;
+    Integer rowCount = runner.execute(conn, procedure, new 
RowCountHandler("rowCount_OUT"),
+        sequenceNum, tokenIdentifier, tokenInfo, renewDate, rowCountOUT);
+    long stopTime = clock.getTime();
+
+    // Get rowCount
+    // In the process of updating the code, rowCount may be 0 or 1;
+    // if rowCount=1, it is as expected, indicating that we have updated the 
Token correctly;
+    // if rowCount=0, it is not as expected,
+    // indicating that we have not updated the Token correctly.
+    if (rowCount != 1) {
+      FederationStateStoreUtils.logAndThrowStoreException(LOG,
+          "Wrong behavior during the insertion of delegationToken, tokenId = 
%s. " +
+          "Please check the records of the database.", 
String.valueOf(sequenceNum));
+    }
+
+    // return execution time
+    return (stopTime - startTime);
   }
 
+  /**
+   * SQLFederationStateStore Supports Remove RMDelegationTokenIdentifier.
+   *
+   * Defined the sp_deleteDelegationToken procedure.
+   * This procedure requires 1 input parameters, 1 output parameters.
+   * Input parameters:
+   * 1. IN sequenceNum_IN bigint
+   * Output parameters:
+   * 2. OUT rowCount_OUT int
+   *
+   * @param request The request contains RouterRMToken 
(RMDelegationTokenIdentifier and renewDate)
+   * @return routerRMTokenResponse, the response contains the RouterStoreToken.
+   * @throws YarnException if the call to the state store is unsuccessful.
+   * @throws IOException An IO Error occurred.
+   */
   @Override
   public RouterRMTokenResponse removeStoredToken(RouterRMTokenRequest request)
       throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+
+    // Step1: Verify parameters to ensure that key fields are not empty.
+    FederationRouterRMTokenInputValidator.validate(request);
+
+    // Step2: Parse parameters and get KeyId.
+    RouterStoreToken routerStoreToken = request.getRouterStoreToken();
+    YARNDelegationTokenIdentifier identifier = 
routerStoreToken.getTokenIdentifier();
+    int sequenceNum = identifier.getSequenceNumber();
+
+    try {
+
+      FederationSQLOutParameter<Integer> rowCountOUT =
+          new FederationSQLOutParameter<>("rowCount_OUT", 
java.sql.Types.INTEGER, Integer.class);
+
+      // Execute the query
+      long startTime = clock.getTime();
+      Integer rowCount = 
getRowCountByProcedureSQL(CALL_SP_DELETE_DELEGATIONTOKEN,
+          sequenceNum, rowCountOUT);
+      long stopTime = clock.getTime();
+
+      // if it is equal to 0 it means the call
+      // did not delete the reservation from FederationStateStore
+      if (rowCount == 0) {
+        FederationStateStoreUtils.logAndThrowStoreException(LOG,
+            "TokenId %s does not exist", String.valueOf(sequenceNum));
+      } else if (rowCount != 1) {
+        // if it is different from 1 it means the call
+        // had a wrong behavior. Maybe the database is not set correctly.
+        FederationStateStoreUtils.logAndThrowStoreException(LOG,
+            "Wrong behavior during deleting the delegationToken %s. " +
+            "The database is expected to delete 1 record, " +
+            "but the number of deleted records returned by the database is 
greater than 1, " +
+            "indicating that a duplicate tokenId occurred during the deletion 
process.",
+            String.valueOf(sequenceNum));
+      }
+
+      LOG.info("Delete from the StateStore the delegationToken, tokenId = 
{}.", sequenceNum);
+      FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - 
startTime);
+      return RouterRMTokenResponse.newInstance(routerStoreToken);
+    } catch (SQLException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
+      FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
+          "Unable to delete the delegationToken, tokenId = %s.", sequenceNum);
+    }
+    throw new YarnException("Unable to delete the delegationToken, tokenId = " 
+ sequenceNum);
   }
 
+  /**
+   * The Router Supports GetTokenByRouterStoreToken.
+   *
+   * @param request The request contains RouterRMToken 
(RMDelegationTokenIdentifier and renewDate)
+   * @return RouterRMTokenResponse.
+   * @throws YarnException if the call to the state store is unsuccessful.
+   * @throws IOException An IO Error occurred.
+   */
   @Override
   public RouterRMTokenResponse getTokenByRouterStoreToken(RouterRMTokenRequest 
request)
       throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+    // Step1: Verify parameters to ensure that key fields are not empty.
+    FederationRouterRMTokenInputValidator.validate(request);
+
+    // Step2: Parse parameters and get KeyId.
+    RouterStoreToken routerStoreToken = request.getRouterStoreToken();
+    YARNDelegationTokenIdentifier identifier = 
routerStoreToken.getTokenIdentifier();
+    int sequenceNum = identifier.getSequenceNumber();
+
+    try {
+      FederationQueryRunner runner = new FederationQueryRunner();
+      FederationSQLOutParameter<String> tokenIdentOUT =
+          new FederationSQLOutParameter<>("tokenIdent_OUT", 
java.sql.Types.VARCHAR, String.class);
+      FederationSQLOutParameter<String> tokenOUT =
+          new FederationSQLOutParameter<>("token_OUT", java.sql.Types.VARCHAR, 
String.class);
+      FederationSQLOutParameter<Long> renewDateOUT =
+          new FederationSQLOutParameter<>("renewDate_OUT", 
java.sql.Types.BIGINT, Long.class);
+
+      // Execute the query
+      long startTime = clock.getTime();
+      RouterStoreToken resultToken = runner.execute(conn, 
CALL_SP_GET_DELEGATIONTOKEN,
+          new RouterStoreTokenHandler(), sequenceNum, tokenIdentOUT, tokenOUT, 
renewDateOUT);
+      long stopTime = clock.getTime();
+
+      FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - 
startTime);
+      return RouterRMTokenResponse.newInstance(resultToken);
+    } catch (SQLException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
+      FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
+          "Unable to get the delegationToken, tokenId = %s.", 
String.valueOf(sequenceNum));
+    }
+
+    // Throw exception information
+    throw new YarnException("Unable to get the delegationToken, tokenId = " + 
sequenceNum);
+  }
+
+  /**
+   * Convert MasterKey to DelegationKey.
+   *
+   * Before using this function,
+   * please use FederationRouterRMTokenInputValidator to verify the request.
+   * By default, the request is not empty, and the internal object is not 
empty.
+   *
+   * @param request RouterMasterKeyRequest
+   * @return DelegationKey.
+   */
+  private DelegationKey convertMasterKeyToDelegationKey(RouterMasterKeyRequest 
request) {
+    RouterMasterKey masterKey = request.getRouterMasterKey();
+    return convertMasterKeyToDelegationKey(masterKey);
+  }
+
+  /**
+   * Convert MasterKey to DelegationKey.
+   *
+   * @param masterKey masterKey.
+   * @return DelegationKey.
+   */
+  private DelegationKey convertMasterKeyToDelegationKey(RouterMasterKey 
masterKey) {

Review Comment:
   static?





> [Federation] Router Support DelegationToken With SQL
> ----------------------------------------------------
>
>                 Key: YARN-11349
>                 URL: https://issues.apache.org/jira/browse/YARN-11349
>             Project: Hadoop YARN
>          Issue Type: Sub-task
>          Components: federation, router
>    Affects Versions: 3.4.0
>            Reporter: Shilun Fan
>            Assignee: Shilun Fan
>            Priority: Major
>              Labels: pull-request-available
>
> Router Support DelegationToken With SQLFederationStateStore.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: yarn-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: yarn-issues-h...@hadoop.apache.org

Reply via email to