http://git-wip-us.apache.org/repos/asf/hadoop/blob/99c1074c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java index 6f54959..e891a27 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java @@ -18,12 +18,12 @@ package org.apache.slider.server.appmaster.state; -import com.codahale.metrics.Metric; -import com.codahale.metrics.MetricRegistry; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MutableGaugeInt; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -31,42 +31,35 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.slider.api.ClusterDescription; -import org.apache.slider.api.ClusterDescriptionKeys; -import org.apache.slider.api.ClusterDescriptionOperations; import org.apache.slider.api.ClusterNode; import org.apache.slider.api.InternalKeys; -import org.apache.slider.api.ResourceKeys; import org.apache.slider.api.StatusKeys; +import org.apache.slider.api.proto.Messages; +import org.apache.slider.api.resource.Application; +import org.apache.slider.api.resource.ApplicationState; +import org.apache.slider.api.resource.Component; import org.apache.slider.api.types.ApplicationLivenessInformation; import org.apache.slider.api.types.ComponentInformation; import org.apache.slider.api.types.RoleStatistics; import org.apache.slider.common.SliderExitCodes; import org.apache.slider.common.SliderKeys; -import org.apache.slider.common.tools.ConfigHelper; import org.apache.slider.common.tools.SliderUtils; -import org.apache.slider.core.conf.AggregateConf; -import org.apache.slider.core.conf.ConfTree; -import org.apache.slider.core.conf.ConfTreeOperations; -import org.apache.slider.core.conf.MapOperations; import org.apache.slider.core.exceptions.BadClusterStateException; import org.apache.slider.core.exceptions.BadConfigException; import org.apache.slider.core.exceptions.ErrorStrings; import org.apache.slider.core.exceptions.NoSuchNodeException; import org.apache.slider.core.exceptions.SliderInternalStateException; import org.apache.slider.core.exceptions.TriggerClusterTeardownException; -import org.apache.slider.core.persist.AggregateConfSerDeser; -import org.apache.slider.core.persist.ConfTreeSerDeser; import org.apache.slider.providers.PlacementPolicy; import org.apache.slider.providers.ProviderRole; -import org.apache.slider.server.appmaster.management.LongGauge; import org.apache.slider.server.appmaster.management.MetricsAndMonitoring; import org.apache.slider.server.appmaster.management.MetricsConstants; +import org.apache.slider.server.appmaster.metrics.SliderMetrics; import org.apache.slider.server.appmaster.operations.AbstractRMOperation; import org.apache.slider.server.appmaster.operations.ContainerReleaseOperation; import org.apache.slider.server.appmaster.operations.ContainerRequestOperation; @@ -77,7 +70,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -89,12 +81,10 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicInteger; +import static org.apache.hadoop.metrics2.lib.Interns.info; import static org.apache.slider.api.ResourceKeys.*; -import static org.apache.slider.api.RoleKeys.*; import static org.apache.slider.api.StateValues.*; -import static org.apache.slider.providers.docker.DockerKeys.DEFAULT_DOCKER_USE_PRIVILEGED; -import static org.apache.slider.providers.docker.DockerKeys.DOCKER_IMAGE; -import static org.apache.slider.providers.docker.DockerKeys.DOCKER_USE_PRIVILEGED; +import static org.apache.slider.api.resource.ApplicationState.STARTED; /** * The model of all the ongoing state of a Slider AM. @@ -117,53 +107,8 @@ public class AppState { */ private boolean applicationLive = false; - /** - * The definition of the instance. Flexing updates the resources section - * This is used as a synchronization point on activities that update - * the CD, and also to update some of the structures that - * feed in to the CD - */ - private AggregateConf instanceDefinition; - - /** - * Time the instance definition snapshots were created - */ - private long snapshotTime; - - /** - * Snapshot of the instance definition. This is fully - * resolved. - */ - private AggregateConf instanceDefinitionSnapshot; + private Application app; - /** - * Snapshot of the raw instance definition; unresolved and - * without any patch of an AM into it. - */ - private AggregateConf unresolvedInstanceDefinition; - - /** - * snapshot of resources as of last update time - */ - private ConfTreeOperations resourcesSnapshot; - private ConfTreeOperations appConfSnapshot; - private ConfTreeOperations internalsSnapshot; - - /** - * This is the status, the live model - */ - private ClusterDescription clusterStatus = new ClusterDescription(); - - /** - * Metadata provided by the AM for use in filling in status requests - */ - private Map<String, String> applicationInfo; - - /** - * Client properties created via the provider -static for the life - * of the application - */ - private Map<String, String> clientProperties = new HashMap<>(); /** * This is a template of the cluster status @@ -180,11 +125,6 @@ public class AppState { new ConcurrentSkipListMap<>(); /** - * The master node. - */ - private RoleInstance appMasterNode; - - /** * Hash map of the containers we have. This includes things that have * been allocated but are not live; it is a superset of the live list */ @@ -198,37 +138,6 @@ public class AppState { */ private final ConcurrentMap<ContainerId, Container> containersBeingReleased = new ConcurrentHashMap<>(); - - /** - * Counter for completed containers ( complete denotes successful or failed ) - */ - private final LongGauge completedContainerCount = new LongGauge(); - - /** - * Count of failed containers - */ - private final LongGauge failedContainerCount = new LongGauge(); - - /** - * # of started containers - */ - private final LongGauge startedContainers = new LongGauge(); - - /** - * # of containers that failed to start - */ - private final LongGauge startFailedContainerCount = new LongGauge(); - - /** - * Track the number of surplus containers received and discarded - */ - private final LongGauge surplusContainers = new LongGauge(); - - /** - * Track the number of requested containers. - * Important: this does not include AA requests which are yet to be issued. - */ - private final LongGauge outstandingContainerRequests = new LongGauge(); /** * Map of requested nodes. This records the command used to start it, @@ -256,7 +165,7 @@ public class AppState { * Nodes that came assigned to a role above that * which were asked for -this appears to happen */ - private final Set<ContainerId> surplusNodes = new HashSet<>(); + private final Set<ContainerId> surplusContainers = new HashSet<>(); /** * Map of containerID to cluster nodes, for status reports. @@ -269,7 +178,6 @@ public class AppState { private final AtomicInteger completionOfUnknownContainerEvent = new AtomicInteger(); - /** * limits of container core numbers in this queue */ @@ -298,6 +206,7 @@ public class AppState { private Resource minResource; private Resource maxResource; + private SliderMetrics appMetrics; /** * Create an instance * @param recordFactory factory for YARN records @@ -309,60 +218,6 @@ public class AppState { Preconditions.checkArgument(metricsAndMonitoring != null, "null metricsAndMonitoring"); this.recordFactory = recordFactory; this.metricsAndMonitoring = metricsAndMonitoring; - - // register any metrics - register(MetricsConstants.CONTAINERS_OUTSTANDING_REQUESTS, outstandingContainerRequests); - register(MetricsConstants.CONTAINERS_SURPLUS, surplusContainers); - register(MetricsConstants.CONTAINERS_STARTED, startedContainers); - register(MetricsConstants.CONTAINERS_COMPLETED, completedContainerCount); - register(MetricsConstants.CONTAINERS_FAILED, failedContainerCount); - register(MetricsConstants.CONTAINERS_START_FAILED, startFailedContainerCount); - } - - private void register(String name, Metric counter) { - this.metricsAndMonitoring.getMetrics().register( - MetricRegistry.name(AppState.class, name), counter); - } - - public long getFailedCountainerCount() { - return failedContainerCount.getCount(); - } - - /** - * Increment the count - */ - public void incFailedCountainerCount() { - failedContainerCount.inc(); - } - - public long getStartFailedCountainerCount() { - return startFailedContainerCount.getCount(); - } - - /** - * Increment the count and return the new value - */ - public void incStartedCountainerCount() { - startedContainers.inc(); - } - - public long getStartedCountainerCount() { - return startedContainers.getCount(); - } - - /** - * Increment the count and return the new value - */ - public void incStartFailedCountainerCount() { - startFailedContainerCount.inc(); - } - - public AtomicInteger getCompletionOfNodeNotInLiveListEvent() { - return completionOfNodeNotInLiveListEvent; - } - - public AtomicInteger getCompletionOfUnknownContainerEvent() { - return completionOfUnknownContainerEvent; } @@ -370,13 +225,7 @@ public class AppState { return roleStatusMap; } - protected Map<String, ProviderRole> getRoleMap() { - return roles; - } - public Map<Integer, ProviderRole> getRolePriorityMap() { - return rolePriorityMap; - } private Map<ContainerId, RoleInstance> getStartingContainers() { return startingContainers; @@ -396,47 +245,13 @@ public class AppState { /** * Get the current view of the cluster status. - * <p> - * Calls to {@link #refreshClusterStatus()} trigger a - * refresh of this field. - * <p> * This is read-only * to the extent that changes here do not trigger updates in the * application state. * @return the cluster status */ - public synchronized ClusterDescription getClusterStatus() { - return clusterStatus; - } - - @VisibleForTesting - protected synchronized void setClusterStatus(ClusterDescription clusterDesc) { - this.clusterStatus = clusterDesc; - } - - /** - * Set the instance definition -this also builds the (now obsolete) - * cluster specification from it. - * - * Important: this is for early binding and must not be used after the build - * operation is complete. - * @param definition initial definition - * @throws BadConfigException - */ - public synchronized void setInitialInstanceDefinition(AggregateConf definition) - throws BadConfigException, IOException { - log.debug("Setting initial instance definition"); - // snapshot the definition - AggregateConfSerDeser serDeser = new AggregateConfSerDeser(); - - unresolvedInstanceDefinition = serDeser.fromInstance(definition); - - this.instanceDefinition = serDeser.fromInstance(definition); - onInstanceDefinitionUpdated(); - } - - public synchronized AggregateConf getInstanceDefinition() { - return instanceDefinition; + public synchronized Application getClusterStatus() { + return app; } /** @@ -475,58 +290,27 @@ public class AppState { maxResource = recordFactory.newResource(containerMaxMemory, containerMaxCores); } - public ConfTreeOperations getResourcesSnapshot() { - return resourcesSnapshot; - } - - public ConfTreeOperations getAppConfSnapshot() { - return appConfSnapshot; - } - - public ConfTreeOperations getInternalsSnapshot() { - return internalsSnapshot; - } - public boolean isApplicationLive() { return applicationLive; } - public long getSnapshotTime() { - return snapshotTime; - } - - public synchronized AggregateConf getInstanceDefinitionSnapshot() { - return instanceDefinitionSnapshot; - } - - public AggregateConf getUnresolvedInstanceDefinition() { - return unresolvedInstanceDefinition; - } public synchronized void buildInstance(AppStateBindingInfo binding) throws BadClusterStateException, BadConfigException, IOException { binding.validate(); log.debug("Building application state"); - publishedProviderConf = binding.publishedProviderConf; - applicationInfo = binding.applicationInfo != null ? binding.applicationInfo - : new HashMap<String, String>(); - - clientProperties = new HashMap<>(); containerReleaseSelector = binding.releaseSelector; - - Set<String> confKeys = ConfigHelper.sortedConfigKeys(publishedProviderConf); - - // Add the -site configuration properties - for (String key : confKeys) { - String val = publishedProviderConf.get(key); - clientProperties.put(key, val); - } - // set the cluster specification (once its dependency the client properties // is out the way - setInitialInstanceDefinition(binding.instanceDefinition); + this.app = binding.application; + appMetrics = SliderMetrics.register(app.getName(), + "Metrics for service"); + appMetrics + .tag("type", "Metrics type [component or service]", "service"); + appMetrics + .tag("appId", "Application id for service", app.getId()); //build the initial role list List<ProviderRole> roleList = new ArrayList<>(binding.roles); @@ -534,51 +318,40 @@ public class AppState { buildRole(providerRole); } - ConfTreeOperations resources = instanceDefinition.getResourceOperations(); - - Set<String> roleNames = resources.getComponentNames(); - for (String name : roleNames) { + int priority = 1; + for (Component component : app.getComponents()) { + String name = component.getName(); if (roles.containsKey(name)) { continue; } - if (hasUniqueNames(resources, name)) { - log.info("Skipping group {}", name); + if (component.getUniqueComponentSupport()) { + log.info("Skipping group " + name + ", as it's unique component"); continue; } - // this is a new value - log.info("Adding role {}", name); - MapOperations resComponent = resources.getComponent(name); - ProviderRole dynamicRole = createDynamicProviderRole(name, resComponent); + log.info("Adding component: " + name); + ProviderRole dynamicRole = + createComponent(name, name, component, priority++); buildRole(dynamicRole); roleList.add(dynamicRole); } //then pick up the requirements buildRoleRequirementsFromResources(); - //set the livespan - MapOperations globalResOpts = instanceDefinition.getResourceOperations().getGlobalOptions(); - - startTimeThreshold = globalResOpts.getOptionInt( - InternalKeys.INTERNAL_CONTAINER_FAILURE_SHORTLIFE, - InternalKeys.DEFAULT_INTERNAL_CONTAINER_FAILURE_SHORTLIFE); - - failureThreshold = globalResOpts.getOptionInt( - CONTAINER_FAILURE_THRESHOLD, + org.apache.slider.api.resource.Configuration conf = app.getConfiguration(); + startTimeThreshold = + conf.getPropertyLong(InternalKeys.INTERNAL_CONTAINER_FAILURE_SHORTLIFE, + InternalKeys.DEFAULT_INTERNAL_CONTAINER_FAILURE_SHORTLIFE); + failureThreshold = (int) conf.getPropertyLong(CONTAINER_FAILURE_THRESHOLD, DEFAULT_CONTAINER_FAILURE_THRESHOLD); - nodeFailureThreshold = globalResOpts.getOptionInt( - NODE_FAILURE_THRESHOLD, + nodeFailureThreshold = (int) conf.getPropertyLong(NODE_FAILURE_THRESHOLD, DEFAULT_NODE_FAILURE_THRESHOLD); - initClusterStatus(); - // set up the role history roleHistory = new RoleHistory(roleStatusMap.values(), recordFactory); - roleHistory.register(metricsAndMonitoring); roleHistory.onStart(binding.fs, binding.historyPath); // trigger first node update roleHistory.onNodesUpdated(binding.nodeReports); - //rebuild any live containers rebuildModelFromRestart(binding.liveContainers); @@ -586,180 +359,57 @@ public class AppState { logServerURL = binding.serviceConfig.get(YarnConfiguration.YARN_LOG_SERVER_URL, ""); //mark as live applicationLive = true; - } - - public void initClusterStatus() { - //copy into cluster status. - ClusterDescription status = ClusterDescription.copy(clusterStatusTemplate); - status.state = STATE_CREATED; - MapOperations infoOps = new MapOperations("info", status.info); - infoOps.mergeWithoutOverwrite(applicationInfo); - SliderUtils.addBuildInfo(infoOps, "status"); - - long now = now(); - status.setInfoTime(StatusKeys.INFO_LIVE_TIME_HUMAN, - StatusKeys.INFO_LIVE_TIME_MILLIS, - now); - SliderUtils.setInfoTime(infoOps, - StatusKeys.INFO_LIVE_TIME_HUMAN, - StatusKeys.INFO_LIVE_TIME_MILLIS, - now); - if (0 == status.createTime) { - status.createTime = now; - SliderUtils.setInfoTime(infoOps, - StatusKeys.INFO_CREATE_TIME_HUMAN, - StatusKeys.INFO_CREATE_TIME_MILLIS, - now); - } - status.state = STATE_LIVE; - - //set the app state to this status - setClusterStatus(status); - } - - /** - * Build a dynamic provider role - * @param name name of role - * @return a new provider role - * @throws BadConfigException bad configuration - */ - public ProviderRole createDynamicProviderRole(String name, MapOperations component) - throws BadConfigException { - return createDynamicProviderRole(name, name, component); - } - - /** - * Build a dynamic provider role - * @param name name of role - * @param group group of role - * @return a new provider role - * @throws BadConfigException bad configuration - */ - public ProviderRole createDynamicProviderRole(String name, String group, MapOperations component) - throws BadConfigException { - String priOpt = component.getMandatoryOption(COMPONENT_PRIORITY); - int priority = SliderUtils.parseAndValidate( - "value of " + name + " " + COMPONENT_PRIORITY, priOpt, 0, 1, -1); - - String placementOpt = component.getOption(COMPONENT_PLACEMENT_POLICY, - Integer.toString(PlacementPolicy.DEFAULT)); - - int placement = SliderUtils.parseAndValidate( - "value of " + name + " " + COMPONENT_PLACEMENT_POLICY, placementOpt, 0, 0, -1); - - int placementTimeout = component.getOptionInt(PLACEMENT_ESCALATE_DELAY, - DEFAULT_PLACEMENT_ESCALATE_DELAY_SECONDS); - - ProviderRole newRole = new ProviderRole(name, - group, - priority, - placement, - getNodeFailureThresholdForRole(group), - placementTimeout, - component.getOption(YARN_LABEL_EXPRESSION, DEF_YARN_LABEL_EXPRESSION)); - log.info("New {} ", newRole); + app.setState(STARTED); + } + + //TODO WHY do we need to create the component for AM ? + public ProviderRole createComponent(String name, String group, + Component component, int priority) throws BadConfigException { + + org.apache.slider.api.resource.Configuration conf = + component.getConfiguration(); + long placementTimeout = conf.getPropertyLong(PLACEMENT_ESCALATE_DELAY, + DEFAULT_PLACEMENT_ESCALATE_DELAY_SECONDS); + long placementPolicy = conf.getPropertyLong(COMPONENT_PLACEMENT_POLICY, + PlacementPolicy.DEFAULT); + int threshold = (int) conf + .getPropertyLong(NODE_FAILURE_THRESHOLD, nodeFailureThreshold); + ProviderRole newRole = + new ProviderRole(name, group, priority, (int)placementPolicy, threshold, + placementTimeout, "", component); + + log.info("Created a new role " + newRole); return newRole; } - /** - * Actions to perform when an instance definition is updated - * Currently: - * <ol> - * <li> - * resolve the configuration - * </li> - * <li> - * update the cluster spec derivative - * </li> - * </ol> - * - * @throws BadConfigException - */ - private synchronized void onInstanceDefinitionUpdated() - throws BadConfigException, IOException { - - log.debug("Instance definition updated"); - //note the time - snapshotTime = now(); - - for (String component : instanceDefinition.getResourceOperations().getComponentNames()) { - instanceDefinition.getAppConfOperations().getOrAddComponent(component); - } - - // resolve references if not already done - instanceDefinition.resolve(); - - // force in the AM desired state values - ConfTreeOperations resources = instanceDefinition.getResourceOperations(); - - if (resources.getComponent(SliderKeys.COMPONENT_AM) != null) { - resources.setComponentOpt( - SliderKeys.COMPONENT_AM, COMPONENT_INSTANCES, "1"); - } - - - //snapshot all three sectons - resourcesSnapshot = ConfTreeOperations.fromInstance(instanceDefinition.getResources()); - appConfSnapshot = ConfTreeOperations.fromInstance(instanceDefinition.getAppConf()); - internalsSnapshot = ConfTreeOperations.fromInstance(instanceDefinition.getInternal()); - //build a new aggregate from the snapshots - instanceDefinitionSnapshot = new AggregateConf(resourcesSnapshot.confTree, - appConfSnapshot.confTree, - internalsSnapshot.confTree); - instanceDefinitionSnapshot.setName(instanceDefinition.getName()); - - clusterStatusTemplate = ClusterDescriptionOperations.buildFromInstanceDefinition( - instanceDefinition); - - // Add the -site configuration properties - for (Map.Entry<String, String> prop : clientProperties.entrySet()) { - clusterStatusTemplate.clientProperties.put(prop.getKey(), prop.getValue()); + public synchronized void updateComponents( + Messages.FlexComponentRequestProto requestProto) + throws BadConfigException { + for (Component component : app.getComponents()) { + if (component.getName().equals(requestProto.getName())) { + component + .setNumberOfContainers((long) requestProto.getNumberOfContainers()); + } } - - } - - /** - * The resource configuration is updated -review and update state. - * @param resources updated resources specification - * @return a list of any dynamically added provider roles - * (purely for testing purposes) - */ - @VisibleForTesting - public synchronized List<ProviderRole> updateResourceDefinitions(ConfTree resources) - throws BadConfigException, IOException { - log.debug("Updating resources to {}", resources); - // snapshot the (possibly unresolved) values - ConfTreeSerDeser serDeser = new ConfTreeSerDeser(); - unresolvedInstanceDefinition.setResources( - serDeser.fromInstance(resources)); - // assign another copy under the instance definition for resolving - // and then driving application size - instanceDefinition.setResources(serDeser.fromInstance(resources)); - onInstanceDefinitionUpdated(); - - // propagate the role table - Map<String, Map<String, String>> updated = resources.components; - getClusterStatus().roles = SliderUtils.deepClone(updated); - getClusterStatus().updateTime = now(); - return buildRoleRequirementsFromResources(); + //TODO update cluster description + buildRoleRequirementsFromResources(); } /** * build the role requirements from the cluster specification * @return a list of any dynamically added provider roles */ - private List<ProviderRole> buildRoleRequirementsFromResources() throws BadConfigException { + private List<ProviderRole> buildRoleRequirementsFromResources() + throws BadConfigException { List<ProviderRole> newRoles = new ArrayList<>(0); // now update every role's desired count. // if there are no instance values, that role count goes to zero - - ConfTreeOperations resources = - instanceDefinition.getResourceOperations(); - // Add all the existing roles + // component name -> number of containers Map<String, Integer> groupCounts = new HashMap<>(); + for (RoleStatus roleStatus : getRoleStatusMap().values()) { if (roleStatus.isExcludeFromFlexing()) { // skip inflexible roles, e.g AM itself @@ -768,10 +418,11 @@ public class AppState { long currentDesired = roleStatus.getDesired(); String role = roleStatus.getName(); String roleGroup = roleStatus.getGroup(); - int desiredInstanceCount = getDesiredInstanceCount(resources, roleGroup); + Component component = roleStatus.getProviderRole().component; + int desiredInstanceCount = component.getNumberOfContainers().intValue(); int newDesired = desiredInstanceCount; - if (hasUniqueNames(resources, roleGroup)) { + if (component.getUniqueComponentSupport()) { Integer groupCount = 0; if (groupCounts.containsKey(roleGroup)) { groupCount = groupCounts.get(roleGroup); @@ -793,56 +444,54 @@ public class AppState { if (currentDesired != newDesired) { log.info("Role {} flexed from {} to {}", role, currentDesired, newDesired); - roleStatus.setDesired(newDesired); + setDesiredContainers(roleStatus, newDesired); } } // now the dynamic ones. Iterate through the the cluster spec and // add any role status entries not in the role status - Set<String> roleNames = resources.getComponentNames(); - for (String name : roleNames) { + + List<RoleStatus> list = new ArrayList<>(getRoleStatusMap().values()); + for (RoleStatus roleStatus : list) { + String name = roleStatus.getName(); + Component component = roleStatus.getProviderRole().component; if (roles.containsKey(name)) { continue; } - if (hasUniqueNames(resources, name)) { + if (component.getUniqueComponentSupport()) { // THIS NAME IS A GROUP - int desiredInstanceCount = getDesiredInstanceCount(resources, name); + int desiredInstanceCount = component.getNumberOfContainers().intValue(); Integer groupCount = 0; if (groupCounts.containsKey(name)) { groupCount = groupCounts.get(name); } for (int i = groupCount + 1; i <= desiredInstanceCount; i++) { - int priority = resources.getComponentOptInt(name, COMPONENT_PRIORITY, i); + int priority = roleStatus.getPriority(); // this is a new instance of an existing group String newName = String.format("%s%d", name, i); int newPriority = getNewPriority(priority + i - 1); log.info("Adding new role {}", newName); - MapOperations component = resources.getComponent(name, - Collections.singletonMap(COMPONENT_PRIORITY, - Integer.toString(newPriority))); - if (component == null) { - throw new BadConfigException("Component is null for name = " + name - + ", newPriority =" + newPriority); - } - ProviderRole dynamicRole = createDynamicProviderRole(newName, name, component); - RoleStatus roleStatus = buildRole(dynamicRole); - roleStatus.setDesired(1); - log.info("New role {}", roleStatus); + ProviderRole dynamicRole = + createComponent(newName, name, component, newPriority); + RoleStatus newRole = buildRole(dynamicRole); + incDesiredContainers(newRole); + log.info("New role {}", newRole); if (roleHistory != null) { - roleHistory.addNewRole(roleStatus); + roleHistory.addNewRole(newRole); } newRoles.add(dynamicRole); } } else { // this is a new value log.info("Adding new role {}", name); - MapOperations component = resources.getComponent(name); - ProviderRole dynamicRole = createDynamicProviderRole(name, component); - RoleStatus roleStatus = buildRole(dynamicRole); - roleStatus.setDesired(getDesiredInstanceCount(resources, name)); - log.info("New role {}", roleStatus); + ProviderRole dynamicRole = + createComponent(name, name, component, roleStatus.getPriority()); + RoleStatus newRole = buildRole(dynamicRole); + incDesiredContainers(roleStatus, + component.getNumberOfContainers().intValue()); + log.info("New role {}", newRole); if (roleHistory != null) { - roleHistory.addNewRole(roleStatus); + roleHistory.addNewRole(newRole); } newRoles.add(dynamicRole); } @@ -861,37 +510,6 @@ public class AppState { } /** - * Get the desired instance count of a role, rejecting negative values - * @param resources resource map - * @param roleGroup role group - * @return the instance count - * @throws BadConfigException if the count is negative - */ - private int getDesiredInstanceCount(ConfTreeOperations resources, - String roleGroup) throws BadConfigException { - int desiredInstanceCount = - resources.getComponentOptInt(roleGroup, COMPONENT_INSTANCES, 0); - - if (desiredInstanceCount < 0) { - log.error("Role {} has negative desired instances : {}", roleGroup, - desiredInstanceCount); - throw new BadConfigException( - "Negative instance count (%) requested for component %s", - desiredInstanceCount, roleGroup); - } - return desiredInstanceCount; - } - - private Boolean hasUniqueNames(ConfTreeOperations resources, String group) { - MapOperations component = resources.getComponent(group); - if (component == null) { - log.info("Component was null for {} when checking unique names", group); - return Boolean.FALSE; - } - return component.getOptionBool(UNIQUE_NAMES, Boolean.FALSE); - } - - /** * Add knowledge of a role. * This is a build-time operation that is not synchronized, and * should be used while setting up the system state -before servicing @@ -923,66 +541,9 @@ public class AppState { */ private void buildRoleResourceRequirements() { for (RoleStatus role : roleStatusMap.values()) { - role.setResourceRequirements( - buildResourceRequirements(role, recordFactory.newResource())); + role.setResourceRequirements(buildResourceRequirements(role)); } } - - /** - * build up the special master node, which lives - * in the live node set but has a lifecycle bonded to the AM - * @param containerId the AM master - * @param host hostname - * @param amPort port - * @param nodeHttpAddress http address: may be null - */ - public void buildAppMasterNode(ContainerId containerId, - String host, - int amPort, - String nodeHttpAddress) { - Container container = new ContainerPBImpl(); - container.setId(containerId); - NodeId nodeId = NodeId.newInstance(host, amPort); - container.setNodeId(nodeId); - container.setNodeHttpAddress(nodeHttpAddress); - RoleInstance am = new RoleInstance(container); - am.role = SliderKeys.COMPONENT_AM; - am.group = SliderKeys.COMPONENT_AM; - am.roleId = SliderKeys.ROLE_AM_PRIORITY_INDEX; - am.createTime =now(); - am.startTime = am.createTime; - appMasterNode = am; - //it is also added to the set of live nodes - getLiveContainers().put(containerId, am); - putOwnedContainer(containerId, am); - - // patch up the role status - RoleStatus roleStatus = roleStatusMap.get(SliderKeys.ROLE_AM_PRIORITY_INDEX); - roleStatus.setDesired(1); - roleStatus.incActual(); - roleStatus.incStarted(); - } - - /** - * Note that the master node has been launched, - * though it isn't considered live until any forked - * processes are running. It is NOT registered with - * the role history -the container is incomplete - * and it will just cause confusion - */ - public void noteAMLaunched() { - getLiveContainers().put(appMasterNode.getContainerId(), appMasterNode); - } - - /** - * AM declares ourselves live in the cluster description. - * This is meant to be triggered from the callback - * indicating the spawned process is up and running. - */ - public void noteAMLive() { - appMasterNode.state = STATE_LIVE; - } - /** * Look up the status entry of a role or raise an exception * @param key role ID @@ -1008,24 +569,6 @@ public class AppState { return lookupRoleStatus(ContainerPriority.extractRole(c)); } - /** - * Get a deep clone of the role status list. Concurrent events may mean this - * list (or indeed, some of the role status entries) may be inconsistent - * @return a snapshot of the role status entries - */ - public List<RoleStatus> cloneRoleStatusList() { - Collection<RoleStatus> statuses = roleStatusMap.values(); - List<RoleStatus> statusList = new ArrayList<>(statuses.size()); - try { - for (RoleStatus status : statuses) { - statusList.add((RoleStatus)(status.clone())); - } - } catch (CloneNotSupportedException e) { - log.warn("Unexpected cloning failure: {}", e, e); - } - return statusList; - } - /** * Look up a role in the map @@ -1278,8 +821,6 @@ public class AppState { } instance.released = true; containersBeingReleased.put(id, instance.container); - RoleStatus role = lookupRoleStatus(instance.roleId); - role.incReleasing(); roleHistory.onContainerReleaseSubmitted(container); } @@ -1292,10 +833,10 @@ public class AppState { * @return the container request to submit or null if there is none */ private AMRMClient.ContainerRequest createContainerRequest(RoleStatus role) { + incPendingContainers(role); if (role.isAntiAffinePlacement()) { return createAAContainerRequest(role); } else { - incrementRequestCount(role); OutstandingRequest request = roleHistory.requestContainerForRole(role); if (request != null) { return request.getIssuedRequest(); @@ -1318,69 +859,69 @@ public class AppState { if (request == null) { return null; } - incrementRequestCount(role); role.setOutstandingAArequest(request); return request.getIssuedRequest(); } - /** - * Increment the request count of a role. - * <p> - * Also updates application state counters - * @param role role being requested. - */ - protected void incrementRequestCount(RoleStatus role) { - role.incRequested(); - incOutstandingContainerRequests(); + private void incPendingContainers(RoleStatus role) { + role.getComponentMetrics().containersPending.incr(); + appMetrics.containersPending.incr(); } - /** - * Inc #of outstanding requests. - */ - private void incOutstandingContainerRequests() { - outstandingContainerRequests.inc(); + private void decPendingContainers(RoleStatus role) { + decPendingContainers(role, 1); } - /** - * Decrement the number of outstanding requests. This never goes below zero. - */ - private void decOutstandingContainerRequests() { - synchronized (outstandingContainerRequests) { - if (outstandingContainerRequests.getCount() > 0) { - // decrement but never go below zero - outstandingContainerRequests.dec(); - } - } + private void decPendingContainers(RoleStatus role, int n) { + role.getComponentMetrics().containersPending.decr(n);; + appMetrics.containersPending.decr(n); } - /** - * Get the value of a YARN requirement (cores, RAM, etc). - * These are returned as integers, but there is special handling of the - * string {@link ResourceKeys#YARN_RESOURCE_MAX}, which triggers - * the return of the maximum value. - * @param group component to get from - * @param option option name - * @param defVal default value - * @param maxVal value to return if the max val is requested - * @return parsed value - * @throws NumberFormatException if the role could not be parsed. - */ - private int getResourceRequirement(ConfTreeOperations resources, - String group, - String option, - int defVal, - int maxVal) { + private void incRunningContainers(RoleStatus role) { + role.getComponentMetrics().containersRunning.incr();; + appMetrics.containersRunning.incr(); + } - String val = resources.getComponentOpt(group, option, - Integer.toString(defVal)); - Integer intVal; - if (YARN_RESOURCE_MAX.equals(val)) { - intVal = maxVal; - } else { - intVal = Integer.decode(val); + private void decRunningContainers(RoleStatus role) { + role.getComponentMetrics().containersRunning.decr();; + appMetrics.containersRunning.decr(); + } + + private void setDesiredContainers(RoleStatus role, int n) { + role.getComponentMetrics().containersDesired.set(n); + appMetrics.containersDesired.set(n); + } + + private void incDesiredContainers(RoleStatus role) { + role.getComponentMetrics().containersDesired.incr(); + appMetrics.containersDesired.incr(); + } + + private void incDesiredContainers(RoleStatus role, int n) { + role.getComponentMetrics().containersDesired.incr(n); + appMetrics.containersDesired.incr(n); + } + + private void incCompletedContainers(RoleStatus role) { + role.getComponentMetrics().containersCompleted.incr(); + appMetrics.containersCompleted.incr(); + } + + private void incFailedContainers(RoleStatus role, ContainerOutcome outcome) { + role.getComponentMetrics().containersFailed.incr(); + appMetrics.containersFailed.incr(); + switch (outcome) { + case Preempted: + appMetrics.containersPreempted.incr(); + role.getComponentMetrics().containersPreempted.incr(); + break; + case Failed: + appMetrics.failedSinceLastThreshold.incr(); + break; + default: + break; } - return intVal; } /** @@ -1388,26 +929,28 @@ public class AppState { * cluster specification, including substituing max allowed values * if the specification asked for it. * @param role role - * @param capability capability to set up. A new one may be created * during normalization */ - public Resource buildResourceRequirements(RoleStatus role, Resource capability) { + public Resource buildResourceRequirements(RoleStatus role) { // Set up resource requirements from role values String name = role.getName(); - String group = role.getGroup(); - ConfTreeOperations resources = getResourcesSnapshot(); - int cores = getResourceRequirement(resources, - group, - YARN_CORES, - DEF_YARN_CORES, - containerMaxCores); - capability.setVirtualCores(cores); - int ram = getResourceRequirement(resources, group, - YARN_MEMORY, - DEF_YARN_MEMORY, - containerMaxMemory); - capability.setMemory(ram); - log.debug("Component {} has RAM={}, vCores ={}", name, ram, cores); + Component component = role.getProviderRole().component; + if (component == null) { + // this is for AM container + // TODO why do we need to create the component for AM ? + return Resource.newInstance(1, 512); + } + int cores = Math.min(containerMaxCores, component.getResource().getCpus()); + if (cores <= 0) { + cores = DEF_YARN_CORES; + } + long mem = Math.min(containerMaxMemory, + Long.parseLong(component.getResource().getMemory())); + if (mem <= 0) { + mem = DEF_YARN_MEMORY; + } + Resource capability = Resource.newInstance(mem, cores); + log.debug("Component {} has RAM={}, vCores ={}", name, mem, cores); Resource normalized = recordFactory.normalize(capability, minResource, maxResource); if (!Resources.equals(normalized, capability)) { @@ -1459,7 +1002,6 @@ public class AppState { */ @VisibleForTesting public RoleInstance innerOnNodeManagerContainerStarted(ContainerId containerId) { - incStartedCountainerCount(); RoleInstance instance = getOwnedContainer(containerId); if (instance == null) { //serious problem @@ -1477,8 +1019,6 @@ public class AppState { "Container "+ containerId +" is already started"); } instance.state = STATE_LIVE; - RoleStatus roleStatus = lookupRoleStatus(instance.roleId); - roleStatus.incStarted(); Container container = instance.container; addLaunchedContainer(container, instance); return instance; @@ -1497,8 +1037,6 @@ public class AppState { public synchronized void onNodeManagerContainerStartFailed(ContainerId containerId, Throwable thrown) { removeOwnedContainer(containerId); - incFailedCountainerCount(); - incStartFailedCountainerCount(); RoleInstance instance = getStartingContainers().remove(containerId); if (null != instance) { RoleStatus roleStatus = lookupRoleStatus(instance.roleId); @@ -1509,9 +1047,10 @@ public class AppState { text = "container start failure"; } instance.diagnostics = text; - roleStatus.noteFailed(true, text, ContainerOutcome.Failed); + roleStatus.noteFailed(text); getFailedContainers().put(containerId, instance); roleHistory.onNodeManagerContainerStartFailed(instance.container); + incFailedContainers(roleStatus, ContainerOutcome.Failed); } } @@ -1607,7 +1146,8 @@ public class AppState { * @param status the node that has just completed * @return NodeCompletionResult */ - public synchronized NodeCompletionResult onCompletedNode(ContainerStatus status) { + public synchronized NodeCompletionResult onCompletedContainer( + ContainerStatus status) { ContainerId containerId = status.getContainerId(); NodeCompletionResult result = new NodeCompletionResult(); RoleInstance roleInstance; @@ -1618,18 +1158,16 @@ public class AppState { log.info("Container was queued for release : {}", containerId); Container container = containersBeingReleased.remove(containerId); RoleStatus roleStatus = lookupRoleStatus(container); - long releasing = roleStatus.decReleasing(); - long actual = roleStatus.decActual(); - long completedCount = roleStatus.incCompleted(); - log.info("decrementing role count for role {} to {}; releasing={}, completed={}", + decRunningContainers(roleStatus); + incCompletedContainers(roleStatus); + log.info("decrementing role count for role {} to {}; completed={}", roleStatus.getName(), - actual, - releasing, - completedCount); + roleStatus.getComponentMetrics().containersRunning.value(), + roleStatus.getComponentMetrics().containersCompleted.value()); result.outcome = ContainerOutcome.Completed; roleHistory.onReleaseCompleted(container); - } else if (surplusNodes.remove(containerId)) { + } else if (surplusContainers.remove(containerId)) { //its a surplus one being purged result.surplusNode = true; } else { @@ -1640,8 +1178,8 @@ public class AppState { roleInstance = removeOwnedContainer(containerId); if (roleInstance != null) { - //it was active, move it to failed - incFailedCountainerCount(); + RoleStatus roleStatus = lookupRoleStatus(roleInstance.roleId); + incFailedContainers(roleStatus, result.outcome); failedContainers.put(containerId, roleInstance); } else { // the container may have been noted as failed already, so look @@ -1653,8 +1191,8 @@ public class AppState { String rolename = roleInstance.role; log.info("Failed container in role[{}] : {}", roleId, rolename); try { - RoleStatus roleStatus = lookupRoleStatus(roleId); - roleStatus.decActual(); + RoleStatus roleStatus = lookupRoleStatus(roleInstance.roleId); + decRunningContainers(roleStatus); boolean shortLived = isShortLived(roleInstance); String message; Container failedContainer = roleInstance.container; @@ -1670,8 +1208,10 @@ public class AppState { } else { message = String.format("Failure %s (%d)", containerId, exitStatus); } - roleStatus.noteFailed(shortLived, message, result.outcome); - long failed = roleStatus.getFailed(); + roleStatus.noteFailed(message); + incFailedContainers(roleStatus, result.outcome); + long failed = + roleStatus.getComponentMetrics().containersFailed.value(); log.info("Current count of failed role[{}] {} = {}", roleId, rolename, failed); if (failedContainer != null) { @@ -1761,7 +1301,7 @@ public class AppState { float actual = 0; for (RoleStatus role : getRoleStatusMap().values()) { desired += role.getDesired(); - actual += role.getActual(); + actual += role.getRunning(); } if (desired == 0) { percentage = 100; @@ -1771,29 +1311,26 @@ public class AppState { return percentage; } + /** * Update the cluster description with the current application state */ - public ClusterDescription refreshClusterStatus() { - return refreshClusterStatus(null); - } + public synchronized Application refreshClusterStatus() { + + //TODO replace ClusterDescription with Application + related statistics + //TODO build container stats + app.setState(ApplicationState.STARTED); + return app; +/* + return app; - /** - * Update the cluster description with the current application state - * @param providerStatus status from the provider for the cluster info section - */ - public synchronized ClusterDescription refreshClusterStatus(Map<String, String> providerStatus) { ClusterDescription cd = getClusterStatus(); long now = now(); cd.setInfoTime(StatusKeys.INFO_STATUS_TIME_HUMAN, StatusKeys.INFO_STATUS_TIME_MILLIS, now); - if (providerStatus != null) { - for (Map.Entry<String, String> entry : providerStatus.entrySet()) { - cd.setInfo(entry.getKey(), entry.getValue()); - } - } + MapOperations infoOps = new MapOperations("info", cd.info); infoOps.mergeWithoutOverwrite(applicationInfo); SliderUtils.addBuildInfo(infoOps, "status"); @@ -1810,32 +1347,8 @@ public class AppState { cd.status = new HashMap<>(); cd.status.put(ClusterDescriptionKeys.KEY_CLUSTER_LIVE, clusterNodes); - for (RoleStatus role : getRoleStatusMap().values()) { String rolename = role.getName(); - if (hasUniqueNames(instanceDefinition.getResourceOperations(), - role.getGroup())) { - cd.setRoleOpt(rolename, COMPONENT_PRIORITY, role.getPriority()); - cd.setRoleOpt(rolename, ROLE_GROUP, role.getGroup()); - MapOperations groupOptions = instanceDefinition.getResourceOperations() - .getComponent(role.getGroup()); - SliderUtils.mergeMapsIgnoreDuplicateKeys(cd.getRole(rolename), - groupOptions.options); - } - String prefix = instanceDefinition.getAppConfOperations() - .getComponentOpt(role.getGroup(), ROLE_PREFIX, null); - if (SliderUtils.isSet(prefix)) { - cd.setRoleOpt(rolename, ROLE_PREFIX, SliderUtils.trimPrefix(prefix)); - } - String dockerImage = instanceDefinition.getAppConfOperations() - .getComponentOpt(role.getGroup(), DOCKER_IMAGE, null); - if (SliderUtils.isSet(dockerImage)) { - cd.setRoleOpt(rolename, DOCKER_IMAGE, dockerImage); - Boolean dockerUsePrivileged = instanceDefinition.getAppConfOperations() - .getComponentOptBool(role.getGroup(), DOCKER_USE_PRIVILEGED, - DEFAULT_DOCKER_USE_PRIVILEGED); - cd.setRoleOpt(rolename, DOCKER_USE_PRIVILEGED, dockerUsePrivileged); - } List<String> instances = instanceMap.get(rolename); int nodeCount = instances != null ? instances.size(): 0; cd.setRoleOpt(rolename, COMPONENT_INSTANCES, @@ -1861,7 +1374,7 @@ public class AppState { // liveness cd.liveness = getApplicationLivenessInformation(); - return cd; + return cd;*/ } /** @@ -1878,29 +1391,6 @@ public class AppState { return li; } - /** - * Get the live statistics map - * @return a map of statistics values, defined in the {@link StatusKeys} - * keylist. - */ - protected Map<String, Integer> getLiveStatistics() { - Map<String, Integer> sliderstats = new HashMap<>(); - sliderstats.put(StatusKeys.STATISTICS_CONTAINERS_LIVE, - liveNodes.size()); - sliderstats.put(StatusKeys.STATISTICS_CONTAINERS_COMPLETED, - completedContainerCount.intValue()); - sliderstats.put(StatusKeys.STATISTICS_CONTAINERS_FAILED, - failedContainerCount.intValue()); - sliderstats.put(StatusKeys.STATISTICS_CONTAINERS_STARTED, - startedContainers.intValue()); - sliderstats.put(StatusKeys.STATISTICS_CONTAINERS_START_FAILED, - startFailedContainerCount.intValue()); - sliderstats.put(StatusKeys.STATISTICS_CONTAINERS_SURPLUS, - surplusContainers.intValue()); - sliderstats.put(StatusKeys.STATISTICS_CONTAINERS_UNKNOWN_COMPLETED, - completionOfUnknownContainerEvent.get()); - return sliderstats; - } /** * Get the aggregate statistics across all roles @@ -1949,7 +1439,7 @@ public class AppState { */ public synchronized List<AbstractRMOperation> reviewRequestAndReleaseNodes() throws SliderInternalStateException, TriggerClusterTeardownException { - log.debug("in reviewRequestAndReleaseNodes()"); + log.info("in reviewRequestAndReleaseNodes()"); List<AbstractRMOperation> allOperations = new ArrayList<>(); AbstractRMOperation blacklistOperation = updateBlacklist(); if (blacklistOperation != null) { @@ -1981,15 +1471,11 @@ public class AppState { if (failures > threshold) { throw new TriggerClusterTeardownException( - SliderExitCodes.EXIT_DEPLOYMENT_FAILED, - FinalApplicationStatus.FAILED, ErrorStrings.E_UNSTABLE_CLUSTER + - " - failed with component %s failed 'recently' %d times (%d in startup);" + - " threshold is %d - last failure: %s", - role.getName(), - role.getFailed(), - role.getStartFailed(), - threshold, - role.getFailureMessage()); + SliderExitCodes.EXIT_DEPLOYMENT_FAILED, FinalApplicationStatus.FAILED, + ErrorStrings.E_UNSTABLE_CLUSTER + + " - failed with component %s failed 'recently' %d times;" + + " threshold is %d - last failure: %s", role.getName(), + role.getFailedRecently(), threshold, role.getFailureMessage()); } } @@ -2000,26 +1486,11 @@ public class AppState { * @return the threshold for failures */ private int getFailureThresholdForRole(RoleStatus roleStatus) { - ConfTreeOperations resources = - instanceDefinition.getResourceOperations(); - return resources.getComponentOptInt(roleStatus.getGroup(), - CONTAINER_FAILURE_THRESHOLD, - failureThreshold); + return (int) roleStatus.getProviderRole().component.getConfiguration() + .getPropertyLong(CONTAINER_FAILURE_THRESHOLD, + DEFAULT_CONTAINER_FAILURE_THRESHOLD); } - /** - * Get the node failure threshold for a specific role, falling back to - * the global one if not - * @param roleGroup role group - * @return the threshold for failures - */ - private int getNodeFailureThresholdForRole(String roleGroup) { - ConfTreeOperations resources = - instanceDefinition.getResourceOperations(); - return resources.getComponentOptInt(roleGroup, - NODE_FAILURE_THRESHOLD, - nodeFailureThreshold); - } /** * Reset the "recent" failure counts of all roles @@ -2027,9 +1498,9 @@ public class AppState { public void resetFailureCounts() { for (RoleStatus roleStatus : getRoleStatusMap().values()) { long failed = roleStatus.resetFailedRecently(); - log.info("Resetting failure count of {}; was {}", - roleStatus.getName(), + log.info("Resetting failure count of {}; was {}", roleStatus.getName(), failed); + } roleHistory.resetFailedRecently(); } @@ -2075,6 +1546,7 @@ public class AppState { @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") private List<AbstractRMOperation> reviewOneRole(RoleStatus role) throws SliderInternalStateException, TriggerClusterTeardownException { + log.info("review one role " + role.getName()); List<AbstractRMOperation> operations = new ArrayList<>(); long delta; long expected; @@ -2123,7 +1595,8 @@ public class AppState { log.warn("Awaiting node map before generating anti-affinity requests"); } log.info("Setting pending to {}", pending); - role.setPendingAntiAffineRequests(pending); + //TODO + role.setAAPending((int)pending); } else { for (int i = 0; i < delta; i++) { @@ -2139,7 +1612,7 @@ public class AppState { long excess = -delta; // how many requests are outstanding? for AA roles, this includes pending - long outstandingRequests = role.getRequested() + role.getPendingAntiAffineRequests(); + long outstandingRequests = role.getPending() + role.getAAPending(); if (outstandingRequests > 0) { // outstanding requests. int toCancel = (int)Math.min(outstandingRequests, excess); @@ -2153,8 +1626,7 @@ public class AppState { " expected to be able to cancel {} requests, but got {}", toCancel, cancellations.size()); } - - role.cancel(toCancel); + decPendingContainers(role, toCancel); excess -= toCancel; assert excess >= 0 : "Attempted to cancel too many requests"; log.info("Submitted {} cancellations, leaving {} to release", @@ -2215,9 +1687,9 @@ public class AppState { } else { // actual + requested == desired // there's a special case here: clear all pending AA requests - if (role.getPendingAntiAffineRequests() > 0) { + if (role.getAAPending() > 0) { log.debug("Clearing outstanding pending AA requests"); - role.setPendingAntiAffineRequests(0); + role.setAAPending(0); } } @@ -2269,28 +1741,6 @@ public class AppState { } /** - * Find a container running on a specific host -looking - * into the node ID to determine this. - * - * @param node node - * @param roleId role the container must be in - * @return a container or null if there are no containers on this host - * that can be released. - */ - private RoleInstance findRoleInstanceOnHost(NodeInstance node, int roleId) { - Collection<RoleInstance> targets = cloneOwnedContainerList(); - String hostname = node.hostname; - for (RoleInstance ri : targets) { - if (hostname.equals(RoleHistoryUtils.hostnameOf(ri.container)) - && ri.roleId == roleId - && containersBeingReleased.get(ri.getContainerId()) == null) { - return ri; - } - } - return null; - } - - /** * Release all containers. * @return a list of operations to execute */ @@ -2329,26 +1779,25 @@ public class AppState { * @param assignments the assignments of roles to containers * @param operations any allocation or release operations */ - public synchronized void onContainersAllocated(List<Container> allocatedContainers, - List<ContainerAssignment> assignments, - List<AbstractRMOperation> operations) { - assignments.clear(); - operations.clear(); + public synchronized void onContainersAllocated( + List<Container> allocatedContainers, + List<ContainerAssignment> assignments, + List<AbstractRMOperation> operations) { List<Container> ordered = roleHistory.prepareAllocationList(allocatedContainers); - log.debug("onContainersAllocated(): Total containers allocated = {}", ordered.size()); + log.info("onContainersAllocated(): Total containers allocated = {}", ordered.size()); for (Container container : ordered) { final NodeId nodeId = container.getNodeId(); String containerHostInfo = nodeId.getHost() + ":" + nodeId.getPort(); //get the role final ContainerId cid = container.getId(); final RoleStatus role = lookupRoleStatus(container); - - //dec requested count - role.decRequested(); + decPendingContainers(role); //inc allocated count -this may need to be dropped in a moment, // but us needed to update the logic below - final long allocated = role.incActual(); + MutableGaugeInt containersRunning = role.getComponentMetrics().containersRunning; + final long allocated = containersRunning.value(); + incRunningContainers(role); final long desired = role.getDesired(); final String roleName = role.getName(); @@ -2364,22 +1813,12 @@ public class AppState { log.info("Discarding surplus {} container {} on {}", roleName, cid, containerHostInfo); operations.add(new ContainerReleaseOperation(cid)); //register as a surplus node - surplusNodes.add(cid); - surplusContainers.inc(); - //and, as we aren't binding it to role, dec that role's actual count - role.decActual(); + surplusContainers.add(cid); + role.getComponentMetrics().surplusContainers.incr(); + containersRunning.decr(); } else { - - // Allocation being accepted -so decrement the number of outstanding requests - decOutstandingContainerRequests(); - - log.info("Assigning role {} to container" + - " {}," + - " on {}:{},", - roleName, - cid, - nodeId.getHost(), - nodeId.getPort()); + log.info("Assigning role {} to container" + " {}," + " on {}:{},", + roleName, cid, nodeId.getHost(), nodeId.getPort()); assignments.add(new ContainerAssignment(container, role, outcome)); //add to the history @@ -2392,13 +1831,13 @@ public class AppState { if (node.canHost(role.getKey(), role.getLabelExpression())) { log.error("Assigned node still declares as available {}", node.toFullString() ); } - if (role.getPendingAntiAffineRequests() > 0) { + if (role.getAAPending() > 0) { // still an outstanding AA request: need to issue a new one. log.info("Asking for next container for AA role {}", roleName); if (!addContainerRequest(operations, createAAContainerRequest(role))) { log.info("No capacity in cluster for new requests"); } else { - role.decPendingAntiAffineRequests(); + role.decAAPending(); } log.debug("Current AA role status {}", role); } else { @@ -2437,8 +1876,7 @@ public class AppState { for (Container container : liveContainers) { addRestartedContainer(container); } - clusterStatus.setInfo(StatusKeys.INFO_CONTAINERS_AM_RESTART, - Integer.toString(liveContainers.size())); + app.setNumberOfRunningContainers((long)liveContainers.size()); return true; } @@ -2458,10 +1896,9 @@ public class AppState { // get the role int roleId = ContainerPriority.extractRole(container); - RoleStatus role = - lookupRoleStatus(roleId); + RoleStatus role = lookupRoleStatus(roleId); // increment its count - role.incActual(); + incRunningContainers(role); String roleName = role.getName(); log.info("Rebuilding container {} in role {} on {},", @@ -2495,12 +1932,6 @@ public class AppState { final StringBuilder sb = new StringBuilder("AppState{"); sb.append("applicationLive=").append(applicationLive); sb.append(", live nodes=").append(liveNodes.size()); - sb.append(", startedContainers=").append(startedContainers); - sb.append(", startFailedContainerCount=").append(startFailedContainerCount); - sb.append(", surplusContainers=").append(surplusContainers); - sb.append(", failedContainerCount=").append(failedContainerCount); - sb.append(", outstanding non-AA Container Requests=") - .append(outstandingContainerRequests); sb.append('}'); return sb.toString(); }
http://git-wip-us.apache.org/repos/asf/hadoop/blob/99c1074c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppStateBindingInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppStateBindingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppStateBindingInfo.java index a8aa1a2..2dfded8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppStateBindingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppStateBindingInfo.java @@ -24,7 +24,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.NodeReport; -import org.apache.slider.core.conf.AggregateConf; +import org.apache.slider.api.resource.Application; import org.apache.slider.providers.ProviderRole; import java.util.ArrayList; @@ -38,26 +38,24 @@ import java.util.Map; * are added. */ public class AppStateBindingInfo { - public AggregateConf instanceDefinition; public Configuration serviceConfig = new Configuration(); - public Configuration publishedProviderConf = new Configuration(false); + public Application application = null; public List<ProviderRole> roles = new ArrayList<>(); public FileSystem fs; public Path historyPath; public List<Container> liveContainers = new ArrayList<>(0); - public Map<String, String> applicationInfo = new HashMap<>(); public ContainerReleaseSelector releaseSelector = new SimpleReleaseSelector(); /** node reports off the RM. */ public List<NodeReport> nodeReports = new ArrayList<>(0); public void validate() throws IllegalArgumentException { - Preconditions.checkArgument(instanceDefinition != null, "null instanceDefinition"); Preconditions.checkArgument(serviceConfig != null, "null appmasterConfig"); - Preconditions.checkArgument(publishedProviderConf != null, "null publishedProviderConf"); Preconditions.checkArgument(releaseSelector != null, "null releaseSelector"); Preconditions.checkArgument(roles != null, "null providerRoles"); Preconditions.checkArgument(fs != null, "null fs"); Preconditions.checkArgument(historyPath != null, "null historyDir"); Preconditions.checkArgument(nodeReports != null, "null nodeReports"); + Preconditions.checkArgument(application != null, "null application"); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/99c1074c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ProviderAppState.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ProviderAppState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ProviderAppState.java index 37e9a7f..8046472 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ProviderAppState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/ProviderAppState.java @@ -21,14 +21,12 @@ package org.apache.slider.server.appmaster.state; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.slider.api.ClusterDescription; import org.apache.slider.api.ClusterNode; +import org.apache.slider.api.resource.Application; import org.apache.slider.api.types.ApplicationLivenessInformation; import org.apache.slider.api.types.ComponentInformation; import org.apache.slider.api.types.NodeInformation; import org.apache.slider.api.types.RoleStatistics; -import org.apache.slider.core.conf.AggregateConf; -import org.apache.slider.core.conf.ConfTreeOperations; import org.apache.slider.core.exceptions.NoSuchNodeException; import org.apache.slider.core.registry.docstore.PublishedConfigSet; import org.apache.slider.core.registry.docstore.PublishedExportsSet; @@ -130,46 +128,16 @@ public class ProviderAppState implements StateAccessForProviders { } @Override - public ClusterDescription getClusterStatus() { + public Application getApplication() { return appState.getClusterStatus(); } @Override - public ConfTreeOperations getResourcesSnapshot() { - return appState.getResourcesSnapshot(); - } - - @Override - public ConfTreeOperations getAppConfSnapshot() { - return appState.getAppConfSnapshot(); - } - - @Override - public ConfTreeOperations getInternalsSnapshot() { - return appState.getInternalsSnapshot(); - } - - @Override public boolean isApplicationLive() { return appState.isApplicationLive(); } @Override - public long getSnapshotTime() { - return appState.getSnapshotTime(); - } - - @Override - public AggregateConf getInstanceDefinitionSnapshot() { - return appState.getInstanceDefinitionSnapshot(); - } - - @Override - public AggregateConf getUnresolvedInstanceDefinition() { - return appState.getUnresolvedInstanceDefinition(); - } - - @Override public RoleStatus lookupRoleStatus(int key) { return appState.lookupRoleStatus(key); } @@ -221,26 +189,16 @@ public class ProviderAppState implements StateAccessForProviders { } @Override - public ClusterDescription refreshClusterStatus() { + public Application refreshClusterStatus() { return appState.refreshClusterStatus(); } @Override - public List<RoleStatus> cloneRoleStatusList() { - return appState.cloneRoleStatusList(); - } - - @Override public ApplicationLivenessInformation getApplicationLivenessInformation() { return appState.getApplicationLivenessInformation(); } @Override - public Map<String, Integer> getLiveStatistics() { - return appState.getLiveStatistics(); - } - - @Override public Map<String, ComponentInformation> getComponentInfoSnapshot() { return appState.getComponentInfoSnapshot(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/99c1074c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java index 38c70f3..b6c3675 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java @@ -135,17 +135,6 @@ public class RoleHistory { outstandingRequests = new OutstandingRequestTracker(); } - /** - * Register all metrics with the metrics infra - * @param metrics metrics - */ - public void register(MetricsAndMonitoring metrics) { - metrics.register(RoleHistory.class, dirty, "dirty"); - metrics.register(RoleHistory.class, nodesUpdatedTime, "nodes-updated.time"); - metrics.register(RoleHistory.class, nodeUpdateReceived, "nodes-updated.flag"); - metrics.register(RoleHistory.class, thawedDataTime, "thawed.time"); - metrics.register(RoleHistory.class, saveTime, "saved.time"); - } /** * safety check: make sure the role is unique amongst @@ -1102,13 +1091,13 @@ public class RoleHistory { int roleId = role.getKey(); List<OutstandingRequest> requests = new ArrayList<>(toCancel); // there may be pending requests which can be cancelled here - long pending = role.getPendingAntiAffineRequests(); + long pending = role.getAAPending(); if (pending > 0) { // there are some pending ones which can be cancelled first long pendingToCancel = Math.min(pending, toCancel); log.info("Cancelling {} pending AA allocations, leaving {}", toCancel, pendingToCancel); - role.setPendingAntiAffineRequests(pending - pendingToCancel); + role.setAAPending(pending - pendingToCancel); toCancel -= pendingToCancel; } if (toCancel > 0 && role.isAARequestOutstanding()) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/99c1074c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleInstance.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleInstance.java index 30cfec9..de52f4e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleInstance.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleInstance.java @@ -29,6 +29,7 @@ import org.apache.slider.api.ClusterNode; import org.apache.slider.api.proto.Messages; import org.apache.slider.api.types.ContainerInformation; import org.apache.slider.common.tools.SliderUtils; +import org.apache.slider.providers.ProviderRole; import java.util.ArrayList; import java.util.Arrays; @@ -40,6 +41,7 @@ import java.util.List; public final class RoleInstance implements Cloneable { public Container container; + public ProviderRole providerRole; /** * Container ID */ --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org