[GitHub] [hadoop] goiri commented on a diff in pull request #5169: YARN-11349. [Federation] Router Support DelegationToken With SQL.

2022-12-07 Thread GitBox


goiri commented on code in PR #5169:
URL: https://github.com/apache/hadoop/pull/5169#discussion_r1042397112


##
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/sql/RowCountHandler.java:
##
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.sql;
+
+import org.apache.hadoop.util.StringUtils;
+
+import java.sql.SQLException;
+
+/**
+ * RowCount Handler.
+ * Used to parse out the rowCount information of the output parameter.
+ */
+public class RowCountHandler implements ResultSetHandler {
+
+  private String rowCountParamName;
+
+  public RowCountHandler(String paramName) {
+this.rowCountParamName = paramName;
+  }
+
+  @Override
+  public Integer handle(Object... params) throws SQLException {
+Integer result = 0;
+for (int i = 0; i < params.length; i++) {

Review Comment:
   for (param : params)



##
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/sql/RowCountHandler.java:
##
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.sql;
+
+import org.apache.hadoop.util.StringUtils;
+
+import java.sql.SQLException;
+
+/**
+ * RowCount Handler.
+ * Used to parse out the rowCount information of the output parameter.
+ */
+public class RowCountHandler implements ResultSetHandler {
+
+  private String rowCountParamName;
+
+  public RowCountHandler(String paramName) {
+this.rowCountParamName = paramName;
+  }
+
+  @Override
+  public Integer handle(Object... params) throws SQLException {
+Integer result = 0;
+for (int i = 0; i < params.length; i++) {
+  if (params[i] != null) {
+if (params[i] instanceof FederationSQLOutParameter) {

Review Comment:
   I think instanceof already checks for null, otherwise you can do an &&



##
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/sql/FederationQueryRunner.java:
##
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.sql;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.CallableStatement;
+import java.util.Arrays;
+
+/**
+ * QueryRunner is used to execute stored procedure SQL and parse the returned 
results.
+ */
+p

[GitHub] [hadoop] goiri commented on a diff in pull request #5169: YARN-11349. [Federation] Router Support DelegationToken With SQL.

2022-12-06 Thread GitBox


goiri commented on code in PR #5169:
URL: https://github.com/apache/hadoop/pull/5169#discussion_r1041175764


##
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/RowCountHandler.java:
##
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.utils;
+
+import org.apache.hadoop.util.StringUtils;
+
+import java.sql.SQLException;
+
+/**
+ * RowCount Handler.
+ * Used to parse out the rowCount information of the output parameter.
+ */
+public class RowCountHandler implements ResultSetHandler {

Review Comment:
   This class is SQL specific right? we should have it in the name or in a sql 
package.



##
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java:
##
@@ -558,38 +557,37 @@ public void 
testDeleteReservationHomeSubClusterAbnormalSituation() throws Except
 () -> stateStore.deleteReservationHomeSubCluster(delRequest));
   }
 
-  @Test(expected = NotImplementedException.class)
+  @Test
   public void testStoreNewMasterKey() throws Exception {

Review Comment:
   I think by default this gets executed right? It already has the Test 
annotation on the parent.
   We can get rid of this.
   (Check in the next run that this test actually runs.)



##
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java:
##
@@ -1353,45 +1384,454 @@ public Connection getConn() {
 return conn;
   }
 
+  /**
+   * SQLFederationStateStore Supports Store New MasterKey.
+   *
+   * @param request The request contains RouterMasterKey, which is an 
abstraction for DelegationKey.
+   * @return routerMasterKeyResponse, the response contains the 
RouterMasterKey.
+   * @throws YarnException if the call to the state store is unsuccessful.
+   * @throws IOException An IO Error occurred.
+   */
   @Override
   public RouterMasterKeyResponse storeNewMasterKey(RouterMasterKeyRequest 
request)
   throws YarnException, IOException {
-throw new NotImplementedException("Code is not implemented");
+
+// Step1: Verify parameters to ensure that key fields are not empty.
+FederationRouterRMTokenInputValidator.validate(request);
+
+// Step2: Parse the parameters and serialize the DelegationKey as a string.
+DelegationKey delegationKey = convertMasterKeyToDelegationKey(request);
+int keyId = delegationKey.getKeyId();
+String delegationKeyStr = 
FederationStateStoreUtils.encodeWritable(delegationKey);
+
+// Step3. store data in database.
+try {
+
+  FederationSQLOutParameter rowCountOUT =
+  new FederationSQLOutParameter<>("rowCount_OUT", 
java.sql.Types.INTEGER, Integer.class);
+
+  // Execute the query
+  long startTime = clock.getTime();
+  Integer rowCount = getRowCountByProcedureSQL(CALL_SP_ADD_MASTERKEY, 
keyId,
+  delegationKeyStr, rowCountOUT);
+  long stopTime = clock.getTime();
+
+  // We hope that 1 record can be written to the database.
+  // If the number of records is not 1, it means that the data was written 
incorrectly.
+  if (rowCount != 1) {
+FederationStateStoreUtils.logAndThrowStoreException(LOG,
+"Wrong behavior during the insertion of masterKey, keyId = %s. " +
+"please check the records of the database.", 
String.valueOf(keyId));
+  }
+  FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - 
startTime);
+} catch (SQLException e) {
+  FederationStateStoreClientMetrics.failedStateStoreCall();
+  FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
+  "Unable to insert the newly masterKey, keyId = %s.", 
String.valueOf(keyId));
+}
+
+// Step4. Query Data from the database and return the result.
+return getMasterKeyByDelegationKey(request);
   }
 
+  /**
+   * SQLFederationStateStore Supports Remove MasterKey.
+   *
+   * Defined the sp

[GitHub] [hadoop] goiri commented on a diff in pull request #5169: YARN-11349. [Federation] Router Support DelegationToken With SQL.

2022-11-29 Thread GitBox


goiri commented on code in PR #5169:
URL: https://github.com/apache/hadoop/pull/5169#discussion_r1035047482


##
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/RouterStoreTokenPBImpl.java:
##
@@ -164,6 +170,40 @@ public void setRenewDate(Long renewDate) {
 this.builder.setRenewDate(renewDate);
   }
 
+  @Override
+  public byte[] toByteArray() throws IOException {
+return builder.build().toByteArray();
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+builder.mergeFrom((DataInputStream) in);
+  }
+
+  @Override
+  public String getTokenInfo() {
+RouterStoreTokenProtoOrBuilder p = viaProto ? proto : builder;
+if (this.tokenInfo != null) {
+  return this.tokenInfo;
+}
+if (!p.hasTokenInfo()) {
+  return null;
+}
+this.tokenInfo = p.getTokenInfo();
+return this.tokenInfo;
+  }
+
+  @Override
+  protected void setTokenInfo(String tokenInfo) {
+maybeInitBuilder();
+if(tokenInfo == null) {

Review Comment:
   Space



##
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java:
##
@@ -1353,45 +1386,560 @@ public Connection getConn() {
 return conn;
   }
 
+  /**
+   * SQLFederationStateStore Supports Store New MasterKey.
+   *
+   * @param request The request contains RouterMasterKey, which is an 
abstraction for DelegationKey.
+   * @return routerMasterKeyResponse, the response contains the 
RouterMasterKey.
+   * @throws YarnException if the call to the state store is unsuccessful.
+   * @throws IOException An IO Error occurred.
+   */
   @Override
   public RouterMasterKeyResponse storeNewMasterKey(RouterMasterKeyRequest 
request)
   throws YarnException, IOException {
-throw new NotImplementedException("Code is not implemented");
+
+// Step1: Verify parameters to ensure that key fields are not empty.
+FederationRouterRMTokenInputValidator.validate(request);
+
+// Step2: Parse the parameters and serialize the DelegationKey as a string.
+DelegationKey delegationKey = convertMasterKeyToDelegationKey(request);
+int keyId = delegationKey.getKeyId();
+String delegationKeyStr = encodeWritable(delegationKey);
+CallableStatement cstmt = null;
+
+// Step3. store data in database.
+try {
+  // Call procedure
+  cstmt = getCallableStatement(CALL_SP_ADD_MASTERKEY);
+
+  // Set the parameters for the stored procedure
+  // 1)IN keyId_IN bigint
+  cstmt.setInt("keyId_IN", keyId);
+  // 2)IN masterKey_IN varbinary(1024)
+  cstmt.setString("masterKey_IN", delegationKeyStr);
+  // 3) OUT rowCount_OUT int
+  cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER);
+
+  long startTime = clock.getTime();
+  cstmt.executeUpdate();
+  long stopTime = clock.getTime();
+
+  // Get rowCount
+  int rowCount = cstmt.getInt("rowCount_OUT");
+
+  // We hope that 1 record can be written to the database.
+  // If the number of records is not 1, it means that the data was written 
incorrectly.
+  if (rowCount != 1) {
+FederationStateStoreUtils.logAndThrowStoreException(LOG,
+"Wrong behavior during the insertion of masterKey, keyId = %s. " +
+"please check the records of the database.", 
String.valueOf(keyId));
+  }
+  FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - 
startTime);
+} catch (SQLException e) {
+  FederationStateStoreClientMetrics.failedStateStoreCall();
+  FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
+  "Unable to insert the newly masterKey, keyId = %s.", 
String.valueOf(keyId));
+} finally {
+  // Return to the pool the CallableStatement
+  FederationStateStoreUtils.returnToPool(LOG, cstmt);
+}
+
+// Step4. Query Data from the database and return the result.
+return getMasterKeyByDelegationKey(request);
   }
 
+  /**
+   * SQLFederationStateStore Supports Remove MasterKey.
+   *
+   * @param request The request contains RouterMasterKey, which is an 
abstraction for DelegationKey
+   * @return routerMasterKeyResponse, the response contains the 
RouterMasterKey.
+   * @throws YarnException if the call to the state store is unsuccessful.
+   * @throws IOException An IO Error occurred.
+   */
   @Override
   public RouterMasterKeyResponse removeStoredMasterKey(RouterMasterKeyRequest 
request)
   throws YarnException, IOException {
-throw new NotImplementedException("Code is not implemented");
+// Step1: Verify parameters to ensure that key fields are not empty.
+FederationRouterRMTokenInputValidator.validate(request);
+
+// Step2: Parse parameters and get KeyId.
+RouterMasterKey paramMasterKey = request.getRouterMasterKey();