[ https://issues.apache.org/jira/browse/YARN-11350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17641605#comment-17641605 ]
ASF GitHub Bot commented on YARN-11350: --------------------------------------- goiri commented on code in PR #5131: URL: https://github.com/apache/hadoop/pull/5131#discussion_r1036592621 ########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java: ########## @@ -886,45 +1000,608 @@ public UpdateReservationHomeSubClusterResponse updateReservationHomeSubCluster( return UpdateReservationHomeSubClusterResponse.newInstance(); } + /** + * ZookeeperFederationStateStore Supports Store NewMasterKey. + * + * @param request The request contains RouterMasterKey, which is an abstraction for DelegationKey + * @return routerMasterKeyResponse, the response contains the RouterMasterKey. + * @throws YarnException if the call to the state store is unsuccessful. + * @throws IOException An IO Error occurred. + */ @Override public RouterMasterKeyResponse storeNewMasterKey(RouterMasterKeyRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + + long start = clock.getTime(); + // For the verification of the request, after passing the verification, + // the request and the internal objects will not be empty and can be used directly. + FederationRouterRMTokenInputValidator.validate(request); + + // Parse the delegationKey from the request and get the ZK storage path. + DelegationKey delegationKey = convertMasterKeyToDelegationKey(request); + String nodeCreatePath = getMasterKeyZNodePathByDelegationKey(delegationKey); + LOG.debug("Storing RMDelegationKey_{}, ZkNodePath = {}.", delegationKey.getKeyId(), + nodeCreatePath); + + // Write master key data to zk. + try(ByteArrayOutputStream os = new ByteArrayOutputStream(); + DataOutputStream fsOut = new DataOutputStream(os)) { + delegationKey.write(fsOut); + put(nodeCreatePath, os.toByteArray(), false); + } + + // Get the stored masterKey from zk. 
+ RouterMasterKey masterKeyFromZK = getRouterMasterKeyFromZK(nodeCreatePath); + long end = clock.getTime(); + opDurations.addStoreNewMasterKeyDuration(start, end); + return RouterMasterKeyResponse.newInstance(masterKeyFromZK); } + /** + * ZookeeperFederationStateStore Supports Remove MasterKey. + * + * @param request The request contains RouterMasterKey, which is an abstraction for DelegationKey + * @return routerMasterKeyResponse, the response contains the RouterMasterKey. + * @throws YarnException if the call to the state store is unsuccessful. + * @throws IOException An IO Error occurred. + */ @Override public RouterMasterKeyResponse removeStoredMasterKey(RouterMasterKeyRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + + long start = clock.getTime(); + // For the verification of the request, after passing the verification, + // the request and the internal objects will not be empty and can be used directly. + FederationRouterRMTokenInputValidator.validate(request); + + try { + // Parse the delegationKey from the request and get the ZK storage path. + RouterMasterKey masterKey = request.getRouterMasterKey(); + DelegationKey delegationKey = convertMasterKeyToDelegationKey(request); + String nodeRemovePath = getMasterKeyZNodePathByDelegationKey(delegationKey); + LOG.debug("Removing RMDelegationKey_{}, ZkNodePath = {}.", delegationKey.getKeyId(), + nodeRemovePath); + + // Check if the path exists, Throws an exception if the path does not exist. + if (!exists(nodeRemovePath)) { + throw new YarnException("ZkNodePath = " + nodeRemovePath + " not exists!"); + } + + // try to remove masterKey. + zkManager.delete(nodeRemovePath); + long end = clock.getTime(); + opDurations.removeStoredMasterKeyDuration(start, end); + return RouterMasterKeyResponse.newInstance(masterKey); + } catch (Exception e) { + throw new YarnException(e); + } } + /** + * ZookeeperFederationStateStore Supports Remove MasterKey. 
+ * + * @param request The request contains RouterMasterKey, which is an abstraction for DelegationKey + * @return routerMasterKeyResponse, the response contains the RouterMasterKey. + * @throws YarnException if the call to the state store is unsuccessful. + * @throws IOException An IO Error occurred. + */ @Override public RouterMasterKeyResponse getMasterKeyByDelegationKey(RouterMasterKeyRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + + long start = clock.getTime(); + // For the verification of the request, after passing the verification, + // the request and the internal objects will not be empty and can be used directly. + FederationRouterRMTokenInputValidator.validate(request); + + try { + + // Parse the delegationKey from the request and get the ZK storage path. + DelegationKey delegationKey = convertMasterKeyToDelegationKey(request); + String nodePath = getMasterKeyZNodePathByDelegationKey(delegationKey); + + // Check if the path exists, Throws an exception if the path does not exist. + if (!exists(nodePath)) { + throw new YarnException("ZkNodePath = " + nodePath + " not exists!"); + } + + // Get the stored masterKey from zk. + RouterMasterKey routerMasterKey = getRouterMasterKeyFromZK(nodePath); + long end = clock.getTime(); + opDurations.getMasterKeyByDelegationKeyDuration(start, end); + return RouterMasterKeyResponse.newInstance(routerMasterKey); + } catch (Exception e) { + throw new YarnException(e); + } + } + + /** + * Get MasterKeyZNodePath based on DelegationKey. + * + * @param delegationKey delegationKey. + * @return masterKey ZNodePath. + */ + private String getMasterKeyZNodePathByDelegationKey(DelegationKey delegationKey) { + return getMasterKeyZNodePathByKeyId(delegationKey.getKeyId()); } + /** + * Get MasterKeyZNodePath based on KeyId. + * + * @param keyId master key id. + * @return masterKey ZNodePath. 
+ */ + private String getMasterKeyZNodePathByKeyId(int keyId) { + String nodeName = ROUTER_RM_DELEGATION_KEY_PREFIX + keyId; + return getNodePath(routerRMDTMasterKeysRootPath, nodeName); + } + + /** + * Get RouterMasterKey from ZK. + * + * @param nodePath The path where masterKey is stored in zk. + * + * @return RouterMasterKey. + * @throws IOException An IO Error occurred. + */ + private RouterMasterKey getRouterMasterKeyFromZK(String nodePath) + throws IOException { + try { + byte[] data = get(nodePath); + if ((data == null) || (data.length == 0)) { + return null; + } + + ByteArrayInputStream bin = new ByteArrayInputStream(data); + DataInputStream din = new DataInputStream(bin); + DelegationKey key = new DelegationKey(); + key.readFields(din); + + return RouterMasterKey.newInstance(key.getKeyId(), + ByteBuffer.wrap(key.getEncodedKey()), key.getExpiryDate()); + } catch (Exception ex) { + LOG.error("No node in path {}.", nodePath); + throw new IOException(ex); + } + } + + /** + * ZookeeperFederationStateStore Supports Store RMDelegationTokenIdentifier. + * + * The stored token method is a synchronized method + * used to ensure that storeNewToken is a thread-safe method. + * + * @param request The request contains RouterRMToken (RMDelegationTokenIdentifier and renewDate) + * @return routerRMTokenResponse, the response contains the RouterStoreToken. + * @throws YarnException if the call to the state store is unsuccessful. + * @throws IOException An IO Error occurred. + */ @Override public RouterRMTokenResponse storeNewToken(RouterRMTokenRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + + long start = clock.getTime(); + // We verify the RouterRMTokenRequest to ensure that the request is not empty, + // and that the internal RouterStoreToken is not empty. 
+ FederationRouterRMTokenInputValidator.validate(request); + + try { + + // add delegationToken + storeOrUpdateRouterRMDT(request, false); + + // Get the stored delegationToken from ZK and return. + RouterStoreToken resultStoreToken = getStoreTokenFromZK(request, false); + long end = clock.getTime(); + opDurations.getStoreNewTokenDuration(start, end); + return RouterRMTokenResponse.newInstance(resultStoreToken); + } catch (YarnException | IOException e) { + throw e; + } catch (Exception e) { + throw new YarnException(e); + } } + /** + * ZookeeperFederationStateStore Supports Update RMDelegationTokenIdentifier. + * + * The update stored token method is a synchronized method + * used to ensure that storeNewToken is a thread-safe method. + * + * @param request The request contains RouterRMToken (RMDelegationTokenIdentifier and renewDate) + * @return routerRMTokenResponse, the response contains the RouterStoreToken. + * @throws YarnException if the call to the state store is unsuccessful. + * @throws IOException An IO Error occurred. + */ @Override public RouterRMTokenResponse updateStoredToken(RouterRMTokenRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + + long start = clock.getTime(); + // We verify the RouterRMTokenRequest to ensure that the request is not empty, + // and that the internal RouterStoreToken is not empty. + FederationRouterRMTokenInputValidator.validate(request); + + try { + + // get the Token storage path + String nodePath = getStoreTokenZNodePathByTokenRequest(request); + + // updateStoredToken needs to determine whether the zkNode exists. + // If it exists, update the token data. + // If it does not exist, write the new token data directly. 
+ boolean pathExists = true; + if (!exists(nodePath)) { + pathExists = false; + } + + if (pathExists) { + // update delegationToken + storeOrUpdateRouterRMDT(request, true); + } else { + // add new delegationToken + storeNewToken(request); + } + + // Get the stored delegationToken from ZK and return. + RouterStoreToken resultStoreToken = getStoreTokenFromZK(request, false); + long end = clock.getTime(); + opDurations.updateStoredTokenDuration(start, end); + return RouterRMTokenResponse.newInstance(resultStoreToken); + } catch (YarnException | IOException e) { + throw e; + } catch (Exception e) { + throw new YarnException(e); + } } + /** + * ZookeeperFederationStateStore Supports Remove RMDelegationTokenIdentifier. + * + * The remove stored token method is a synchronized method + * used to ensure that storeNewToken is a thread-safe method. + * + * @param request The request contains RouterRMToken (RMDelegationTokenIdentifier and renewDate) + * @return routerRMTokenResponse, the response contains the RouterStoreToken. + * @throws YarnException if the call to the state store is unsuccessful. + * @throws IOException An IO Error occurred. + */ @Override public RouterRMTokenResponse removeStoredToken(RouterRMTokenRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + + long start = clock.getTime(); + // We verify the RouterRMTokenRequest to ensure that the request is not empty, + // and that the internal RouterStoreToken is not empty. + FederationRouterRMTokenInputValidator.validate(request); + + try { + + // get the Token storage path + String nodePath = getStoreTokenZNodePathByTokenRequest(request); + + // If the path to be deleted does not exist, throw an exception directly. 
+ if (!exists(nodePath)) { + throw new YarnException("ZkNodePath = " + nodePath + " not exists!"); + } + + // Check again, first get the data from ZK, + // if the data is not empty, then delete it + RouterStoreToken storeToken = getStoreTokenFromZK(request, false); + if (storeToken != null) { + zkManager.delete(nodePath); + } + + // return deleted token data. + long end = clock.getTime(); + opDurations.removeStoredTokenDuration(start, end); + return RouterRMTokenResponse.newInstance(storeToken); + } catch (YarnException | IOException e) { + throw e; + } catch (Exception e) { + throw new YarnException(e); + } } + /** + * The Router Supports GetTokenByRouterStoreToken. + * + * @param request The request contains RouterRMToken (RMDelegationTokenIdentifier and renewDate) + * @return RouterRMTokenResponse. + * @throws YarnException if the call to the state store is unsuccessful + * @throws IOException An IO Error occurred + */ @Override public RouterRMTokenResponse getTokenByRouterStoreToken(RouterRMTokenRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + + long start = clock.getTime(); + // We verify the RouterRMTokenRequest to ensure that the request is not empty, + // and that the internal RouterStoreToken is not empty. + FederationRouterRMTokenInputValidator.validate(request); + + try { + + // Before get the token, + // we need to determine whether the path where the token is stored exists. + // If it doesn't exist, we will throw an exception. + String nodePath = getStoreTokenZNodePathByTokenRequest(request); + if (!exists(nodePath)) { + throw new YarnException("ZkNodePath = " + nodePath + " not exists!"); + } + + // Get the stored delegationToken from ZK and return. + RouterStoreToken resultStoreToken = getStoreTokenFromZK(request, false); + // return deleted token data. 
+ long end = clock.getTime(); + opDurations.getTokenByRouterStoreTokenDuration(start, end); + return RouterRMTokenResponse.newInstance(resultStoreToken); + } catch (YarnException | IOException e) { + throw e; + } catch (Exception e) { + throw new YarnException(e); + } + } + + /** + * Convert MasterKey to DelegationKey. + * + * Before using this function, + * please use FederationRouterRMTokenInputValidator to verify the request. + * By default, the request is not empty, and the internal object is not empty. + * + * @param request RouterMasterKeyRequest + * @return DelegationKey. + */ + private DelegationKey convertMasterKeyToDelegationKey(RouterMasterKeyRequest request) { + RouterMasterKey masterKey = request.getRouterMasterKey(); + return convertMasterKeyToDelegationKey(masterKey); + } + + /** + * Convert MasterKey to DelegationKey. + * + * @param masterKey masterKey. + * @return DelegationKey. + */ + private DelegationKey convertMasterKeyToDelegationKey(RouterMasterKey masterKey) { + ByteBuffer keyByteBuf = masterKey.getKeyBytes(); + byte[] keyBytes = new byte[keyByteBuf.remaining()]; + keyByteBuf.get(keyBytes); + return new DelegationKey(masterKey.getKeyId(), masterKey.getExpiryDate(), keyBytes); + } + + /** + * Check if a path exists in zk. + * + * @param path Path to be checked. + * @return Returns true if the path exists, false if the path does not exist. + * @throws Exception When an exception to access zk occurs. + */ + @VisibleForTesting + boolean exists(final String path) throws Exception { + return zkManager.exists(path); + } + + /** + * Add or update delegationToken. + * + * Before using this function, + * please use FederationRouterRMTokenInputValidator to verify the request. + * By default, the request is not empty, and the internal object is not empty. + * + * @param request storeToken + * @param isUpdate true, update the token; false, create a new token. + * @throws Exception exception occurs. 
+ */ + private void storeOrUpdateRouterRMDT(RouterRMTokenRequest request, boolean isUpdate) + throws Exception { + + RouterStoreToken routerStoreToken = request.getRouterStoreToken(); + String nodeCreatePath = getStoreTokenZNodePathByTokenRequest(request); + LOG.debug("nodeCreatePath = {}, isUpdate = {}", nodeCreatePath, isUpdate); + put(nodeCreatePath, routerStoreToken.toByteArray(), isUpdate); + } + + /** + * Get ZNode Path of StoreToken. + * + * Before using this method, we should use FederationRouterRMTokenInputValidator + * to verify the request,ensure that the request is not empty, + * and ensure that the object in the request is not empty. + * + * @param request RouterMasterKeyRequest. + * @return RouterRMToken ZNode Path. + * @throws IOException io exception occurs. + */ + private String getStoreTokenZNodePathByTokenRequest(RouterRMTokenRequest request) + throws IOException { + RouterStoreToken routerStoreToken = request.getRouterStoreToken(); + YARNDelegationTokenIdentifier identifier = routerStoreToken.getTokenIdentifier(); + return getStoreTokenZNodePathByIdentifier(identifier); + } + + /** + * Get ZNode Path of StoreToken. + * + * @param identifier YARNDelegationTokenIdentifier + * @return RouterRMToken ZNode Path. + */ + private String getStoreTokenZNodePathByIdentifier(YARNDelegationTokenIdentifier identifier) { + String nodePath = getNodePath(routerRMDelegationTokensRootPath, + ROUTER_RM_DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber()); + return nodePath; + } + + /** + * Get RouterStoreToken from ZK. + * + * @param request RouterMasterKeyRequest. + * @param quiet If true is silent mode, no error message is printed at this time, + * if false is non-silent mode, error message is printed at this time. + * @return RouterStoreToken. + * @throws IOException io exception occurs. 
+ */ + private RouterStoreToken getStoreTokenFromZK(RouterRMTokenRequest request, + boolean quiet) throws IOException { + RouterStoreToken routerStoreToken = request.getRouterStoreToken(); + YARNDelegationTokenIdentifier identifier = routerStoreToken.getTokenIdentifier(); + return getStoreTokenFromZK(identifier, quiet); + } + + /** + * Get RouterStoreToken from ZK. + * + * @param identifier YARN DelegationToken Identifier + * @param quiet Whether it is in quiet mode, + * if it is in quiet mode, no exception information will be output. + * @return RouterStoreToken. + * @throws IOException io exception occurs. + */ + private RouterStoreToken getStoreTokenFromZK(YARNDelegationTokenIdentifier identifier, + boolean quiet) throws IOException { + // get the Token storage path + String nodePath = getStoreTokenZNodePathByIdentifier(identifier); + return getStoreTokenFromZK(nodePath, quiet); + } + + /** + * Get RouterStoreToken from ZK. + * + * @param nodePath Znode location where data is stored. + * @param quiet Whether it is in quiet mode, + * if it is in quiet mode, no exception information will be output. + * @return RouterStoreToken. + * @throws IOException io exception occurs. 
+ */ + private RouterStoreToken getStoreTokenFromZK(String nodePath, boolean quiet) + throws IOException { + try { + byte[] data = get(nodePath); + if ((data == null) || (data.length == 0)) { + return null; + } + ByteArrayInputStream bin = new ByteArrayInputStream(data); + DataInputStream din = new DataInputStream(bin); + RouterStoreToken storeToken = Records.newRecord(RouterStoreToken.class); + storeToken.readFields(din); + return storeToken; + } catch (Exception ex) { + if (!quiet) { + LOG.error("No node in path [" + nodePath + "]"); Review Comment: Use {} placeholders (parameterized logging) instead of string concatenation, e.g. LOG.error("No node in path [{}]", nodePath); > [Federation] Router Support DelegationToken With ZK > --------------------------------------------------- > > Key: YARN-11350 > URL: https://issues.apache.org/jira/browse/YARN-11350 > Project: Hadoop YARN > Issue Type: Sub-task > Components: federation, router > Affects Versions: 3.4.0 > Reporter: Shilun Fan > Assignee: Shilun Fan > Priority: Major > Labels: pull-request-available > > [Federation] Router Support DelegationToken With > ZookeeperFederationStateStore. -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: yarn-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: yarn-issues-h...@hadoop.apache.org