Repository: hadoop Updated Branches: refs/heads/YARN-7402 b8e718082 -> 91dd58b76
YARN-7708. [GPG] Load based policy generator. Contributed by Young Chen. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/91dd58b7 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/91dd58b7 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/91dd58b7 Branch: refs/heads/YARN-7402 Commit: 91dd58b76a14c694145290e511999b847837e945 Parents: b8e7180 Author: Botong Huang <bot...@apache.org> Authored: Wed Aug 15 09:45:50 2018 -0700 Committer: Botong Huang <bot...@apache.org> Committed: Wed Aug 15 09:46:13 2018 -0700 ---------------------------------------------------------------------- .../dev-support/findbugs-exclude.xml | 4 + .../policygenerator/GlobalPolicy.java | 2 +- .../policygenerator/LoadBasedGlobalPolicy.java | 255 +++++++++++++++++++ .../TestLoadBasedGlobalPolicy.java | 211 +++++++++++++++ 4 files changed, 471 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/91dd58b7/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml index 9fcafad..6e062c7 100644 --- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -209,6 +209,10 @@ <Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.RecoveryComparator" /> <Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" /> </Match> + <Match> + <Class name="org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator.LoadBasedGlobalPolicy$SortByDescendingLoad" /> + <Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" /> + </Match> <!-- Ignore some irrelevant class name warning --> <Match> <Class name="org.apache.hadoop.yarn.api.records.SerializedException" /> http://git-wip-us.apache.org/repos/asf/hadoop/blob/91dd58b7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/GlobalPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/GlobalPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/GlobalPolicy.java index 38d762d..fcd22c9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/GlobalPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/GlobalPolicy.java @@ -51,7 +51,7 @@ public abstract class GlobalPolicy implements Configurable { * duplicate calls to the same * endpoints as the GlobalPolicy is invoked * once per queue. */ - protected Map<Class, String> registerPaths() { + protected Map<Class<?>, String> registerPaths() { // Default register nothing return Collections.emptyMap(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/91dd58b7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/LoadBasedGlobalPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/LoadBasedGlobalPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/LoadBasedGlobalPolicy.java new file mode 100644 index 0000000..03bd48c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/LoadBasedGlobalPolicy.java @@ -0,0 +1,255 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager; +import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; +import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGUtils; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Load based policy that generates weighted policies by scaling + * the cluster load (based on pending) to a weight from 0.0 to 1.0. + */ +public class LoadBasedGlobalPolicy extends GlobalPolicy { + private static final Logger LOG = + LoggerFactory.getLogger(LoadBasedGlobalPolicy.class); + + private static final String FEDERATION_GPG_LOAD_BASED_PREFIX = + YarnConfiguration.FEDERATION_GPG_PREFIX + "policy.generator.load-based."; + + public static final String FEDERATION_GPG_LOAD_BASED_MIN_PENDING = + FEDERATION_GPG_LOAD_BASED_PREFIX + "pending.minimum"; + public static final int DEFAULT_FEDERATION_GPG_LOAD_BASED_MIN_PENDING = 100; + + public static final String FEDERATION_GPG_LOAD_BASED_MAX_PENDING = + FEDERATION_GPG_LOAD_BASED_PREFIX + "pending.maximum"; + public static final int DEFAULT_FEDERATION_GPG_LOAD_BASED_MAX_PENDING = 1000; + + public static final String FEDERATION_GPG_LOAD_BASED_MIN_WEIGHT = + FEDERATION_GPG_LOAD_BASED_PREFIX + "weight.minimum"; + public static final float DEFAULT_FEDERATION_GPG_LOAD_BASED_MIN_WEIGHT = 0.0f; + + public static final String FEDERATION_GPG_LOAD_BASED_MAX_EDIT = + FEDERATION_GPG_LOAD_BASED_PREFIX + "edit.maximum"; + public static final int DEFAULT_FEDERATION_GPG_LOAD_BASED_MAX_EDIT = 3; + + public static final String FEDERATION_GPG_LOAD_BASED_SCALING = + FEDERATION_GPG_LOAD_BASED_PREFIX + "scaling"; + public static final String DEFAULT_FEDERATION_GPG_LOAD_BASED_SCALING = + Scaling.LINEAR.name(); + + public enum Scaling { + LINEAR, + QUADRATIC, + LOG, + NONE + } + + // Minimum pending count before the policy starts scaling down the weights + private int minPending; + // Maximum pending count before policy stops scaling down the weights + //(they'll be set to min weight) + private int maxPending; + // Minimum weight that a sub cluster will be assigned + private float minWeight; + // Maximum number of weights that can be scaled down simultaneously + private int maxEdit; + // Scaling type + private Scaling scaling = Scaling.NONE; + + @Override + public void setConf(Configuration conf) { + super.setConf(conf); + minPending = conf.getInt(FEDERATION_GPG_LOAD_BASED_MIN_PENDING, + DEFAULT_FEDERATION_GPG_LOAD_BASED_MIN_PENDING); + maxPending = conf.getInt(FEDERATION_GPG_LOAD_BASED_MAX_PENDING, + DEFAULT_FEDERATION_GPG_LOAD_BASED_MAX_PENDING); + minWeight = conf.getFloat(FEDERATION_GPG_LOAD_BASED_MIN_WEIGHT, + DEFAULT_FEDERATION_GPG_LOAD_BASED_MIN_WEIGHT); + maxEdit = conf.getInt(FEDERATION_GPG_LOAD_BASED_MAX_EDIT, + DEFAULT_FEDERATION_GPG_LOAD_BASED_MAX_EDIT); + try { + scaling = Scaling.valueOf(conf.get(FEDERATION_GPG_LOAD_BASED_SCALING, + DEFAULT_FEDERATION_GPG_LOAD_BASED_SCALING)); + } catch (IllegalArgumentException e) { + LOG.warn("Invalid scaling mode provided", e); + } + // Check that all configuration values are valid + if (!(minPending <= maxPending)) { + throw new YarnRuntimeException("minPending=" + minPending + + " must be less than or equal to maxPending=" + maxPending); + } + if (!(minWeight >= 0 && minWeight < 1)) { + throw new YarnRuntimeException( + "minWeight=" + minWeight + " must be within range [0,1)"); + } + } + + @Override + protected Map<Class<?>, String> registerPaths() { + // Register for the endpoints we want to receive information on + Map<Class<?>, String> map = new HashMap<>(); + map.put(ClusterMetricsInfo.class, RMWSConsts.METRICS); + return map; + } + + @Override + protected FederationPolicyManager updatePolicy(String queueName, + Map<SubClusterId, Map<Class, Object>> clusterInfo, + FederationPolicyManager currentManager) { + Map<SubClusterId, ClusterMetricsInfo> clusterMetrics = new HashMap<>(); + for (Map.Entry<SubClusterId, Map<Class, Object>> e : clusterInfo + .entrySet()) { + clusterMetrics.put(e.getKey(), + (ClusterMetricsInfo) e.getValue().get(ClusterMetricsInfo.class)); + } + if (currentManager == null) { + LOG.info("Creating load based weighted policy queue {}", queueName); + Map<SubClusterIdInfo, Float> weights = getTargetWeights(clusterMetrics); + WeightedLocalityPolicyManager manager = + new WeightedLocalityPolicyManager(); + manager.setQueue(queueName); + manager.getWeightedPolicyInfo().setAMRMPolicyWeights(weights); + manager.getWeightedPolicyInfo().setRouterPolicyWeights(weights); + currentManager = manager; + } else if (currentManager instanceof WeightedLocalityPolicyManager) { + Map<SubClusterIdInfo, Float> weights = getTargetWeights(clusterMetrics); + LOG.info("Updating policy for queue {} based on cluster load to: {}", + queueName, weights); + WeightedLocalityPolicyManager manager = + (WeightedLocalityPolicyManager) currentManager; + manager.getWeightedPolicyInfo().setAMRMPolicyWeights(weights); + manager.getWeightedPolicyInfo().setRouterPolicyWeights(weights); + } else { + LOG.warn("Policy for queue {} is of type {}, expected {}", queueName, + currentManager.getClass(), WeightedLocalityPolicyManager.class); + } + return currentManager; + } + + @VisibleForTesting + protected Map<SubClusterIdInfo, Float> getTargetWeights( + Map<SubClusterId, ClusterMetricsInfo> clusterMetrics) { + Map<SubClusterIdInfo, Float> weights = + GPGUtils.createUniformWeights(clusterMetrics.keySet()); + List<SubClusterId> scs = new ArrayList<>(clusterMetrics.keySet()); + // Sort the sub clusters into descending order based on pending load + scs.sort(new SortByDescendingLoad(clusterMetrics)); + // Keep the top N loaded sub clusters + scs = scs.subList(0, Math.min(maxEdit, scs.size())); + for (SubClusterId sc : scs) { + LOG.info("Updating weight for sub cluster {}", sc.toString()); + int pending = clusterMetrics.get(sc).getAppsPending(); + if (pending <= minPending) { + LOG.info("Load ({}) is lower than minimum ({}), skipping", pending, + minPending); + } else if (pending < maxPending) { + float weight = 1.0f; + // The different scaling strategies should all map values from the + // range min_pending+1 to max_pending to the range min_weight to 1.0f + // so we pre process and simplify the domain to some value [1, MAX-MIN) + int val = pending - minPending; + int maxVal = maxPending - minPending; + switch (scaling) { + case NONE: + break; + case LINEAR: + weight = (float) (maxVal - val) / (float) (maxVal); + break; + case QUADRATIC: + double maxValQuad = Math.pow(maxVal, 2); + double valQuad = Math.pow(val, 2); + weight = (float) (maxValQuad - valQuad) / (float) (maxValQuad); + break; + case LOG: + double maxValLog = Math.log(maxVal); + double valLog = Math.log(val); + weight = (float) (maxValLog - valLog) / (float) (maxValLog); + break; + } + // Scale the weights to respect the config minimum + weight = weight * (1.0f - minWeight); + weight += minWeight; + weights.put(new SubClusterIdInfo(sc), weight); + LOG.info("Load ({}) is within maximum ({}), setting weights via {} " + + "scale to {}", pending, maxPending, scaling, weight); + } else { + weights.put(new SubClusterIdInfo(sc), minWeight); + LOG.info( + "Load ({}) exceeded maximum ({}), setting weight to minimum: {}", + pending, maxPending, minWeight); + } + } + validateWeights(weights); + return weights; + } + + /** + * Helper to avoid all zero weights. If weights are all zero, they're reset + * to one + * @param weights weights to validate + */ + private void validateWeights(Map<SubClusterIdInfo, Float> weights) { + for(Float w : weights.values()) { + // If we find a nonzero weight, we're validated + if(w > 0.0f) { + return; + } + } + LOG.warn("All " + weights.size() + + " generated weights were 0.0f. Resetting to 1.0f"); + // All weights were zero. Reset all back to 1.0 + for(SubClusterIdInfo id : weights.keySet()) { + weights.put(id, 1.0f); + } + } + + private static final class SortByDescendingLoad + implements Comparator<SubClusterId> { + + private Map<SubClusterId, ClusterMetricsInfo> clusterMetrics; + + private SortByDescendingLoad( + Map<SubClusterId, ClusterMetricsInfo> clusterMetrics) { + this.clusterMetrics = clusterMetrics; + } + + public int compare(SubClusterId a, SubClusterId b) { + // Sort by pending load + return clusterMetrics.get(b).getAppsPending() - clusterMetrics.get(a) + .getAppsPending(); + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/91dd58b7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestLoadBasedGlobalPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestLoadBasedGlobalPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestLoadBasedGlobalPolicy.java new file mode 100644 index 0000000..34c3e57 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestLoadBasedGlobalPolicy.java @@ -0,0 +1,211 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Unit test for the Load Based Global Policy. + */ +public class TestLoadBasedGlobalPolicy { + private static final Logger LOG = + LoggerFactory.getLogger(TestLoadBasedGlobalPolicy.class); + + private static final int NUM_SC = 3; + private static final float DELTA = 0.00001f; + + private static final int MIN_PENDING = 100; + private static final int MAX_PENDING = 500; + + private List<SubClusterId> subClusterIds; + private Map<SubClusterId, ClusterMetricsInfo> clusterMetricsInfos; + private Map<SubClusterIdInfo, Float> weights; + + private Configuration conf; + private LoadBasedGlobalPolicy policyGenerator; + + public TestLoadBasedGlobalPolicy() { + conf = new Configuration(); + policyGenerator = new LoadBasedGlobalPolicy(); + } + + @Before + public void setUp() { + + conf.setInt(LoadBasedGlobalPolicy.FEDERATION_GPG_LOAD_BASED_MAX_EDIT, 2); + conf.setInt(LoadBasedGlobalPolicy.FEDERATION_GPG_LOAD_BASED_MIN_PENDING, + MIN_PENDING); + conf.setInt(LoadBasedGlobalPolicy.FEDERATION_GPG_LOAD_BASED_MAX_PENDING, + MAX_PENDING); + conf.setFloat(LoadBasedGlobalPolicy.FEDERATION_GPG_LOAD_BASED_MIN_WEIGHT, + 0.0f); + conf.set(LoadBasedGlobalPolicy.FEDERATION_GPG_LOAD_BASED_SCALING, + LoadBasedGlobalPolicy.Scaling.LINEAR.name()); + policyGenerator.setConf(conf); + + subClusterIds = new ArrayList<>(); + clusterMetricsInfos = new HashMap<>(); + // Set up sub clusters + for (int i = 0; i < NUM_SC; ++i) { + // Sub cluster Id + SubClusterId id = SubClusterId.newInstance("sc" + i); + subClusterIds.add(id); + + // Cluster metrics info + ClusterMetricsInfo metricsInfo = new ClusterMetricsInfo(); + metricsInfo.setAppsPending(50); + clusterMetricsInfos.put(id, metricsInfo); + } + } + + @Test + public void testSimpleTargetWeights() { + weights = policyGenerator.getTargetWeights(clusterMetricsInfos); + assertEquals(weights.size(), 3); + assertEquals(1.0, getWeight(0), DELTA); + assertEquals(1.0, getWeight(1), DELTA); + assertEquals(1.0, getWeight(2), DELTA); + } + + @Test + public void testLoadTargetWeights() { + getMetric(0).setAppsPending(100); + weights = policyGenerator.getTargetWeights(clusterMetricsInfos); + assertEquals(weights.size(), 3); + assertEquals(1.0, getWeight(0), DELTA); + assertEquals(1.0, getWeight(1), DELTA); + assertEquals(1.0, getWeight(2), DELTA); + getMetric(0).setAppsPending(500); + weights = policyGenerator.getTargetWeights(clusterMetricsInfos); + assertEquals(weights.size(), 3); + assertEquals(0.0, getWeight(0), DELTA); + assertEquals(1.0, getWeight(1), DELTA); + assertEquals(1.0, getWeight(2), DELTA); + } + + @Test + public void testMaxEdit() { + // The policy should be able to edit 2 weights + getMetric(0).setAppsPending(MAX_PENDING + 200); + getMetric(1).setAppsPending(MAX_PENDING + 100); + weights = policyGenerator.getTargetWeights(clusterMetricsInfos); + assertEquals(weights.size(), 3); + assertEquals(0.0, getWeight(0), DELTA); + assertEquals(0.0, getWeight(1), DELTA); + assertEquals(1.0, getWeight(2), DELTA); + // After updating the config, it should only edit the most loaded + conf.setInt(LoadBasedGlobalPolicy.FEDERATION_GPG_LOAD_BASED_MAX_EDIT, 1); + policyGenerator.setConf(conf); + weights = policyGenerator.getTargetWeights(clusterMetricsInfos); + assertEquals(weights.size(), 3); + assertEquals(0.0, getWeight(0), DELTA); + assertEquals(1.0, getWeight(1), DELTA); + assertEquals(1.0, getWeight(2), DELTA); + } + + @Test + public void testMinWeight() { + // If a minimum weight is set, the generator should not go below it + conf.setFloat(LoadBasedGlobalPolicy.FEDERATION_GPG_LOAD_BASED_MIN_WEIGHT, + 0.5f); + policyGenerator.setConf(conf); + getMetric(0).setAppsPending(Integer.MAX_VALUE); + weights = policyGenerator.getTargetWeights(clusterMetricsInfos); + assertEquals(weights.size(), 3); + assertEquals(0.5, getWeight(0), DELTA); + assertEquals(1.0, getWeight(1), DELTA); + assertEquals(1.0, getWeight(2), DELTA); + } + + @Test + public void testScaling() { + LOG.info("Testing that the generator weights are monotonically" + + " decreasing regardless of scaling method"); + for (LoadBasedGlobalPolicy.Scaling scaling : + new LoadBasedGlobalPolicy.Scaling[] { + LoadBasedGlobalPolicy.Scaling.LINEAR, + LoadBasedGlobalPolicy.Scaling.QUADRATIC, + LoadBasedGlobalPolicy.Scaling.LOG }) { + LOG.info("Testing {} scaling...", scaling); + conf.set(LoadBasedGlobalPolicy.DEFAULT_FEDERATION_GPG_LOAD_BASED_SCALING, + scaling.name()); + policyGenerator.setConf(conf); + // Test a continuous range for scaling + float prevWeight = 1.01f; + for (int load = 0; load < MAX_PENDING * 2; ++load) { + getMetric(0).setAppsPending(load); + weights = policyGenerator.getTargetWeights(clusterMetricsInfos); + if (load < MIN_PENDING) { + // Below the minimum load, it should stay 1.0f + assertEquals(1.0f, getWeight(0), DELTA); + } else if (load < MAX_PENDING) { + // In the specified range, the weight should consistently decrease + float weight = getWeight(0); + assertTrue(weight < prevWeight); + prevWeight = weight; + } else { + // Above the maximum load, it should stay 0.0f + assertEquals(0.0f, getWeight(0), DELTA); + } + } + } + } + + @Test + public void testNonZero() { + // If all generated weights are zero, they should be set back to one + conf.setFloat(LoadBasedGlobalPolicy.FEDERATION_GPG_LOAD_BASED_MIN_WEIGHT, + 0.0f); + conf.setInt(LoadBasedGlobalPolicy.FEDERATION_GPG_LOAD_BASED_MAX_EDIT, 3); + policyGenerator.setConf(conf); + getMetric(0).setAppsPending(Integer.MAX_VALUE); + getMetric(1).setAppsPending(Integer.MAX_VALUE); + getMetric(2).setAppsPending(Integer.MAX_VALUE); + weights = policyGenerator.getTargetWeights(clusterMetricsInfos); + assertEquals(weights.size(), 3); + assertEquals(1.0, getWeight(0), DELTA); + assertEquals(1.0, getWeight(1), DELTA); + assertEquals(1.0, getWeight(2), DELTA); + } + + private float getWeight(int sc) { + return weights.get(new SubClusterIdInfo(subClusterIds.get(sc))); + } + + private ClusterMetricsInfo getMetric(int sc) { + return clusterMetricsInfos.get(subClusterIds.get(sc)); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org