[ 
https://issues.apache.org/jira/browse/YARN-11350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17637855#comment-17637855
 ] 

ASF GitHub Bot commented on YARN-11350:
---------------------------------------

slfan1989 commented on code in PR #5131:
URL: https://github.com/apache/hadoop/pull/5131#discussion_r1030632183


##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java:
##########
@@ -886,45 +975,554 @@ public UpdateReservationHomeSubClusterResponse 
updateReservationHomeSubCluster(
     return UpdateReservationHomeSubClusterResponse.newInstance();
   }
 
+  /**
+   * ZookeeperFederationStateStore Supports Store NewMasterKey.
+   *
+   * @param request The request contains RouterMasterKey, which is an 
abstraction for DelegationKey
+   * @return routerMasterKeyResponse, the response contains the 
RouterMasterKey.
+   * @throws YarnException if the call to the state store is unsuccessful.
+   * @throws IOException An IO Error occurred.
+   */
   @Override
-  public RouterMasterKeyResponse storeNewMasterKey(RouterMasterKeyRequest 
request)
+  public synchronized RouterMasterKeyResponse 
storeNewMasterKey(RouterMasterKeyRequest request)
       throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+
+    long start = clock.getTime();
+    // For the verification of the request, after passing the verification,
+    // the request and the internal objects will not be empty and can be used 
directly.
+    FederationRouterRMTokenInputValidator.validate(request);
+
+    // Parse the delegationKey from the request and get the ZK storage path.
+    DelegationKey delegationKey = convertMasterKeyToDelegationKey(request);
+    String nodeCreatePath = 
getMasterKeyZNodePathByDelegationKey(delegationKey);
+    LOG.debug("Storing RMDelegationKey_{}, ZkNodePath = {}.", 
delegationKey.getKeyId(),
+        nodeCreatePath);
+
+    // Write master key data to zk.
+    try(ByteArrayOutputStream os = new ByteArrayOutputStream();
+        DataOutputStream fsOut = new DataOutputStream(os)) {
+      delegationKey.write(fsOut);
+      put(nodeCreatePath, os.toByteArray(), false);
+    }
+
+    // Get the stored masterKey from zk.
+    RouterMasterKey masterKeyFromZK = getRouterMasterKeyFromZK(nodeCreatePath);
+    long end = clock.getTime();
+    opDurations.addStoreNewMasterKeyDuration(start, end);
+    return RouterMasterKeyResponse.newInstance(masterKeyFromZK);
   }
 
+  /**
+   * ZookeeperFederationStateStore Supports Remove MasterKey.
+   *
+   * @param request The request contains RouterMasterKey, which is an 
abstraction for DelegationKey
+   * @return routerMasterKeyResponse, the response contains the 
RouterMasterKey.
+   * @throws YarnException if the call to the state store is unsuccessful.
+   * @throws IOException An IO Error occurred.
+   */
   @Override
-  public RouterMasterKeyResponse removeStoredMasterKey(RouterMasterKeyRequest 
request)
+  public synchronized RouterMasterKeyResponse 
removeStoredMasterKey(RouterMasterKeyRequest request)
       throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+
+    long start = clock.getTime();
+    // For the verification of the request, after passing the verification,
+    // the request and the internal objects will not be empty and can be used 
directly.
+    FederationRouterRMTokenInputValidator.validate(request);
+
+    try {
+      // Parse the delegationKey from the request and get the ZK storage path.
+      RouterMasterKey masterKey = request.getRouterMasterKey();
+      DelegationKey delegationKey = convertMasterKeyToDelegationKey(request);
+      String nodeRemovePath = 
getMasterKeyZNodePathByDelegationKey(delegationKey);
+      LOG.debug("Removing RMDelegationKey_{}, ZkNodePath = {}.", 
delegationKey.getKeyId(),
+          nodeRemovePath);
+
+      // Check if the path exists, Throws an exception if the path does not 
exist.
+      if (!exists(nodeRemovePath)) {
+        throw new YarnException("ZkNodePath = " + nodeRemovePath + " not 
exists!");
+      }
+
+      // try to remove masterKey.
+      zkManager.delete(nodeRemovePath);
+      long end = clock.getTime();
+      opDurations.removeStoredMasterKeyDuration(start, end);
+      return RouterMasterKeyResponse.newInstance(masterKey);
+    } catch (Exception e) {
+      throw new YarnException(e);
+    }
   }
 
+  /**
+   * ZookeeperFederationStateStore Supports Remove MasterKey.
+   *
+   * @param request The request contains RouterMasterKey, which is an 
abstraction for DelegationKey
+   * @return routerMasterKeyResponse, the response contains the 
RouterMasterKey.
+   * @throws YarnException if the call to the state store is unsuccessful.
+   * @throws IOException An IO Error occurred.
+   */
   @Override
   public RouterMasterKeyResponse 
getMasterKeyByDelegationKey(RouterMasterKeyRequest request)
       throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+
