YARN-5391. PolicyManager to tie together Router/AMRM Federation policies. 
(Carlo Curino via Subru).


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/27765f13
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/27765f13
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/27765f13

Branch: refs/heads/YARN-2915
Commit: 27765f13898655be69ff8be9d2b1930f78bb00d1
Parents: 27b133b
Author: Subru Krishnan <su...@apache.org>
Authored: Tue Nov 1 19:54:18 2016 -0700
Committer: Carlo Curino <cur...@apache.org>
Committed: Tue May 16 08:52:37 2017 -0700

----------------------------------------------------------------------
 .../policies/AbstractPolicyManager.java         | 175 +++++++++++++++++++
 .../FederationPolicyInitializationContext.java  |   3 +-
 .../policies/UniformBroadcastPolicyManager.java |  56 ++++++
 .../policies/WeightedLocalityPolicyManager.java |  67 +++++++
 .../records/SubClusterPolicyConfiguration.java  |  13 ++
 .../policies/BasePolicyManagerTest.java         | 108 ++++++++++++
 ...ionPolicyInitializationContextValidator.java |   5 +-
 .../TestUniformBroadcastPolicyManager.java      |  40 +++++
 .../TestWeightedLocalityPolicyManager.java      |  79 +++++++++
 .../utils/FederationPoliciesTestUtil.java       |   2 +-
 10 files changed, 545 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/27765f13/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/AbstractPolicyManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/AbstractPolicyManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/AbstractPolicyManager.java
