This is an automated email from the ASF dual-hosted git repository.

slfan1989 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git

The following commit(s) were added to refs/heads/trunk by this push: new 8a610e616f72 YARN-11614. [Federation] Add Federation PolicyManager Validation Rules. (#6271) Contributed by Shilun Fan. 8a610e616f72 is described below commit 8a610e616f724dad14ff46033ba52787525c542b Author: slfan1989 <55643692+slfan1...@users.noreply.github.com> AuthorDate: Sun Nov 26 16:01:35 2023 +0800 YARN-11614. [Federation] Add Federation PolicyManager Validation Rules. (#6271) Contributed by Shilun Fan. Reviewed-by: Inigo Goiri <inigo...@apache.org> Signed-off-by: Shilun Fan <slfan1...@apache.org> --- .../yarn/server/router/RouterServerUtil.java | 8 +++++++ .../rmadmin/FederationRMAdminInterceptor.java | 26 +++++++++++++++++++--- .../rmadmin/TestFederationRMAdminInterceptor.java | 20 +++++++++++++---- 3 files changed, 47 insertions(+), 7 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java index 23902bdc6190..744ddc87050d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java @@ -822,4 +822,12 @@ public final class RouterServerUtil { return conf.getBoolean(YarnConfiguration.ROUTER_WEBAPP_PROXY_ENABLE, YarnConfiguration.DEFAULT_ROUTER_WEBAPP_PROXY_ENABLE); } + + public static boolean checkPolicyManagerValid(String policyManager, + List<String> supportWeightList) throws YarnException { + if (supportWeightList.contains(policyManager)) { + return true; + } + return false; + } } \ No newline at end of file diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java index 7ade0d400797..230308e361ff 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java @@ -75,6 +75,9 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePol import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationApplicationRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationApplicationResponse; import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil; +import org.apache.hadoop.yarn.server.federation.policies.manager.PriorityBroadcastPolicyManager; +import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedHomePolicyManager; +import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; @@ -92,6 +95,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; +import java.util.Arrays; import java.util.ArrayList; import java.util.Map; import java.util.HashMap; @@ -107,6 +111,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.ConcurrentHashMap; import 
java.util.stream.Collectors; +import static org.apache.hadoop.yarn.server.router.RouterServerUtil.checkPolicyManagerValid; + public class FederationRMAdminInterceptor extends AbstractRMAdminRequestInterceptor { private static final Logger LOG = @@ -115,6 +121,10 @@ public class FederationRMAdminInterceptor extends AbstractRMAdminRequestIntercep private static final String COMMA = ","; private static final String COLON = ":"; + private static final List<String> SUPPORT_WEIGHT_MANAGERS = + new ArrayList<>(Arrays.asList(WeightedLocalityPolicyManager.class.getName(), + PriorityBroadcastPolicyManager.class.getName(), WeightedHomePolicyManager.class.getName())); + private Map<SubClusterId, ResourceManagerAdministrationProtocol> adminRMProxies; private FederationStateStoreFacade federationFacade; private final Clock clock = new MonotonicClock(); @@ -924,6 +934,13 @@ public class FederationRMAdminInterceptor extends AbstractRMAdminRequestIntercep RouterServerUtil.logAndThrowException("Missing Queue information.", null); } + String policyManagerClassName = request.getPolicyManagerClassName(); + if (!checkPolicyManagerValid(policyManagerClassName, SUPPORT_WEIGHT_MANAGERS)) { + routerMetrics.incrSaveFederationQueuePolicyFailedRetrieved(); + RouterServerUtil.logAndThrowException(policyManagerClassName + + " does not support the use of queue weights.", null); + } + String amRmWeight = federationQueueWeight.getAmrmWeight(); FederationQueueWeight.checkSubClusterQueueWeightRatioValid(amRmWeight); @@ -935,9 +952,6 @@ public class FederationRMAdminInterceptor extends AbstractRMAdminRequestIntercep try { long startTime = clock.getTime(); - // Step1, get parameters. - String policyManagerClassName = request.getPolicyManagerClassName(); - // Step2, parse amRMPolicyWeights. 
Map<SubClusterIdInfo, Float> amRMPolicyWeights = getSubClusterWeightMap(amRmWeight); @@ -1346,6 +1360,12 @@ public class FederationRMAdminInterceptor extends AbstractRMAdminRequestIntercep RouterServerUtil.logAndThrowException("Missing PolicyManagerClassName information.", null); } + if (!checkPolicyManagerValid(policyManagerClassName, SUPPORT_WEIGHT_MANAGERS)) { + routerMetrics.incrSaveFederationQueuePolicyFailedRetrieved(); + RouterServerUtil.logAndThrowException(policyManagerClassName + + "does not support the use of queue weights.", null); + } + String amRmWeight = federationQueueWeight.getAmrmWeight(); FederationQueueWeight.checkSubClusterQueueWeightRatioValid(amRmWeight); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java index 62f1eee845b5..a1339e419040 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java @@ -643,15 +643,27 @@ public class TestFederationRMAdminInterceptor extends BaseRouterRMAdminTest { LambdaTestUtils.intercept(YarnException.class, "Missing Queue information.", () -> interceptor.saveFederationQueuePolicy(request)); - // routerWeight / amrmWeight - // The sum of the routerWeight is not equal to 1. 
+ // PolicyManager needs to support weight FederationQueueWeight federationQueueWeight2 = FederationQueueWeight.newInstance( "SC-1:0.7,SC-2:0.3", "SC-1:0.8,SC-2:0.3", "1.0"); SaveFederationQueuePolicyRequest request2 = - SaveFederationQueuePolicyRequest.newInstance("root.a", federationQueueWeight2, "-"); + SaveFederationQueuePolicyRequest.newInstance("root.a", federationQueueWeight2, + "TestPolicyManager"); LambdaTestUtils.intercept(YarnException.class, - "The sum of ratios for all subClusters must be equal to 1.", + "TestPolicyManager does not support the use of queue weights.", () -> interceptor.saveFederationQueuePolicy(request2)); + + // routerWeight / amrmWeight + // The sum of the routerWeight is not equal to 1. + String policyTypeName = WeightedLocalityPolicyManager.class.getCanonicalName(); + FederationQueueWeight federationQueueWeight3 = FederationQueueWeight.newInstance( + "SC-1:0.7,SC-2:0.3", "SC-1:0.8,SC-2:0.3", "1.0"); + SaveFederationQueuePolicyRequest request3 = + SaveFederationQueuePolicyRequest.newInstance("root.a", federationQueueWeight3, + policyTypeName); + LambdaTestUtils.intercept(YarnException.class, + "The sum of ratios for all subClusters must be equal to 1.", + () -> interceptor.saveFederationQueuePolicy(request3)); } @Test --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org