This is an automated email from the ASF dual-hosted git repository.

slfan1989 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8a610e616f72 YARN-11614. [Federation] Add Federation PolicyManager 
Validation Rules. (#6271) Contributed by Shilun Fan.
8a610e616f72 is described below

commit 8a610e616f724dad14ff46033ba52787525c542b
Author: slfan1989 <55643692+slfan1...@users.noreply.github.com>
AuthorDate: Sun Nov 26 16:01:35 2023 +0800

    YARN-11614. [Federation] Add Federation PolicyManager Validation Rules. 
(#6271) Contributed by Shilun Fan.
    
    Reviewed-by: Inigo Goiri <inigo...@apache.org>
    Signed-off-by: Shilun Fan <slfan1...@apache.org>
---
 .../yarn/server/router/RouterServerUtil.java       |  8 +++++++
 .../rmadmin/FederationRMAdminInterceptor.java      | 26 +++++++++++++++++++---
 .../rmadmin/TestFederationRMAdminInterceptor.java  | 20 +++++++++++++----
 3 files changed, 47 insertions(+), 7 deletions(-)

diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java
index 23902bdc6190..744ddc87050d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java
@@ -822,4 +822,12 @@ public final class RouterServerUtil {
     return conf.getBoolean(YarnConfiguration.ROUTER_WEBAPP_PROXY_ENABLE,
         YarnConfiguration.DEFAULT_ROUTER_WEBAPP_PROXY_ENABLE);
   }
+
+  /**
+   * Checks whether a policy manager is among the supported policy managers.
+   *
+   * @param policyManager class name of the policy manager to check.
+   * @param supportWeightList class names of the supported policy managers.
+   * @return true if both arguments are non-null and the list contains the
+   *         policy manager, false otherwise.
+   * @throws YarnException kept in the signature for existing callers; the
+   *         check itself does not throw it.
+   */
+  public static boolean checkPolicyManagerValid(String policyManager,
+      List<String> supportWeightList) throws YarnException {
+    // A missing name or a missing list can never match; answer false rather
+    // than failing with a NullPointerException.
+    if (policyManager == null || supportWeightList == null) {
+      return false;
+    }
+    return supportWeightList.contains(policyManager);
+  }
 }
\ No newline at end of file
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java
index 7ade0d400797..230308e361ff 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java
@@ -75,6 +75,9 @@ import 
org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePol
 import 
org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationApplicationRequest;
 import 
org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationApplicationResponse;
 import 
org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil;
+import 
org.apache.hadoop.yarn.server.federation.policies.manager.PriorityBroadcastPolicyManager;
+import 
org.apache.hadoop.yarn.server.federation.policies.manager.WeightedHomePolicyManager;
+import 
org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
@@ -92,6 +95,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.Arrays;
 import java.util.ArrayList;
 import java.util.Map;
 import java.util.HashMap;
@@ -107,6 +111,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
+import static 
org.apache.hadoop.yarn.server.router.RouterServerUtil.checkPolicyManagerValid;
+
 public class FederationRMAdminInterceptor extends 
AbstractRMAdminRequestInterceptor {
 
   private static final Logger LOG =
@@ -115,6 +121,10 @@ public class FederationRMAdminInterceptor extends 
AbstractRMAdminRequestIntercep
   private static final String COMMA = ",";
   private static final String COLON = ":";
 
+  private static final List<String> SUPPORT_WEIGHT_MANAGERS =
+      new 
ArrayList<>(Arrays.asList(WeightedLocalityPolicyManager.class.getName(),
+      PriorityBroadcastPolicyManager.class.getName(), 
WeightedHomePolicyManager.class.getName()));
+
   private Map<SubClusterId, ResourceManagerAdministrationProtocol> 
adminRMProxies;
   private FederationStateStoreFacade federationFacade;
   private final Clock clock = new MonotonicClock();
@@ -924,6 +934,13 @@ public class FederationRMAdminInterceptor extends 
AbstractRMAdminRequestIntercep
       RouterServerUtil.logAndThrowException("Missing Queue information.", 
null);
     }
 
+    String policyManagerClassName = request.getPolicyManagerClassName();
+    if (!checkPolicyManagerValid(policyManagerClassName, 
SUPPORT_WEIGHT_MANAGERS)) {
+      routerMetrics.incrSaveFederationQueuePolicyFailedRetrieved();
+      RouterServerUtil.logAndThrowException(policyManagerClassName +
+          " does not support the use of queue weights.", null);
+    }
+
     String amRmWeight = federationQueueWeight.getAmrmWeight();
     FederationQueueWeight.checkSubClusterQueueWeightRatioValid(amRmWeight);
 
@@ -935,9 +952,6 @@ public class FederationRMAdminInterceptor extends 
AbstractRMAdminRequestIntercep
 
     try {
       long startTime = clock.getTime();
-      // Step1, get parameters.
-      String policyManagerClassName = request.getPolicyManagerClassName();
-
 
       // Step2, parse amRMPolicyWeights.
       Map<SubClusterIdInfo, Float> amRMPolicyWeights = 
getSubClusterWeightMap(amRmWeight);
@@ -1346,6 +1360,12 @@ public class FederationRMAdminInterceptor extends 
AbstractRMAdminRequestIntercep
       RouterServerUtil.logAndThrowException("Missing PolicyManagerClassName 
information.", null);
     }
 
+    // Reject policy managers outside SUPPORT_WEIGHT_MANAGERS before the weights are checked.
+    if (!checkPolicyManagerValid(policyManagerClassName, SUPPORT_WEIGHT_MANAGERS)) {
+      routerMetrics.incrSaveFederationQueuePolicyFailedRetrieved();
+      // The leading space separates the class name from the rest of the message.
+      RouterServerUtil.logAndThrowException(policyManagerClassName +
+          " does not support the use of queue weights.", null);
+    }
+
     String amRmWeight = federationQueueWeight.getAmrmWeight();
     FederationQueueWeight.checkSubClusterQueueWeightRatioValid(amRmWeight);
 
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java
index 62f1eee845b5..a1339e419040 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java
@@ -643,15 +643,27 @@ public class TestFederationRMAdminInterceptor extends 
BaseRouterRMAdminTest {
     LambdaTestUtils.intercept(YarnException.class, "Missing Queue 
information.",
         () -> interceptor.saveFederationQueuePolicy(request));
 
-    // routerWeight / amrmWeight
-    // The sum of the routerWeight is not equal to 1.
+    // PolicyManager needs to support weight
     FederationQueueWeight federationQueueWeight2 = 
FederationQueueWeight.newInstance(
         "SC-1:0.7,SC-2:0.3", "SC-1:0.8,SC-2:0.3", "1.0");
     SaveFederationQueuePolicyRequest request2 =
-        SaveFederationQueuePolicyRequest.newInstance("root.a", 
federationQueueWeight2, "-");
+        SaveFederationQueuePolicyRequest.newInstance("root.a", 
federationQueueWeight2,
+        "TestPolicyManager");
     LambdaTestUtils.intercept(YarnException.class,
-        "The sum of ratios for all subClusters must be equal to 1.",
+        "TestPolicyManager does not support the use of queue weights.",
         () -> interceptor.saveFederationQueuePolicy(request2));
+
+    // routerWeight / amrmWeight
+    // The sum of the routerWeight is not equal to 1.
+    String policyTypeName = 
WeightedLocalityPolicyManager.class.getCanonicalName();
+    FederationQueueWeight federationQueueWeight3 = 
FederationQueueWeight.newInstance(
+        "SC-1:0.7,SC-2:0.3", "SC-1:0.8,SC-2:0.3", "1.0");
+    SaveFederationQueuePolicyRequest request3 =
+        SaveFederationQueuePolicyRequest.newInstance("root.a", 
federationQueueWeight3,
+        policyTypeName);
+    LambdaTestUtils.intercept(YarnException.class,
+        "The sum of ratios for all subClusters must be equal to 1.",
+        () -> interceptor.saveFederationQueuePolicy(request3));
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to