goiri commented on code in PR #4656:
URL: https://github.com/apache/hadoop/pull/4656#discussion_r932565594


##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/AbstractRouterPolicy.java:
##########
@@ -63,4 +71,109 @@ public void validate(ApplicationSubmissionContext 
appSubmissionContext)
     }
   }
 
+  /**
+   * This method is implemented by the specific policy, and it is used to route
+   * both reservations, and applications among a given set of
+   * sub-clusters.
+   *
+   * @param queue the queue for this application/reservation
+   * @param preSelectSubClusters a pre-filter set of sub-clusters
+   * @return the chosen sub-cluster
+   *
+   * @throws YarnException if the policy fails to choose a sub-cluster
+   */
+  protected abstract SubClusterId chooseSubCluster(String queue,
+      Map<SubClusterId, SubClusterInfo> preSelectSubClusters) throws 
YarnException;
+
+  /**
+   * Filter chosen SubCluster based on reservationId.
+   *
+   * @param reservationId the globally unique identifier for a reservation.
+   * @param activeSubClusters the map of ids to info for all active 
subclusters.
+   * @return the chosen sub-cluster
+   * @throws YarnException if the policy fails to choose a sub-cluster
+   */
+  protected Map<SubClusterId, SubClusterInfo> prefilterSubClusters(
+      ReservationId reservationId, Map<SubClusterId, SubClusterInfo> 
activeSubClusters)
+      throws YarnException {
+
+    // if a reservation exists limit scope to the sub-cluster this
+    // reservation is mapped to
+    // TODO: Implemented in YARN-11236
+    return activeSubClusters;
+  }
+
+  /**
+   * Simply picks from alphabetically-sorted active subclusters based on the
+   * hash of quey name. Jobs of the same queue will all be routed to the same
+   * sub-cluster, as far as the number of active sub-cluster and their names
+   * remain the same.
+   *
+   * @param appContext the {@link ApplicationSubmissionContext} that
+   *          has to be routed to an appropriate subCluster for execution.
+   *
+   * @param blackLists the list of subClusters as identified by
+   *          {@link SubClusterId} to blackList from the selection of the home
+   *          subCluster.
+   *
+   * @return a hash-based chosen {@link SubClusterId} that will be the "home"
+   *         for this application.
+   *
+   * @throws YarnException if there are no active subclusters.
+   */
+  @Override
+  public SubClusterId getHomeSubcluster(ApplicationSubmissionContext 
appContext,
+      List<SubClusterId> blackLists) throws YarnException {
+
+    // null checks and default-queue behavior
+    validate(appContext);
+
+    // apply filtering based on reservation location and active sub-clusters
+    Map<SubClusterId, SubClusterInfo> filteredSubClusters = 
prefilterSubClusters(
+            appContext.getReservationID(), getActiveSubclusters());
+
+    FederationPolicyUtils.validateSubClusterAvailability(
+            new ArrayList<>(filteredSubClusters.keySet()), blackLists);
+
+    // remove black SubCluster
+    if (blackLists != null) {
+      blackLists.forEach(filteredSubClusters::remove);
+    }
+
+    // pick the chosen subCluster from the active ones
+    return chooseSubCluster(appContext.getQueue(), filteredSubClusters);
+  }
+
+  /**
+   * This method provides a wrapper of all policy functionalities for routing a
+   * reservation. Internally it manages configuration changes, and policy
+   * init/reinit.
+   *
+   * @param request the reservation to route.
+   *
+   * @return the id of the subcluster that will be the "home" for this
+   *         reservation.
+   *
+   * @throws YarnException if there are issues initializing policies, or no
+   *           valid sub-cluster id could be found for this reservation.
+   */
+  @Override
+  public SubClusterId 
getReservationHomeSubcluster(ReservationSubmissionRequest request)
+      throws YarnException {
+    if (request == null) {
+      throw new FederationPolicyException(

Review Comment:
   Single line?



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java:
##########
@@ -35,36 +35,21 @@
  * sub-clusters.
  */
 public class WeightedRandomRouterPolicy extends AbstractRouterPolicy {
-
   @Override
-  public SubClusterId getHomeSubcluster(
-      ApplicationSubmissionContext appSubmissionContext,
-      List<SubClusterId> blacklist) throws YarnException {
-
-    // null checks and default-queue behavior
-    validate(appSubmissionContext);
-
-    Map<SubClusterId, SubClusterInfo> activeSubclusters =
-        getActiveSubclusters();
+  protected SubClusterId chooseSubCluster(
+      String queue, Map<SubClusterId, SubClusterInfo> preSelectSubClusters) 
throws YarnException {
 
-    FederationPolicyUtils.validateSubClusterAvailability(
-        new ArrayList<SubClusterId>(activeSubclusters.keySet()), blacklist);
-
-    // note: we cannot pre-compute the weights, as the set of activeSubcluster
+    // note: we cannot pre-compute the weights, as the set of activeSubCluster
     // changes dynamically (and this would unfairly spread the load to
     // sub-clusters adjacent to an inactive one), hence we need to count/scan
     // the list and based on weight pick the next sub-cluster.
     Map<SubClusterIdInfo, Float> weights =

Review Comment:
   single line



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java:
##########
@@ -59,46 +60,13 @@ public void 
reinitialize(FederationPolicyInitializationContext policyContext)
     setPolicyContext(policyContext);
   }
 
-  /**
-   * Simply picks a random active subCluster to start the AM (this does NOT
-   * depend on the weights in the policy).
-   *
-   * @param appSubmissionContext the {@link ApplicationSubmissionContext} that
-   *          has to be routed to an appropriate subCluster for execution.
-   *
-   * @param blackListSubClusters the list of subClusters as identified by
-   *          {@link SubClusterId} to blackList from the selection of the home
-   *          subCluster.
-   *
-   * @return a randomly chosen subcluster.
-   *
-   * @throws YarnException if there are no active subclusters.
-   */
   @Override
-  public SubClusterId getHomeSubcluster(
-      ApplicationSubmissionContext appSubmissionContext,
-      List<SubClusterId> blackListSubClusters) throws YarnException {
-
-    // null checks and default-queue behavior
-    validate(appSubmissionContext);
-
-    Map<SubClusterId, SubClusterInfo> activeSubclusters =
-        getActiveSubclusters();
-
-    List<SubClusterId> list = new ArrayList<>(activeSubclusters.keySet());
-
-    FederationPolicyUtils.validateSubClusterAvailability(list,
-        blackListSubClusters);
-
-    if (blackListSubClusters != null) {
-
-      // Remove from the active SubClusters from StateStore the blacklisted 
ones
-      for (SubClusterId scId : blackListSubClusters) {
-        list.remove(scId);
-      }
+  protected SubClusterId chooseSubCluster(
+      String queue, Map<SubClusterId, SubClusterInfo> preSelectSubClusters) 
throws YarnException {
+    if(preSelectSubClusters == null || preSelectSubClusters.size() == 0) {

Review Comment:
   space after if



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LocalityRouterPolicy.java:
##########
@@ -128,53 +126,55 @@ public SubClusterId getHomeSubcluster(
       ResourceRequest nodeRequest = null;
       ResourceRequest rackRequest = null;
       ResourceRequest anyRequest = null;
+
       for (ResourceRequest rr : rrList) {
         // Handle "node" requests
         try {
           targetId = resolver.getSubClusterForNode(rr.getResourceName());
           nodeRequest = rr;
         } catch (YarnException e) {
-          LOG.error("Cannot resolve node : {}", e.getLocalizedMessage());
+          LOG.error("Cannot resolve node.", e);

Review Comment:
   I prefer not having the full exception. Thoughts?



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java:
##########
@@ -28,20 +28,21 @@
 import java.util.Map;
 import java.util.Random;
 
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import 
org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
 import 
org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
 import 
org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
 import 
org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
 import 
org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
-import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
-import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
-import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
-import 
org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
-import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import 
org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.*;

Review Comment:
   Avoid



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java:
##########
@@ -35,36 +35,21 @@
  * sub-clusters.
  */
 public class WeightedRandomRouterPolicy extends AbstractRouterPolicy {
-
   @Override
-  public SubClusterId getHomeSubcluster(
-      ApplicationSubmissionContext appSubmissionContext,
-      List<SubClusterId> blacklist) throws YarnException {
-
-    // null checks and default-queue behavior
-    validate(appSubmissionContext);
-
-    Map<SubClusterId, SubClusterInfo> activeSubclusters =
-        getActiveSubclusters();
+  protected SubClusterId chooseSubCluster(
+      String queue, Map<SubClusterId, SubClusterInfo> preSelectSubClusters) 
throws YarnException {
 
-    FederationPolicyUtils.validateSubClusterAvailability(
-        new ArrayList<SubClusterId>(activeSubclusters.keySet()), blacklist);
-
-    // note: we cannot pre-compute the weights, as the set of activeSubcluster
+    // note: we cannot pre-compute the weights, as the set of activeSubCluster
     // changes dynamically (and this would unfairly spread the load to
     // sub-clusters adjacent to an inactive one), hence we need to count/scan
     // the list and based on weight pick the next sub-cluster.
     Map<SubClusterIdInfo, Float> weights =
-        getPolicyInfo().getRouterPolicyWeights();
+            getPolicyInfo().getRouterPolicyWeights();
 
     ArrayList<Float> weightList = new ArrayList<>();
     ArrayList<SubClusterId> scIdList = new ArrayList<>();
     for (Map.Entry<SubClusterIdInfo, Float> entry : weights.entrySet()) {
-      if (blacklist != null && blacklist.contains(entry.getKey().toId())) {
-        continue;
-      }
-      if (entry.getKey() != null
-          && activeSubclusters.containsKey(entry.getKey().toId())) {
+      if (entry.getKey() != null && 
preSelectSubClusters.containsKey(entry.getKey().toId())) {

Review Comment:
   Extract.



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java:
##########
@@ -73,9 +58,8 @@ public SubClusterId getHomeSubcluster(
     int pickedIndex = FederationPolicyUtils.getWeightedRandom(weightList);
     if (pickedIndex == -1) {
       throw new FederationPolicyException(
-          "No positive weight found on active subclusters");
+          "No positive weight found on active subClusters.");

Review Comment:
   i would leave the old capitalization



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java:
##########
@@ -59,46 +60,13 @@ public void 
reinitialize(FederationPolicyInitializationContext policyContext)
     setPolicyContext(policyContext);
   }
 
-  /**
-   * Simply picks a random active subCluster to start the AM (this does NOT
-   * depend on the weights in the policy).
-   *
-   * @param appSubmissionContext the {@link ApplicationSubmissionContext} that
-   *          has to be routed to an appropriate subCluster for execution.
-   *
-   * @param blackListSubClusters the list of subClusters as identified by
-   *          {@link SubClusterId} to blackList from the selection of the home
-   *          subCluster.
-   *
-   * @return a randomly chosen subcluster.
-   *
-   * @throws YarnException if there are no active subclusters.
-   */
   @Override
-  public SubClusterId getHomeSubcluster(
-      ApplicationSubmissionContext appSubmissionContext,
-      List<SubClusterId> blackListSubClusters) throws YarnException {
-
-    // null checks and default-queue behavior
-    validate(appSubmissionContext);
-
-    Map<SubClusterId, SubClusterInfo> activeSubclusters =
-        getActiveSubclusters();
-
-    List<SubClusterId> list = new ArrayList<>(activeSubclusters.keySet());
-
-    FederationPolicyUtils.validateSubClusterAvailability(list,
-        blackListSubClusters);
-
-    if (blackListSubClusters != null) {
-
-      // Remove from the active SubClusters from StateStore the blacklisted 
ones
-      for (SubClusterId scId : blackListSubClusters) {
-        list.remove(scId);
-      }
+  protected SubClusterId chooseSubCluster(
+      String queue, Map<SubClusterId, SubClusterInfo> preSelectSubClusters) 
throws YarnException {
+    if(preSelectSubClusters == null || preSelectSubClusters.size() == 0) {

Review Comment:
   isEmpty()



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to