goiri commented on code in PR #4656: URL: https://github.com/apache/hadoop/pull/4656#discussion_r933387046
########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java: ########## @@ -262,4 +219,92 @@ public synchronized void reset() { } + /** + * This method provides a wrapper of all policy functionalities for routing a + * reservation. Internally it manages configuration changes, and policy + * init/reinit. + * + * @param request the reservation to route. + * + * @return the id of the subcluster that will be the "home" for this + * reservation. + * + * @throws YarnException if there are issues initializing policies, or no + * valid sub-cluster id could be found for this reservation. + */ + public SubClusterId getReservationHomeSubCluster( + ReservationSubmissionRequest request) throws YarnException { + + // the maps are concurrent, but we need to protect from reset() + // reinitialization mid-execution by creating a new reference local to this + // method. + Map<String, SubClusterPolicyConfiguration> cachedConfs = globalConfMap; + Map<String, FederationRouterPolicy> policyMap = globalPolicyMap; + + if (request == null) { + throw new FederationPolicyException( + "The ReservationSubmissionRequest cannot be null."); + } + + String queue = request.getQueue(); + FederationRouterPolicy policy = getFederationRouterPolicy(cachedConfs, policyMap, queue); + + if (policy == null) { + // this should never happen, as the to maps are updated together + throw new FederationPolicyException("No FederationRouterPolicy found " + + "for queue: " + request.getQueue() + " (while routing " + + "reservation: " + request.getReservationId() + ") " + + "and no default specified."); + } + + return policy.getReservationHomeSubcluster(request); + } + + private FederationRouterPolicy getFederationRouterPolicy( + Map<String, SubClusterPolicyConfiguration> cachedConfiguration, + Map<String, FederationRouterPolicy> policyMap, String queue) + throws FederationPolicyInitializationException 
{ + + // the facade might cache this request, based on its parameterization + SubClusterPolicyConfiguration configuration = null; + String copyQueue = queue; + + try { + configuration = federationFacade.getPolicyConfiguration(copyQueue); + } catch (YarnException e) { + LOG.warn("There is no policy configured for the queue: {}, " + + "falling back to defaults.", copyQueue, e); + } + + // If there is no policy configured for this queue, fallback to the baseline + // policy that is configured either in the store or via XML config (and + // cached) + if (configuration == null) { + final String policyKey = YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY; + LOG.warn("There is no policies configured for queue: {} " + Review Comment: We can rearrange so the string is a single line. ########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java: ########## @@ -17,61 +17,40 @@ package org.apache.hadoop.yarn.server.federation.policies.router; -import java.util.ArrayList; -import java.util.List; import java.util.Map; -import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; /** * This implements a policy that interprets "weights" as a ordered list of - * preferences among sub-clusters. Highest weight among active subclusters is + * preferences among sub-clusters. Highest weight among active subClusters is Review Comment: We keep changing the capitalization. 
I think "Subcluster" is fine; no need to capitalize the C. ########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java: ########## @@ -262,4 +219,92 @@ public synchronized void reset() { } + /** + * This method provides a wrapper of all policy functionalities for routing a + * reservation. Internally it manages configuration changes, and policy + * init/reinit. + * + * @param request the reservation to route. + * + * @return the id of the subcluster that will be the "home" for this + * reservation. + * + * @throws YarnException if there are issues initializing policies, or no + * valid sub-cluster id could be found for this reservation. + */ + public SubClusterId getReservationHomeSubCluster( + ReservationSubmissionRequest request) throws YarnException { + + // the maps are concurrent, but we need to protect from reset() + // reinitialization mid-execution by creating a new reference local to this + // method. 
+ Map<String, SubClusterPolicyConfiguration> cachedConfs = globalConfMap; + Map<String, FederationRouterPolicy> policyMap = globalPolicyMap; + + if (request == null) { + throw new FederationPolicyException( + "The ReservationSubmissionRequest cannot be null."); + } + + String queue = request.getQueue(); + FederationRouterPolicy policy = getFederationRouterPolicy(cachedConfs, policyMap, queue); + + if (policy == null) { + // this should never happen, as the to maps are updated together + throw new FederationPolicyException("No FederationRouterPolicy found " + + "for queue: " + request.getQueue() + " (while routing " + + "reservation: " + request.getReservationId() + ") " + + "and no default specified."); + } + + return policy.getReservationHomeSubcluster(request); + } + + private FederationRouterPolicy getFederationRouterPolicy( + Map<String, SubClusterPolicyConfiguration> cachedConfiguration, + Map<String, FederationRouterPolicy> policyMap, String queue) + throws FederationPolicyInitializationException { + + // the facade might cache this request, based on its parameterization + SubClusterPolicyConfiguration configuration = null; + String copyQueue = queue; + + try { + configuration = federationFacade.getPolicyConfiguration(copyQueue); + } catch (YarnException e) { + LOG.warn("There is no policy configured for the queue: {}, " + Review Comment: We can fit the string into a single line. 
########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LocalityRouterPolicy.java: ########## @@ -128,53 +126,55 @@ public SubClusterId getHomeSubcluster( ResourceRequest nodeRequest = null; ResourceRequest rackRequest = null; ResourceRequest anyRequest = null; + for (ResourceRequest rr : rrList) { // Handle "node" requests try { targetId = resolver.getSubClusterForNode(rr.getResourceName()); nodeRequest = rr; } catch (YarnException e) { - LOG.error("Cannot resolve node : {}", e.getLocalizedMessage()); + LOG.error("Cannot resolve node."); } // Handle "rack" requests try { resolver.getSubClustersForRack(rr.getResourceName()); rackRequest = rr; } catch (YarnException e) { - LOG.error("Cannot resolve rack : {}", e.getLocalizedMessage()); + LOG.error("Cannot resolve rack."); } // Handle "ANY" requests if (ResourceRequest.isAnyLocation(rr.getResourceName())) { anyRequest = rr; continue; } } + if (nodeRequest == null) { - throw new YarnException("Missing node request"); + throw new YarnException("Missing node request."); } if (rackRequest == null) { - throw new YarnException("Missing rack request"); + throw new YarnException("Missing rack request."); } if (anyRequest == null) { - throw new YarnException("Missing any request"); + throw new YarnException("Missing any request."); } - LOG.info( - "Node request: " + nodeRequest.getResourceName() + ", Rack request: " - + rackRequest.getResourceName() + ", Any request: " + anyRequest - .getResourceName()); + + LOG.info("Node request: {} , Rack request: {} , Any request: {}.", + nodeRequest.getResourceName(), rackRequest.getResourceName(), + anyRequest.getResourceName()); + // Handle "node" requests if (validSubClusters.contains(targetId) && enabledSCs .contains(targetId)) { - LOG.info("Node {} is in SubCluster: {}", nodeRequest.getResourceName(), - targetId); + LOG.info("Node {} is in SubCluster: {}.", 
nodeRequest.getResourceName(), targetId); return targetId; } else { throw new YarnException("The node " + nodeRequest.getResourceName() + " is in a blacklist SubCluster or not active. "); } } catch (YarnException e) { LOG.error("Validating resource requests failed, Falling back to " - + "WeightedRandomRouterPolicy placement: " + e.getMessage()); + + "WeightedRandomRouterPolicy placement.", e); Review Comment: Single-line string? ########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LocalityRouterPolicy.java: ########## @@ -128,53 +126,55 @@ public SubClusterId getHomeSubcluster( ResourceRequest nodeRequest = null; ResourceRequest rackRequest = null; ResourceRequest anyRequest = null; + for (ResourceRequest rr : rrList) { // Handle "node" requests try { targetId = resolver.getSubClusterForNode(rr.getResourceName()); nodeRequest = rr; } catch (YarnException e) { - LOG.error("Cannot resolve node : {}", e.getLocalizedMessage()); + LOG.error("Cannot resolve node."); Review Comment: Doesn't it make sense to at least keep outputting the exception message? ########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java: ########## @@ -262,4 +219,92 @@ public synchronized void reset() { } + /** + * This method provides a wrapper of all policy functionalities for routing a + * reservation. Internally it manages configuration changes, and policy + * init/reinit. + * + * @param request the reservation to route. + * + * @return the id of the subcluster that will be the "home" for this + * reservation. + * + * @throws YarnException if there are issues initializing policies, or no + * valid sub-cluster id could be found for this reservation.
+ */ + public SubClusterId getReservationHomeSubCluster( + ReservationSubmissionRequest request) throws YarnException { + + // the maps are concurrent, but we need to protect from reset() + // reinitialization mid-execution by creating a new reference local to this + // method. + Map<String, SubClusterPolicyConfiguration> cachedConfs = globalConfMap; + Map<String, FederationRouterPolicy> policyMap = globalPolicyMap; + + if (request == null) { + throw new FederationPolicyException( + "The ReservationSubmissionRequest cannot be null."); + } + + String queue = request.getQueue(); + FederationRouterPolicy policy = getFederationRouterPolicy(cachedConfs, policyMap, queue); + + if (policy == null) { + // this should never happen, as the to maps are updated together + throw new FederationPolicyException("No FederationRouterPolicy found " + + "for queue: " + request.getQueue() + " (while routing " + + "reservation: " + request.getReservationId() + ") " + + "and no default specified."); + } + + return policy.getReservationHomeSubcluster(request); + } + + private FederationRouterPolicy getFederationRouterPolicy( + Map<String, SubClusterPolicyConfiguration> cachedConfiguration, + Map<String, FederationRouterPolicy> policyMap, String queue) + throws FederationPolicyInitializationException { + + // the facade might cache this request, based on its parameterization + SubClusterPolicyConfiguration configuration = null; + String copyQueue = queue; + + try { + configuration = federationFacade.getPolicyConfiguration(copyQueue); + } catch (YarnException e) { + LOG.warn("There is no policy configured for the queue: {}, " + + "falling back to defaults.", copyQueue, e); + } + + // If there is no policy configured for this queue, fallback to the baseline + // policy that is configured either in the store or via XML config (and + // cached) + if (configuration == null) { + final String policyKey = YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY; + LOG.warn("There is no policies configured 
for queue: {} " + + "we fallback to default policy for: {}. ", copyQueue, policyKey); + copyQueue = YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY; + try { + configuration = federationFacade.getPolicyConfiguration(copyQueue); + } catch (YarnException e) { + LOG.warn("Cannot retrieve policy configured for the queue: {}, " + + "falling back to defaults.", copyQueue, e); + } + } + + // the fallback is not configure via store, but via XML, using + // previously loaded configuration. + if (configuration == null) { + configuration = cachedConfiguration.get(YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY); + } + + // if the configuration has changed since last loaded, reinit the policy + // based on current configuration + if (!cachedConfiguration.containsKey(copyQueue) Review Comment: Fix indentation ########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java: ########## @@ -262,4 +219,92 @@ public synchronized void reset() { } + /** + * This method provides a wrapper of all policy functionalities for routing a + * reservation. Internally it manages configuration changes, and policy + * init/reinit. + * + * @param request the reservation to route. + * + * @return the id of the subcluster that will be the "home" for this + * reservation. + * + * @throws YarnException if there are issues initializing policies, or no + * valid sub-cluster id could be found for this reservation. + */ + public SubClusterId getReservationHomeSubCluster( + ReservationSubmissionRequest request) throws YarnException { + + // the maps are concurrent, but we need to protect from reset() + // reinitialization mid-execution by creating a new reference local to this + // method. 
+ Map<String, SubClusterPolicyConfiguration> cachedConfs = globalConfMap; + Map<String, FederationRouterPolicy> policyMap = globalPolicyMap; + + if (request == null) { + throw new FederationPolicyException( + "The ReservationSubmissionRequest cannot be null."); + } + + String queue = request.getQueue(); + FederationRouterPolicy policy = getFederationRouterPolicy(cachedConfs, policyMap, queue); + + if (policy == null) { + // this should never happen, as the to maps are updated together + throw new FederationPolicyException("No FederationRouterPolicy found " + + "for queue: " + request.getQueue() + " (while routing " + + "reservation: " + request.getReservationId() + ") " + + "and no default specified."); + } + + return policy.getReservationHomeSubcluster(request); + } + + private FederationRouterPolicy getFederationRouterPolicy( + Map<String, SubClusterPolicyConfiguration> cachedConfiguration, + Map<String, FederationRouterPolicy> policyMap, String queue) + throws FederationPolicyInitializationException { + + // the facade might cache this request, based on its parameterization + SubClusterPolicyConfiguration configuration = null; + String copyQueue = queue; + + try { + configuration = federationFacade.getPolicyConfiguration(copyQueue); + } catch (YarnException e) { + LOG.warn("There is no policy configured for the queue: {}, " + + "falling back to defaults.", copyQueue, e); + } + + // If there is no policy configured for this queue, fallback to the baseline + // policy that is configured either in the store or via XML config (and + // cached) + if (configuration == null) { + final String policyKey = YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY; + LOG.warn("There is no policies configured for queue: {} " + + "we fallback to default policy for: {}. 
", copyQueue, policyKey); + copyQueue = YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY; + try { + configuration = federationFacade.getPolicyConfiguration(copyQueue); + } catch (YarnException e) { + LOG.warn("Cannot retrieve policy configured for the queue: {}, " + + "falling back to defaults.", copyQueue, e); + } + } + + // the fallback is not configure via store, but via XML, using + // previously loaded configuration. + if (configuration == null) { + configuration = cachedConfiguration.get(YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY); + } + + // if the configuration has changed since last loaded, reinit the policy + // based on current configuration + if (!cachedConfiguration.containsKey(copyQueue) + || !cachedConfiguration.get(copyQueue).equals(configuration)) { Review Comment: I would extract to make it more readable. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-issues-h...@hadoop.apache.org