[ 
https://issues.apache.org/jira/browse/YARN-11349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17644403#comment-17644403
 ] 

ASF GitHub Bot commented on YARN-11349:
---------------------------------------

goiri commented on code in PR #5169:
URL: https://github.com/apache/hadoop/pull/5169#discussion_r1042397112


##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/sql/RowCountHandler.java:
##########
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.sql;
+
+import org.apache.hadoop.util.StringUtils;
+
+import java.sql.SQLException;
+
+/**
+ * RowCount Handler.
+ * Used to parse out the rowCount information of the output parameter.
+ */
+public class RowCountHandler implements ResultSetHandler<Integer> {
+
+  private String rowCountParamName;
+
+  public RowCountHandler(String paramName) {
+    this.rowCountParamName = paramName;
+  }
+
+  @Override
+  public Integer handle(Object... params) throws SQLException {
+    Integer result = 0;
+    for (int i = 0; i < params.length; i++) {

Review Comment:
   Use an enhanced for loop here instead of the index loop: `for (Object param : params)`.



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/sql/RowCountHandler.java:
##########
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.sql;
+
+import org.apache.hadoop.util.StringUtils;
+
+import java.sql.SQLException;
+
+/**
+ * RowCount Handler.
+ * Used to parse out the rowCount information of the output parameter.
+ */
+public class RowCountHandler implements ResultSetHandler<Integer> {
+
+  private String rowCountParamName;
+
+  public RowCountHandler(String paramName) {
+    this.rowCountParamName = paramName;
+  }
+
+  @Override
+  public Integer handle(Object... params) throws SQLException {
+    Integer result = 0;
+    for (int i = 0; i < params.length; i++) {
+      if (params[i] != null) {
+        if (params[i] instanceof FederationSQLOutParameter) {

Review Comment:
   I think `instanceof` already evaluates to false for null, so the outer null check is redundant; otherwise, you can combine the two conditions with `&&`.



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/sql/FederationQueryRunner.java:
##########
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.sql;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.CallableStatement;
+import java.util.Arrays;
+
+/**
+ * QueryRunner is used to execute stored procedure SQL and parse the returned 
results.
+ */
+public class FederationQueryRunner {
+
+  /**
+   * Execute Stored Procedure SQL.
+   *
+   * @param conn      Database Connection.
+   * @param procedure Stored Procedure SQL.
+   * @param rsh       Result Set handler.
+   * @param params    List of stored procedure parameters.
+   * @param <T>       Generic T.
+   * @return Stored Procedure Result Set.
+   * @throws SQLException An exception occurred when calling a stored 
procedure.
+   */
+  public <T> T execute(Connection conn, String procedure, ResultSetHandler<T> 
rsh, Object... params)
+      throws SQLException {
+    if (conn == null) {
+      throw new SQLException("Null connection");
+    }
+
+    if (procedure == null) {
+      throw new SQLException("Null Procedure SQL statement");
+    }
+
+    if (rsh == null) {
+      throw new SQLException("Null ResultSetHandler");
+    }
+
+    CallableStatement stmt = null;
+    T results = null;
+
+    try {
+      stmt = this.getCallableStatement(conn, procedure);
+      this.fillStatement(stmt, params);
+      stmt.executeUpdate();
+      this.retrieveOutParameters(stmt, params);
+      results = rsh.handle(params);
+    } catch (SQLException e) {
+      this.rethrow(e, procedure, params);
+    } finally {
+      close(stmt);
+    }
+    return results;
+  }
+
+  /**
+   * Get CallableStatement from Conn.
+   *
+   * @param conn Database Connection.
+   * @param procedure Stored Procedure SQL.
+   * @return CallableStatement.
+   * @throws SQLException An exception occurred when calling a stored 
procedure.
+   */
+  @VisibleForTesting
+  protected CallableStatement getCallableStatement(Connection conn, String 
procedure)
+      throws SQLException {
+    return conn.prepareCall(procedure);
+  }
+
+  /**
+   * Set Statement parameters.
+   *
+   * @param stmt CallableStatement.
+   * @param params Stored procedure parameters.
+   * @throws SQLException An exception occurred when calling a stored 
procedure.
+   */
+  public void fillStatement(CallableStatement stmt, Object... params)
+      throws SQLException {
+    for (int i = 0; i < params.length; i++) {
+      if (params[i] != null) {
+        if (stmt != null) {
+          if (params[i] instanceof FederationSQLOutParameter) {
+            FederationSQLOutParameter sqlOutParameter = 
(FederationSQLOutParameter) params[i];
+            sqlOutParameter.register(stmt, i + 1);
+          } else {
+            stmt.setObject(i + 1, params[i]);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Close Statement.
+   *
+   * @param stmt CallableStatement.
+   * @throws SQLException An exception occurred when calling a stored 
procedure.
+   */
+  public void close(Statement stmt) throws SQLException {
+    if (stmt != null) {
+      stmt.close();

Review Comment:
   set it to null?



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java:
##########
@@ -1353,45 +1386,426 @@ public Connection getConn() {
     return conn;
   }
 
+  /**
+   * SQLFederationStateStore Supports Store New MasterKey.
+   *
+   * @param request The request contains RouterMasterKey, which is an 
abstraction for DelegationKey.
+   * @return routerMasterKeyResponse, the response contains the 
RouterMasterKey.
+   * @throws YarnException if the call to the state store is unsuccessful.
+   * @throws IOException An IO Error occurred.
+   */
   @Override
   public RouterMasterKeyResponse storeNewMasterKey(RouterMasterKeyRequest 
request)
       throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+
+    // Step1: Verify parameters to ensure that key fields are not empty.
+    FederationRouterRMTokenInputValidator.validate(request);
+
+    // Step2: Parse the parameters and serialize the DelegationKey as a string.
+    DelegationKey delegationKey = convertMasterKeyToDelegationKey(request);
+    int keyId = delegationKey.getKeyId();
+    String delegationKeyStr = 
FederationStateStoreUtils.encodeWritable(delegationKey);
+
+    // Step3. store data in database.
+    try {
+
+      FederationSQLOutParameter<Integer> rowCountOUT =
+          new FederationSQLOutParameter<>("rowCount_OUT", 
java.sql.Types.INTEGER, Integer.class);
+
+      // Execute the query
+      long startTime = clock.getTime();
+      Integer rowCount = getRowCountByProcedureSQL(CALL_SP_ADD_MASTERKEY, 
keyId,
+          delegationKeyStr, rowCountOUT);
+      long stopTime = clock.getTime();
+
+      // We hope that 1 record can be written to the database.
+      // If the number of records is not 1, it means that the data was written 
incorrectly.
+      if (rowCount != 1) {
+        FederationStateStoreUtils.logAndThrowStoreException(LOG,
+            "Wrong behavior during the insertion of masterKey, keyId = %s. " +
+            "please check the records of the database.", 
String.valueOf(keyId));
+      }
+      FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - 
startTime);
+    } catch (SQLException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
+      FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
+          "Unable to insert the newly masterKey, keyId = %s.", 
String.valueOf(keyId));
+    }
+
+    // Step4. Query Data from the database and return the result.
+    return getMasterKeyByDelegationKey(request);
   }
 
+  /**
+   * SQLFederationStateStore Supports Remove MasterKey.
+   *
+   * Defined the sp_deleteMasterKey procedure.
+   * This procedure requires 1 input parameter and 1 output parameter.
+   * Input parameters
+   * 1. IN keyId_IN int
+   * Output parameters
+   * 2. OUT rowCount_OUT int
+   *
+   * @param request The request contains RouterMasterKey, which is an 
abstraction for DelegationKey
+   * @return routerMasterKeyResponse, the response contains the 
RouterMasterKey.
+   * @throws YarnException if the call to the state store is unsuccessful.
+   * @throws IOException An IO Error occurred.
+   */
   @Override
   public RouterMasterKeyResponse removeStoredMasterKey(RouterMasterKeyRequest 
request)
       throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+
+    // Step1: Verify parameters to ensure that key fields are not empty.
+    FederationRouterRMTokenInputValidator.validate(request);
+
+    // Step2: Parse parameters and get KeyId.
+    RouterMasterKey paramMasterKey = request.getRouterMasterKey();
+    int paramKeyId = paramMasterKey.getKeyId();
+
+    // Step3. Clear data from database.
+    try {
+
+      // Execute the query
+      long startTime = clock.getTime();
+      FederationSQLOutParameter<Integer> rowCountOUT =
+          new FederationSQLOutParameter<>("rowCount_OUT", 
java.sql.Types.INTEGER, Integer.class);
+      Integer rowCount = getRowCountByProcedureSQL(CALL_SP_DELETE_MASTERKEY,
+          paramKeyId, rowCountOUT);
+      long stopTime = clock.getTime();
+
+      // If rowCount is 0, the call did not delete
+      // the masterKey from the FederationStateStore.
+      if (rowCount == 0) {
+        FederationStateStoreUtils.logAndThrowStoreException(LOG,
+            "masterKeyId = %s does not exist.", String.valueOf(paramKeyId));
+      } else if (rowCount != 1) {
+        // if it is different from 1 it means the call
+        // had a wrong behavior. Maybe the database is not set correctly.
+        FederationStateStoreUtils.logAndThrowStoreException(LOG,
+            "Wrong behavior during deleting the keyId %s. " +
+            "The database is expected to delete 1 record, " +
+            "but the number of deleted records returned by the database is 
greater than 1, " +
+            "indicating that a duplicate masterKey occurred during the 
deletion process.",
+            paramKeyId);
+      }
+
+      LOG.info("Delete from the StateStore the keyId: {}.", paramKeyId);
+      FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - 
startTime);
+      return RouterMasterKeyResponse.newInstance(paramMasterKey);
+    } catch (SQLException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
+      FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
+          "Unable to delete the keyId %s.", paramKeyId);
+    }
+
+    throw new YarnException("Unable to delete the masterKey, keyId = " + 
paramKeyId);
   }
 
+  /**
+   * SQLFederationStateStore Supports Get MasterKey.
+   *
+   * Defined the sp_getMasterKey procedure.
+   * This procedure requires 2 parameters.
+   * Input parameters:
+   * 1. IN keyId_IN int
+   * Output parameters:
+   * 2. OUT masterKey_OUT varchar(1024)
+   *
+   * @param request The request contains RouterMasterKey, which is an 
abstraction for DelegationKey
+   * @return routerMasterKeyResponse, the response contains the 
RouterMasterKey.
+   * @throws YarnException if the call to the state store is unsuccessful.
+   * @throws IOException An IO Error occurred.
+   */
   @Override
   public RouterMasterKeyResponse 
getMasterKeyByDelegationKey(RouterMasterKeyRequest request)
       throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+    // Step1: Verify parameters to ensure that key fields are not empty.
+    FederationRouterRMTokenInputValidator.validate(request);
+
+    // Step2: Parse parameters and get KeyId.
+    RouterMasterKey paramMasterKey = request.getRouterMasterKey();
+    int paramKeyId = paramMasterKey.getKeyId();
+
+    // Step3: Call the stored procedure to get the result.
+    try {
+
+      FederationQueryRunner runner = new FederationQueryRunner();
+      FederationSQLOutParameter<String> masterKeyOUT =
+          new FederationSQLOutParameter<>("masterKey_OUT", 
java.sql.Types.VARCHAR, String.class);
+
+      // Execute the query
+      long startTime = clock.getTime();
+      RouterMasterKey routerMasterKey = runner.execute(
+          conn, CALL_SP_GET_MASTERKEY, new RouterMasterKeyHandler(), 
paramKeyId, masterKeyOUT);
+      long stopTime = clock.getTime();
+
+      LOG.info("Got the information about the specified masterKey = {} 
according to keyId = {}.",
+          routerMasterKey, paramKeyId);
+
+      FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - 
startTime);
+
+      // Return query result.
+      return RouterMasterKeyResponse.newInstance(routerMasterKey);
+
+    } catch (SQLException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
+      FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
+          "Unable to obtain the masterKey information according to %s.",
+          String.valueOf(paramKeyId));
+    }
+
+    // Throw exception information
+    throw new YarnException(
+        "Unable to obtain the masterKey information according to " + 
paramKeyId);
   }
 
+  /**
+   * SQLFederationStateStore Supports Store RMDelegationTokenIdentifier.
+   *
+   * Defined the sp_addDelegationToken procedure.
+   * This procedure requires 4 input parameters and 1 output parameter.
+   * Input parameters:
+   * 1. IN sequenceNum_IN int
+   * 2. IN tokenIdent_IN varchar(1024)
+   * 3. IN token_IN varchar(1024)
+   * 4. IN renewDate_IN bigint
+   * Output parameters:
+   * 5. OUT rowCount_OUT int
+   *
+   * @param request The request contains RouterRMToken 
(RMDelegationTokenIdentifier and renewDate)
+   * @return routerRMTokenResponse, the response contains the RouterStoreToken.
+   * @throws YarnException if the call to the state store is unsuccessful.
+   * @throws IOException An IO Error occurred.
+   */
   @Override
   public RouterRMTokenResponse storeNewToken(RouterRMTokenRequest request)
       throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+
+    // Step1: Verify parameters to ensure that key fields are not empty.
+    FederationRouterRMTokenInputValidator.validate(request);
+
+    // Step2. store data in database.
+    try {
+      long duration = addOrUpdateToken(request, true);
+      FederationStateStoreClientMetrics.succeededStateStoreCall(duration);
+    } catch (SQLException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
+      throw new YarnException(e);
+    }
+
+    // Step3. Query Data from the database and return the result.
+    return getTokenByRouterStoreToken(request);
   }
 
+  /**
+   * SQLFederationStateStore Supports Update RMDelegationTokenIdentifier.
+   *
+   * Defined the sp_updateDelegationToken procedure.
+   * This procedure requires 4 input parameters and 1 output parameter.
+   * Input parameters:
+   * 1. IN sequenceNum_IN int
+   * 2. IN tokenIdent_IN varchar(1024)
+   * 3. IN token_IN varchar(1024)
+   * 4. IN renewDate_IN bigint
+   * Output parameters:
+   * 5. OUT rowCount_OUT int
+   *
+   * @param request The request contains RouterRMToken 
(RMDelegationTokenIdentifier and renewDate)
+   * @return routerRMTokenResponse, the response contains the RouterStoreToken.
+   * @throws YarnException if the call to the state store is unsuccessful.
+   * @throws IOException An IO Error occurred.
+   */
   @Override
   public RouterRMTokenResponse updateStoredToken(RouterRMTokenRequest request)
       throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+
+    // Step1: Verify parameters to ensure that key fields are not empty.
+    FederationRouterRMTokenInputValidator.validate(request);
+
+    // Step2. update data in database.
+    try {
+      long duration = addOrUpdateToken(request, false);
+      FederationStateStoreClientMetrics.succeededStateStoreCall(duration);
+    } catch (SQLException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
+      throw new YarnException(e);
+    }
+
+    // Step3. Query Data from the database and return the result.
+    return getTokenByRouterStoreToken(request);
   }
 
+  /**
+   * Add Or Update RMDelegationTokenIdentifier.
+   *
+   * @param request The request contains RouterRMToken 
(RMDelegationTokenIdentifier and renewDate)
+   * @param isAdd   true, addData; false, updateData.
+   * @return method operation time.
+   * @throws IOException   An IO Error occurred.
+   * @throws SQLException  An SQL Error occurred.
+   * @throws YarnException if the call to the state store is unsuccessful.
+   */
+  private long addOrUpdateToken(RouterRMTokenRequest request, boolean isAdd)
+      throws IOException, SQLException, YarnException {
+
+    // Parse parameters and get KeyId.
+    RouterStoreToken routerStoreToken = request.getRouterStoreToken();
+    YARNDelegationTokenIdentifier identifier = 
routerStoreToken.getTokenIdentifier();
+    String tokenIdentifier = 
FederationStateStoreUtils.encodeWritable(identifier);
+    String tokenInfo = routerStoreToken.getTokenInfo();
+    long renewDate = routerStoreToken.getRenewDate();
+    int sequenceNum = identifier.getSequenceNumber();
+
+    FederationQueryRunner runner = new FederationQueryRunner();
+    FederationSQLOutParameter<Integer> rowCountOUT =
+        new FederationSQLOutParameter<>("rowCount_OUT", 
java.sql.Types.INTEGER, Integer.class);
+
+    // Execute the query
+    long startTime = clock.getTime();
+    String procedure = isAdd ? CALL_SP_ADD_DELEGATIONTOKEN : 
CALL_SP_UPDATE_DELEGATIONTOKEN;
+    Integer rowCount = runner.execute(conn, procedure, new 
RowCountHandler("rowCount_OUT"),
+        sequenceNum, tokenIdentifier, tokenInfo, renewDate, rowCountOUT);
+    long stopTime = clock.getTime();
+
+    // Check rowCount.
+    // When adding or updating a token, rowCount may be 0 or 1:
+    // rowCount = 1 is expected and means the Token was written correctly;
+    // rowCount = 0 is unexpected and means the Token was not written correctly.
+    if (rowCount != 1) {
+      FederationStateStoreUtils.logAndThrowStoreException(LOG,
+          "Wrong behavior during the insertion of delegationToken, tokenId = 
%s. " +
+          "Please check the records of the database.", 
String.valueOf(sequenceNum));
+    }
+
+    // return execution time
+    return (stopTime - startTime);
+  }
+
+  /**
+   * SQLFederationStateStore Supports Remove RMDelegationTokenIdentifier.
+   *
+   * Defined the sp_deleteDelegationToken procedure.
+   * This procedure requires 1 input parameter and 1 output parameter.
+   * Input parameters:
+   * 1. IN sequenceNum_IN bigint
+   * Output parameters:
+   * 2. OUT rowCount_OUT int
+   *
+   * @param request The request contains RouterRMToken 
(RMDelegationTokenIdentifier and renewDate)
+   * @return routerRMTokenResponse, the response contains the RouterStoreToken.
+   * @throws YarnException if the call to the state store is unsuccessful.
+   * @throws IOException An IO Error occurred.
+   */
   @Override
   public RouterRMTokenResponse removeStoredToken(RouterRMTokenRequest request)
       throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+
+    // Step1: Verify parameters to ensure that key fields are not empty.
+    FederationRouterRMTokenInputValidator.validate(request);
+
+    // Step2: Parse parameters and get KeyId.
+    RouterStoreToken routerStoreToken = request.getRouterStoreToken();
+    YARNDelegationTokenIdentifier identifier = 
routerStoreToken.getTokenIdentifier();
+    int sequenceNum = identifier.getSequenceNumber();
+
+    try {
+
+      FederationSQLOutParameter<Integer> rowCountOUT =
+          new FederationSQLOutParameter<>("rowCount_OUT", 
java.sql.Types.INTEGER, Integer.class);
+
+      // Execute the query
+      long startTime = clock.getTime();
+      Integer rowCount = 
getRowCountByProcedureSQL(CALL_SP_DELETE_DELEGATIONTOKEN,
+          sequenceNum, rowCountOUT);
+      long stopTime = clock.getTime();
+
+      // If rowCount is 0, the call did not delete
+      // the delegationToken from the FederationStateStore.
+      if (rowCount == 0) {
+        FederationStateStoreUtils.logAndThrowStoreException(LOG,
+            "TokenId %s does not exist", String.valueOf(sequenceNum));
+      } else if (rowCount != 1) {
+        // if it is different from 1 it means the call
+        // had a wrong behavior. Maybe the database is not set correctly.
+        FederationStateStoreUtils.logAndThrowStoreException(LOG,
+            "Wrong behavior during deleting the delegationToken %s. " +
+            "The database is expected to delete 1 record, " +
+            "but the number of deleted records returned by the database is 
greater than 1, " +
+            "indicating that a duplicate tokenId occurred during the deletion 
process.",
+            String.valueOf(sequenceNum));
+      }
+
+      LOG.info("Delete from the StateStore the delegationToken, tokenId = 
{}.", sequenceNum);
+      FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - 
startTime);
+      return RouterRMTokenResponse.newInstance(routerStoreToken);
+    } catch (SQLException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
+      FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
+          "Unable to delete the delegationToken, tokenId = %s.", sequenceNum);
+    }
+    throw new YarnException("Unable to delete the delegationToken, tokenId = " 
+ sequenceNum);
   }
 
+  /**
+   * The Router Supports GetTokenByRouterStoreToken.
+   *
+   * @param request The request contains RouterRMToken 
(RMDelegationTokenIdentifier and renewDate)
+   * @return RouterRMTokenResponse.
+   * @throws YarnException if the call to the state store is unsuccessful.
+   * @throws IOException An IO Error occurred.
+   */
   @Override
   public RouterRMTokenResponse getTokenByRouterStoreToken(RouterRMTokenRequest 
request)
       throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+    // Step1: Verify parameters to ensure that key fields are not empty.
+    FederationRouterRMTokenInputValidator.validate(request);
+
+    // Step2: Parse parameters and get KeyId.
+    RouterStoreToken routerStoreToken = request.getRouterStoreToken();
+    YARNDelegationTokenIdentifier identifier = 
routerStoreToken.getTokenIdentifier();
+    int sequenceNum = identifier.getSequenceNumber();
+
+    try {
+      FederationQueryRunner runner = new FederationQueryRunner();
+      FederationSQLOutParameter<String> tokenIdentOUT =
+          new FederationSQLOutParameter<>("tokenIdent_OUT", 
java.sql.Types.VARCHAR, String.class);

Review Comment:
   statically import `java.sql.Types`



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/sql/RouterMasterKeyHandler.java:
##########
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.sql;
+
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKey;
+import 
org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils;
+import org.apache.hadoop.yarn.util.Records;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.sql.SQLException;
+
+/**
+ * RouterMasterKey Handler.
+ * Used to parse the result information of the output parameter into the 
RouterMasterKey type.
+ */
+public class RouterMasterKeyHandler implements 
ResultSetHandler<RouterMasterKey> {
+
+  private final static String MASTERKEY_OUT = "masterKey_OUT";
+
+  @Override
+  public RouterMasterKey handle(Object... params) throws SQLException {
+    RouterMasterKey routerMasterKey = Records.newRecord(RouterMasterKey.class);
+    for (int i = 0; i < params.length; i++) {

Review Comment:
   Use an enhanced for loop here instead of the index loop: `for (Object param : params)`.





> [Federation] Router Support DelegationToken With SQL
> ----------------------------------------------------
>
>                 Key: YARN-11349
>                 URL: https://issues.apache.org/jira/browse/YARN-11349
>             Project: Hadoop YARN
>          Issue Type: Sub-task
>          Components: federation, router
>    Affects Versions: 3.4.0
>            Reporter: Shilun Fan
>            Assignee: Shilun Fan
>            Priority: Major
>              Labels: pull-request-available
>
> Router Support DelegationToken With SQLFederationStateStore.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: yarn-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: yarn-issues-h...@hadoop.apache.org

Reply via email to