+    long start = clock.getTime();
+    // For the verification of the request, after passing the verification,
+    // the request and the internal objects will not be empty and can be used 
directly.
+    FederationRouterRMTokenInputValidator.validate(request);
+
+    try {
+
+      // Parse the delegationKey from the request and get the ZK storage path.
+      DelegationKey delegationKey = convertMasterKeyToDelegationKey(request);
+      String nodePath = getMasterKeyZNodePathByDelegationKey(delegationKey);
+
+      // Check if the path exists, Throws an exception if the path does not 
exist.
+      if (!exists(nodePath)) {
+        throw new YarnException("ZkNodePath = " + nodePath + " not exists!");
+      }
+
+      // Get the stored masterKey from zk.
+      RouterMasterKey routerMasterKey = getRouterMasterKeyFromZK(nodePath);
+      long end = clock.getTime();
+      opDurations.getMasterKeyByDelegationKeyDuration(start, end);
+      return RouterMasterKeyResponse.newInstance(routerMasterKey);
+    } catch (Exception e) {
+      throw new YarnException(e);
+    }
+  }
+
+  /**
+   * Get MasterKeyZNodePath based on DelegationKey.
+   *
+   * @param delegationKey delegationKey.
+   * @return masterKey ZNodePath.
+   */
+  private String getMasterKeyZNodePathByDelegationKey(DelegationKey 
delegationKey) {
+    return getMasterKeyZNodePathByKeyId(delegationKey.getKeyId());
+  }
+
+  /**
+   * Get MasterKeyZNodePath based on KeyId.
+   *
+   * @param keyId master key id.
+   * @return masterKey ZNodePath.
+   */
+  private String getMasterKeyZNodePathByKeyId(int keyId) {
+    String nodeName = ROUTER_RM_DELEGATION_KEY_PREFIX + keyId;
+    return getNodePath(routerRMDTMasterKeysRootPath, nodeName);
+  }
+
+  /**
+   * Get RouterMasterKey from ZK.
+   *
+   * @param nodePath The path where masterKey is stored in zk.
+   *
+   * @return RouterMasterKey.
+   * @throws IOException An IO Error occurred.
+   */
+  private RouterMasterKey getRouterMasterKeyFromZK(String nodePath)
+      throws IOException {
+    try {
+      byte[] data = get(nodePath);
+      if ((data == null) || (data.length == 0)) {
+        return null;
+      }
+
+      ByteArrayInputStream bin = new ByteArrayInputStream(data);
+      DataInputStream din = new DataInputStream(bin);
+      DelegationKey key = new DelegationKey();
+      key.readFields(din);
+
+      return RouterMasterKey.newInstance(key.getKeyId(),
+          ByteBuffer.wrap(key.getEncodedKey()), key.getExpiryDate());
+    } catch (Exception ex) {
+      LOG.error("No node in path {}.", nodePath);
+      throw new IOException(ex);
+    }
   }
 
+  /**
+   * ZookeeperFederationStateStore Supports Store RMDelegationTokenIdentifier.
+   *
+   * The stored token method is a synchronized method
+   * used to ensure that storeNewToken is a thread-safe method.
+   *
+   * @param request The request contains RouterRMToken 
(RMDelegationTokenIdentifier and renewDate)
+   * @return routerRMTokenResponse, the response contains the RouterStoreToken.
+   * @throws YarnException if the call to the state store is unsuccessful.
+   * @throws IOException An IO Error occurred.
+   */
   @Override
-  public RouterRMTokenResponse storeNewToken(RouterRMTokenRequest request)
+  public synchronized RouterRMTokenResponse storeNewToken(RouterRMTokenRequest 
request)

Review Comment:
   Thank you very much for your help in reviewing the code. In 
ZookeeperFederationStateStore, we have 7 methods that use the synchronized 
keyword
   
   1. Group1 methods are mainly used to create or remove MasterKey
   - storeNewMasterKey
   - removeStoredMasterKey
   
   2. Group2 methods are mainly used to create or update and remove Token
   - storeNewToken
   - updateStoredToken
   - removeStoredToken
   
   3. Group3 methods are used to increment DelegationTokenSeqNum and MasterKeyId
   - incrementDelegationTokenSeqNum
   - incrementCurrentKeyId
   
   This part is mainly to protect the ZNode of ZK. We hope that the same Router 
has only one thread to write, update and delete the same ZNode at the same time.
   





> [Federation] Router Support DelegationToken With ZK
> ---------------------------------------------------
>
>                 Key: YARN-11350
>                 URL: https://issues.apache.org/jira/browse/YARN-11350
>             Project: Hadoop YARN
>          Issue Type: Sub-task
>          Components: federation, router
>    Affects Versions: 3.4.0
>            Reporter: Shilun Fan
>            Assignee: Shilun Fan
>            Priority: Major
>              Labels: pull-request-available
>
> [Federation] Router Support DelegationToken With 
> ZookeeperFederationStateStore.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: yarn-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: yarn-issues-h...@hadoop.apache.org

Reply via email to