new file mode 100644
index 0000000..e77f2e3
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/AbstractPolicyManager.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies;
+
+import 
org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
+import 
org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import 
org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
+import 
org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class provides basic implementation for common methods that multiple
+ * policies will need to implement.
+ */
+public abstract class AbstractPolicyManager implements
+    FederationPolicyManager {
+
+  private String queue;
+  @SuppressWarnings("checkstyle:visibilitymodifier")
+  protected Class routerFederationPolicy;
+  @SuppressWarnings("checkstyle:visibilitymodifier")
+  protected Class amrmProxyFederationPolicy;
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(AbstractPolicyManager.class);
+  /**
+   * This default implementation validates the
+   * {@link FederationPolicyInitializationContext},
+   * then checks whether it needs to reinstantiate the class (null or
+   * mismatching type), and reinitialize the policy.
+   *
+   * @param federationPolicyContext the current context
+   * @param oldInstance             the existing (possibly null) instance.
+   *
+   * @return a valid and fully reinitalized {@link FederationAMRMProxyPolicy}
+   * instance
+   *
+   * @throws FederationPolicyInitializationException if the reinitalization is
+   *                                                 not valid, and ensure
+   *                                                 previous state is 
preserved
+   */
+  public FederationAMRMProxyPolicy getAMRMPolicy(
+      FederationPolicyInitializationContext federationPolicyContext,
+      FederationAMRMProxyPolicy oldInstance)
+      throws FederationPolicyInitializationException {
+
+    if (amrmProxyFederationPolicy == null) {
+      throw new FederationPolicyInitializationException("The parameter "
+          + "amrmProxyFederationPolicy should be initialized in "
+          + this.getClass().getSimpleName() + " constructor.");
+    }
+
+    try {
+      return (FederationAMRMProxyPolicy) internalPolicyGetter(
+          federationPolicyContext, oldInstance, amrmProxyFederationPolicy);
+    } catch (ClassCastException e) {
+      throw new FederationPolicyInitializationException(e);
+    }
+
+  }
+
+  /**
+   * This default implementation validates the
+   * {@link FederationPolicyInitializationContext},
+   * then checks whether it needs to reinstantiate the class (null or
+   * mismatching type), and reinitialize the policy.
+   *
+   * @param federationPolicyContext the current context
+   * @param oldInstance             the existing (possibly null) instance.
+   *
+   * @return a valid and fully reinitalized {@link FederationRouterPolicy}
+   * instance
+   *
+   * @throws FederationPolicyInitializationException if the reinitalization is
+   *                                                 not valid, and ensure
+   *                                                 previous state is 
preserved
+   */
+
+  public FederationRouterPolicy getRouterPolicy(
+      FederationPolicyInitializationContext federationPolicyContext,
+      FederationRouterPolicy oldInstance)
+      throws FederationPolicyInitializationException {
+
+    //checks that sub-types properly initialize the types of policies
+    if (routerFederationPolicy == null) {
+      throw new FederationPolicyInitializationException("The policy "
+          + "type should be initialized in " + this.getClass().getSimpleName()
+          + " constructor.");
+    }
+
+    try {
+      return (FederationRouterPolicy) internalPolicyGetter(
+          federationPolicyContext, oldInstance, routerFederationPolicy);
+    } catch (ClassCastException e) {
+      throw new FederationPolicyInitializationException(e);
+    }
+  }
+
+  @Override
+  public String getQueue() {
+    return queue;
+  }
+
+  @Override
+  public void setQueue(String queue) {
+    this.queue = queue;
+  }
+
+  /**
+   * Common functionality to instantiate a reinitialize a {@link
+   * ConfigurableFederationPolicy}.
+   */
+  private ConfigurableFederationPolicy internalPolicyGetter(
+      final FederationPolicyInitializationContext federationPolicyContext,
+      ConfigurableFederationPolicy oldInstance, Class policy)
+      throws FederationPolicyInitializationException {
+
+    FederationPolicyInitializationContextValidator
+        .validate(federationPolicyContext, this.getClass().getCanonicalName());
+
+    if (oldInstance == null || !oldInstance.getClass().equals(policy)) {
+      try {
+        oldInstance = (ConfigurableFederationPolicy) policy.newInstance();
+      } catch (InstantiationException e) {
+        throw new FederationPolicyInitializationException(e);
+      } catch (IllegalAccessException e) {
+        throw new FederationPolicyInitializationException(e);
+      }
+    }
+
+    //copying the context to avoid side-effects
+    FederationPolicyInitializationContext modifiedContext =
+        updateContext(federationPolicyContext,
+            oldInstance.getClass().getCanonicalName());
+
+    oldInstance.reinitialize(modifiedContext);
+    return oldInstance;
+  }
+
+  /**
+   * This method is used to copy-on-write the context, that will be passed
+   * downstream to the router/amrmproxy policies.
+   */
+  private FederationPolicyInitializationContext updateContext(
+      FederationPolicyInitializationContext federationPolicyContext,
+      String type) {
+    // copying configuration and context to avoid modification of original
+    SubClusterPolicyConfiguration newConf = SubClusterPolicyConfiguration
+        .newInstance(federationPolicyContext
+            .getSubClusterPolicyConfiguration());
+    newConf.setType(type);
+
+    return new FederationPolicyInitializationContext(newConf,
+                  federationPolicyContext.getFederationSubclusterResolver(),
+                  federationPolicyContext.getFederationStateStoreFacade(),
+                  federationPolicyContext.getHomeSubcluster());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/27765f13/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContext.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContext.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContext.java
index 46dd6eb..4d29a41 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContext.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContext.java
@@ -41,10 +41,11 @@ public class FederationPolicyInitializationContext {
 
   public FederationPolicyInitializationContext(
       SubClusterPolicyConfiguration policy, SubClusterResolver resolver,
-      FederationStateStoreFacade storeFacade) {
+      FederationStateStoreFacade storeFacade, SubClusterId home) {
     this.federationPolicyConfiguration = policy;
     this.federationSubclusterResolver = resolver;
     this.federationStateStoreFacade = storeFacade;
+    this.homeSubcluster = home;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/27765f13/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/UniformBroadcastPolicyManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/UniformBroadcastPolicyManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/UniformBroadcastPolicyManager.java
new file mode 100644
index 0000000..a01f8fa
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/UniformBroadcastPolicyManager.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies;
+
+import 
org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy;
+import 
org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import 
org.apache.hadoop.yarn.server.federation.policies.router.UniformRandomRouterPolicy;
+import 
org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+
+import java.nio.ByteBuffer;
+
+/**
+ * This class represents a simple implementation of a {@code
+ * FederationPolicyManager}.
+ *
+ * It combines the basic policies: {@link UniformRandomRouterPolicy} and
+ * {@link BroadcastAMRMProxyPolicy}, which are designed to work together and
+ * "spread" the load among sub-clusters uniformly.
+ *
+ * This simple policy might impose heavy load on the RMs and return more
+ * containers than a job requested as all requests are (replicated and)
+ * broadcasted.
+ */
+public class UniformBroadcastPolicyManager
+    extends AbstractPolicyManager {
+
+  public UniformBroadcastPolicyManager() {
+    //this structurally hard-codes two compatible policies for Router and
+    // AMRMProxy.
+    routerFederationPolicy = UniformRandomRouterPolicy.class;
+    amrmProxyFederationPolicy = BroadcastAMRMProxyPolicy.class;
+  }
+
+  @Override
+  public SubClusterPolicyConfiguration serializeConf()
+      throws FederationPolicyInitializationException {
+    ByteBuffer buf = ByteBuffer.allocate(0);
+    return SubClusterPolicyConfiguration
+        .newInstance(getQueue(), this.getClass().getCanonicalName(), buf);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/27765f13/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/WeightedLocalityPolicyManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/WeightedLocalityPolicyManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/WeightedLocalityPolicyManager.java
new file mode 100644
index 0000000..f3c6673
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/WeightedLocalityPolicyManager.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies;
+
+import com.google.common.annotations.VisibleForTesting;
+import 
org.apache.hadoop.yarn.server.federation.policies.amrmproxy.LocalityMulticastAMRMProxyPolicy;
+import 
org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
+import 
org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import 
org.apache.hadoop.yarn.server.federation.policies.router.WeightedRandomRouterPolicy;
+import 
org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Policy that allows operator to configure "weights" for routing. This picks a
+ * {@link WeightedRandomRouterPolicy} for the router and a {@link
+ * LocalityMulticastAMRMProxyPolicy} for the amrmproxy as they are designed to
+ * work together.
+ */
+public class WeightedLocalityPolicyManager
+    extends AbstractPolicyManager {
+
+  private WeightedPolicyInfo weightedPolicyInfo;
+
+  public WeightedLocalityPolicyManager() {
+    //this structurally hard-codes two compatible policies for Router and
+    // AMRMProxy.
+    routerFederationPolicy =  WeightedRandomRouterPolicy.class;
+    amrmProxyFederationPolicy = LocalityMulticastAMRMProxyPolicy.class;
+    weightedPolicyInfo = new WeightedPolicyInfo();
+  }
+
+  @Override
+  public SubClusterPolicyConfiguration serializeConf()
+      throws FederationPolicyInitializationException {
+    ByteBuffer buf = weightedPolicyInfo.toByteBuffer();
+    return SubClusterPolicyConfiguration
+        .newInstance(getQueue(), this.getClass().getCanonicalName(), buf);
+  }
+
+  @VisibleForTesting
+  public WeightedPolicyInfo getWeightedPolicyInfo() {
+    return weightedPolicyInfo;
+  }
+
+  @VisibleForTesting
+  public void setWeightedPolicyInfo(
+      WeightedPolicyInfo weightedPolicyInfo) {
+    this.weightedPolicyInfo = weightedPolicyInfo;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/27765f13/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterPolicyConfiguration.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterPolicyConfiguration.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterPolicyConfiguration.java
index 2839139..52807d9 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterPolicyConfiguration.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterPolicyConfiguration.java
@@ -40,6 +40,7 @@ import java.nio.ByteBuffer;
 @Unstable
 public abstract class SubClusterPolicyConfiguration {
 
+
   @Private
   @Unstable
   public static SubClusterPolicyConfiguration newInstance(String queue,
@@ -52,6 +53,18 @@ public abstract class SubClusterPolicyConfiguration {
     return policy;
   }
 
+  @Private
+  @Unstable
+  public static SubClusterPolicyConfiguration newInstance(
+      SubClusterPolicyConfiguration conf) {
+    SubClusterPolicyConfiguration policy =
+        Records.newRecord(SubClusterPolicyConfiguration.class);
+    policy.setQueue(conf.getQueue());
+    policy.setType(conf.getType());
+    policy.setParams(conf.getParams());
+    return policy;
+  }
+
   /**
    * Get the name of the queue for which we are configuring a policy.
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/27765f13/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BasePolicyManagerTest.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BasePolicyManagerTest.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BasePolicyManagerTest.java
new file mode 100644
index 0000000..c609886
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BasePolicyManagerTest.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies;
+
+import 
org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
+import 
org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import 
org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import 
org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import 
org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * This class provides common test methods for testing {@code
+ * FederationPolicyManager}s.
+ */
+public abstract class BasePolicyManagerTest {
+
+
+  @SuppressWarnings("checkstyle:visibilitymodifier")
+  protected FederationPolicyManager wfp = null;
+  @SuppressWarnings("checkstyle:visibilitymodifier")
+  protected Class expectedPolicyManager;
+  @SuppressWarnings("checkstyle:visibilitymodifier")
+  protected Class expectedAMRMProxyPolicy;
+  @SuppressWarnings("checkstyle:visibilitymodifier")
+  protected Class expectedRouterPolicy;
+
+
+  @Test
+  public void testSerializeAndInstantiate() throws Exception {
+    serializeAndDeserializePolicyManager(wfp, expectedPolicyManager,
+        expectedAMRMProxyPolicy,
+        expectedRouterPolicy);
+  }
+
+  @Test(expected = FederationPolicyInitializationException.class)
+  public void testSerializeAndInstantiateBad1() throws Exception {
+    serializeAndDeserializePolicyManager(wfp, String.class,
+        expectedAMRMProxyPolicy, expectedRouterPolicy);
+  }
+
+  @Test(expected = AssertionError.class)
+  public void testSerializeAndInstantiateBad2() throws Exception {
+    serializeAndDeserializePolicyManager(wfp, expectedPolicyManager,
+        String.class, expectedRouterPolicy);
+  }
+
+  @Test(expected = AssertionError.class)
+  public void testSerializeAndInstantiateBad3() throws Exception {
+    serializeAndDeserializePolicyManager(wfp, expectedPolicyManager,
+        expectedAMRMProxyPolicy, String.class);
+  }
+
+  protected static void serializeAndDeserializePolicyManager(
+      FederationPolicyManager wfp, Class policyManagerType,
+      Class expAMRMProxyPolicy, Class expRouterPolicy) throws Exception {
+
+    // serializeConf it in a context
+    SubClusterPolicyConfiguration fpc =
+        wfp.serializeConf();
+    fpc.setType(policyManagerType.getCanonicalName());
+    FederationPolicyInitializationContext context = new
+        FederationPolicyInitializationContext();
+    context.setSubClusterPolicyConfiguration(fpc);
+    context
+        
.setFederationStateStoreFacade(FederationPoliciesTestUtil.initFacade());
+    context.setFederationSubclusterResolver(
+        FederationPoliciesTestUtil.initResolver());
+    context.setHomeSubcluster(SubClusterId.newInstance("homesubcluster"));
+
+    // based on the "context" created instantiate new class and use it
+    Class c = Class.forName(wfp.getClass().getCanonicalName());
+    FederationPolicyManager wfp2 = (FederationPolicyManager) c.newInstance();
+
+    FederationAMRMProxyPolicy federationAMRMProxyPolicy =
+        wfp2.getAMRMPolicy(context, null);
+
+    //needed only for tests (getARMRMPolicy change the "type" in conf)
+    fpc.setType(wfp.getClass().getCanonicalName());
+
+    FederationRouterPolicy federationRouterPolicy =
+        wfp2.getRouterPolicy(context, null);
+
+    Assert.assertEquals(federationAMRMProxyPolicy.getClass(),
+        expAMRMProxyPolicy);
+
+    Assert.assertEquals(federationRouterPolicy.getClass(),
+        expRouterPolicy);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/27765f13/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java
index c79fd2a..d906b92 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java
@@ -24,6 +24,7 @@ import 
org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMR
 import 
org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
 import 
org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
 import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import 
org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
 import 
org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
 import 
org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
@@ -38,6 +39,7 @@ public class 
TestFederationPolicyInitializationContextValidator {
   private SubClusterPolicyConfiguration goodConfig;
   private SubClusterResolver goodSR;
   private FederationStateStoreFacade goodFacade;
+  private SubClusterId goodHome;
   private FederationPolicyInitializationContext context;
 
   @Before
@@ -45,8 +47,9 @@ public class 
TestFederationPolicyInitializationContextValidator {
     goodFacade = FederationPoliciesTestUtil.initFacade();
     goodConfig = new MockPolicyManager().serializeConf();
     goodSR = FederationPoliciesTestUtil.initResolver();
+    goodHome = SubClusterId.newInstance("homesubcluster");
     context = new FederationPolicyInitializationContext(goodConfig, goodSR,
-        goodFacade);
+        goodFacade, goodHome);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/hadoop/blob/27765f13/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestUniformBroadcastPolicyManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestUniformBroadcastPolicyManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestUniformBroadcastPolicyManager.java
new file mode 100644
index 0000000..542a5ae
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestUniformBroadcastPolicyManager.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies;
+
+import 
org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy;
+import 
org.apache.hadoop.yarn.server.federation.policies.router.UniformRandomRouterPolicy;
+import org.junit.Before;
+
+/**
+ * Simple test of {@link UniformBroadcastPolicyManager}.
+ */
+public class TestUniformBroadcastPolicyManager extends BasePolicyManagerTest {
+
+  @Before
+  public void setup() {
+    //config policy
+    wfp = new UniformBroadcastPolicyManager();
+    wfp.setQueue("queue1");
+
+    //set expected params that the base test class will use for tests
+    expectedPolicyManager = UniformBroadcastPolicyManager.class;
+    expectedAMRMProxyPolicy = BroadcastAMRMProxyPolicy.class;
+    expectedRouterPolicy = UniformRandomRouterPolicy.class;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/27765f13/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestWeightedLocalityPolicyManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestWeightedLocalityPolicyManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestWeightedLocalityPolicyManager.java
new file mode 100644
index 0000000..ab9cec4
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestWeightedLocalityPolicyManager.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies;
+
+import 
org.apache.hadoop.yarn.server.federation.policies.amrmproxy.LocalityMulticastAMRMProxyPolicy;
+import 
org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
+import 
org.apache.hadoop.yarn.server.federation.policies.router.WeightedRandomRouterPolicy;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Simple test of {@link WeightedLocalityPolicyManager}.
+ */
+public class TestWeightedLocalityPolicyManager extends
+    BasePolicyManagerTest {
+
+  private WeightedPolicyInfo policyInfo;
+
+  @Before
+  public void setup() {
+    // configure a policy
+
+    wfp = new WeightedLocalityPolicyManager();
+    wfp.setQueue("queue1");
+    SubClusterId sc1 = SubClusterId.newInstance("sc1");
+    SubClusterId sc2 = SubClusterId.newInstance("sc2");
+    policyInfo = new WeightedPolicyInfo();
+
+    Map<SubClusterIdInfo, Float> routerWeights = new HashMap<>();
+    routerWeights.put(new SubClusterIdInfo(sc1), 0.2f);
+    routerWeights.put(new SubClusterIdInfo(sc2), 0.8f);
+    policyInfo.setRouterPolicyWeights(routerWeights);
+
+    Map<SubClusterIdInfo, Float> amrmWeights = new HashMap<>();
+    amrmWeights.put(new SubClusterIdInfo(sc1), 0.2f);
+    amrmWeights.put(new SubClusterIdInfo(sc2), 0.8f);
+    policyInfo.setAMRMPolicyWeights(amrmWeights);
+
+    ((WeightedLocalityPolicyManager) wfp).setWeightedPolicyInfo(
+        policyInfo);
+
+    //set expected params that the base test class will use for tests
+    expectedPolicyManager = WeightedLocalityPolicyManager.class;
+    expectedAMRMProxyPolicy = LocalityMulticastAMRMProxyPolicy.class;
+    expectedRouterPolicy = WeightedRandomRouterPolicy.class;
+  }
+
+  @Test
+  public void testPolicyInfoSetCorrectly() throws Exception {
+    serializeAndDeserializePolicyManager(wfp, expectedPolicyManager,
+                                         expectedAMRMProxyPolicy,
+                                         expectedRouterPolicy);
+
+    //check the policyInfo propagates through ser/der correctly
+    Assert.assertEquals(((WeightedLocalityPolicyManager) wfp)
+                            .getWeightedPolicyInfo(), policyInfo);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/27765f13/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java
index 87ed8d1..85fdc96 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java
@@ -143,7 +143,7 @@ public final class FederationPoliciesTestUtil {
       SubClusterInfo> activeSubclusters) throws YarnException {
     FederationPolicyInitializationContext context =
         new FederationPolicyInitializationContext(null, initResolver(),
-            initFacade());
+            initFacade(), SubClusterId.newInstance("homesubcluster"));
     initializePolicyContext(context, policy, policyInfo, activeSubclusters);
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to