Repository: incubator-slider Updated Branches: refs/heads/develop c9b47f11c -> a2c266126
SLIDER-490: cancel outstanding requests on flex down, as needed Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/a2c26612 Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/a2c26612 Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/a2c26612 Branch: refs/heads/develop Commit: a2c26612660c8b3a4571e91ecb195253a7e169fa Parents: c9b47f1 Author: Steve Loughran <ste...@apache.org> Authored: Sat Oct 18 22:29:42 2014 +0100 Committer: Steve Loughran <ste...@apache.org> Committed: Sat Oct 18 22:35:43 2014 +0100 ---------------------------------------------------------------------- .../providers/AbstractProviderService.java | 10 +- .../server/appmaster/SliderAppMaster.java | 8 +- .../operations/AsyncRMOperationHandler.java | 53 ++++- .../operations/CancelRequestOperation.java | 58 ++++++ .../operations/ContainerRequestOperation.java | 3 +- .../ProviderNotifyingOperationHandler.java | 9 +- .../operations/RMOperationHandler.java | 4 +- .../operations/RMOperationHandlerActions.java | 9 + .../slider/server/appmaster/state/AppState.java | 117 +++++++----- .../appmaster/state/ContainerPriority.java | 37 +++- .../server/appmaster/state/RoleStatus.java | 83 ++++---- .../TestMockAppStateRMOperations.groovy | 191 +++++++++++++++++-- .../model/mock/MockProviderService.groovy | 9 + .../model/mock/MockRMOperationHandler.groovy | 22 ++- .../apache/slider/test/SliderTestUtils.groovy | 37 +++- 15 files changed, 528 insertions(+), 122 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/a2c26612/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java 
b/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java index 36ee910..c628d8a 100644 --- a/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java +++ b/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java @@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.Service; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.registry.client.binding.RegistryTypeUtils; import org.apache.hadoop.registry.client.exceptions.InvalidRecordException; @@ -332,7 +333,7 @@ public abstract class AbstractProviderService public Map<String, String> buildMonitorDetails(ClusterDescription clusterDesc) { Map<String, String> details = new LinkedHashMap<String, String>(); - // add in all the + // add in all the endpoints buildEndpointDetails(details); return details; @@ -398,6 +399,13 @@ public abstract class AbstractProviderService // no-op } + @Override + public int cancelContainerRequests(Priority priority1, + Priority priority2, + int count) { + return 0; + } + /** * No-op implementation of this method. 
*/ http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/a2c26612/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java index 06d3597..d583a90 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java @@ -629,8 +629,6 @@ public class SliderAppMaster extends AbstractSliderLaunchedService asyncRMClient = AMRMClientAsync.createAMRMClientAsync(heartbeatInterval, this); addService(asyncRMClient); - //wrap it for the app state model - rmOperationHandler = new AsyncRMOperationHandler(asyncRMClient); //now bring it up deployChildService(asyncRMClient); @@ -707,6 +705,12 @@ public class SliderAppMaster extends AbstractSliderLaunchedService containerMaxCores = maxResources.getVirtualCores(); appState.setContainerLimits(maxResources.getMemory(), maxResources.getVirtualCores()); + + // build the handler for RM request/release operations; this uses + // the max value as part of its lookup + rmOperationHandler = new AsyncRMOperationHandler(asyncRMClient, + maxResources); + // set the RM-defined maximum cluster values appInformation.put(ResourceKeys.YARN_CORES, Integer.toString(containerMaxCores)); appInformation.put(ResourceKeys.YARN_MEMORY, Integer.toString(containerMaxMemory)); http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/a2c26612/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/AsyncRMOperationHandler.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/AsyncRMOperationHandler.java 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/AsyncRMOperationHandler.java index f7a95a7..1cbb960 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/AsyncRMOperationHandler.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/AsyncRMOperationHandler.java @@ -19,21 +19,70 @@ package org.apache.slider.server.appmaster.operations; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collection; +import java.util.List; + /** - * Hands off RM operations to the Resource Manager + * Hands off RM operations to the Resource Manager. */ public class AsyncRMOperationHandler extends RMOperationHandler { protected static final Logger log = LoggerFactory.getLogger(AsyncRMOperationHandler.class); private final AMRMClientAsync client; + private final Resource maxResources; - public AsyncRMOperationHandler(AMRMClientAsync client) { + public AsyncRMOperationHandler(AMRMClientAsync client, Resource maxResources) { this.client = client; + this.maxResources = maxResources; + } + + @Override + public int cancelContainerRequests(Priority priority1, + Priority priority2, + int count) { + // need to revoke a previously issued container request + // so enum the sets and pick some + int remaining = cancelSinglePriorityRequests(priority1, count); + remaining = cancelSinglePriorityRequests(priority2, remaining); + + return remaining; + } + + /** + * Cancel just one of the priority levels + * @param priority priority to cancel + * @param count count to cancel + * @return number of requests still left to cancel (count minus those removed) + */ + protected int cancelSinglePriorityRequests(Priority priority, + int count) { +
List<Collection<AMRMClient.ContainerRequest>> requestSets = + client.getMatchingRequests(priority, "", maxResources); + if (count <= 0) { + return 0; + } + int remaining = count; + for (Collection<AMRMClient.ContainerRequest> requestSet : requestSets) { + if (remaining == 0) { + break; + } + for (AMRMClient.ContainerRequest request : requestSet) { + if (remaining == 0) { + break; + } + // a single release + client.removeContainerRequest(request); + remaining --; + } + } + return remaining; } @Override http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/a2c26612/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/CancelRequestOperation.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/CancelRequestOperation.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/CancelRequestOperation.java new file mode 100644 index 0000000..be5dbab --- /dev/null +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/CancelRequestOperation.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.slider.server.appmaster.operations; + +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.slider.server.appmaster.state.ContainerPriority; + +/** + * Cancel a container request + */ +public class CancelRequestOperation extends AbstractRMOperation { + + private final Priority priority1; + private final Priority priority2; + private final int count; + + public CancelRequestOperation(Priority priority1, Priority priority2, int count) { + this.priority1 = priority1; + this.priority2 = priority2; + this.count = count; + } + + @Override + public void execute(RMOperationHandler handler) { + handler.cancelContainerRequests(priority1, priority2, count); + } + + @Override + public String toString() { + return "release " + count + + " requests for " + ContainerPriority.toString(priority1) + + " and " + ContainerPriority.toString(priority2); + } + + /** + * Get the number to release + * @return the number of containers to release + */ + public int getCount() { + return count; + } +} http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/a2c26612/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ContainerRequestOperation.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ContainerRequestOperation.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ContainerRequestOperation.java index 711bb98..203f898 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ContainerRequestOperation.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ContainerRequestOperation.java @@ -19,6 +19,7 @@ package org.apache.slider.server.appmaster.operations; import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.slider.server.appmaster.state.ContainerPriority; public class ContainerRequestOperation extends 
AbstractRMOperation { @@ -39,6 +40,6 @@ public class ContainerRequestOperation extends AbstractRMOperation { @Override public String toString() { - return "request container "; + return "request container for " + ContainerPriority.toString(request.getPriority()); } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/a2c26612/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ProviderNotifyingOperationHandler.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ProviderNotifyingOperationHandler.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ProviderNotifyingOperationHandler.java index a24d9e5..66df566 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ProviderNotifyingOperationHandler.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ProviderNotifyingOperationHandler.java @@ -19,12 +19,13 @@ package org.apache.slider.server.appmaster.operations; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.slider.providers.ProviderService; public class ProviderNotifyingOperationHandler extends RMOperationHandler { - final ProviderService providerService; + private final ProviderService providerService; public ProviderNotifyingOperationHandler(ProviderService providerService) { this.providerService = providerService; @@ -38,6 +39,12 @@ public class ProviderNotifyingOperationHandler extends RMOperationHandler { @Override public void addContainerRequest(AMRMClient.ContainerRequest req) { providerService.addContainerRequest(req); + } + @Override + public int cancelContainerRequests(Priority priority1, + Priority priority2, + int count) { + return providerService.cancelContainerRequests(priority1, priority2, count); } } 
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/a2c26612/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/RMOperationHandler.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/RMOperationHandler.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/RMOperationHandler.java index 2b6e9e2..3ab9d89 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/RMOperationHandler.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/RMOperationHandler.java @@ -18,11 +18,12 @@ package org.apache.slider.server.appmaster.operations; +import org.apache.hadoop.yarn.api.records.Priority; + import java.util.List; public abstract class RMOperationHandler implements RMOperationHandlerActions { - /** * Execute an entire list of operations * @param operations ops @@ -32,4 +33,5 @@ public abstract class RMOperationHandler implements RMOperationHandlerActions { operation.execute(this); } } + } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/a2c26612/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/RMOperationHandlerActions.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/RMOperationHandlerActions.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/RMOperationHandlerActions.java index 6659cc9..e6d6c9d 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/RMOperationHandlerActions.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/RMOperationHandlerActions.java @@ -19,10 +19,19 @@ package org.apache.slider.server.appmaster.operations; import org.apache.hadoop.yarn.api.records.ContainerId; +import 
org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.client.api.AMRMClient; public interface RMOperationHandlerActions { void releaseAssignedContainer(ContainerId containerId); void addContainerRequest(AMRMClient.ContainerRequest req); + + /** + * Remove a container request + * @param priority1 priority to remove at + * @param priority2 second priority to target + * @param count number to remove + */ + int cancelContainerRequests(Priority priority1, Priority priority2, int count); } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/a2c26612/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java index 706b0d2..834eaf2 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.client.api.AMRMClient; @@ -58,6 +59,7 @@ import org.apache.slider.core.exceptions.SliderInternalStateException; import org.apache.slider.core.exceptions.TriggerClusterTeardownException; import org.apache.slider.providers.ProviderRole; import org.apache.slider.server.appmaster.operations.AbstractRMOperation; +import org.apache.slider.server.appmaster.operations.CancelRequestOperation; import 
org.apache.slider.server.appmaster.operations.ContainerReleaseOperation; import org.apache.slider.server.appmaster.operations.ContainerRequestOperation; import org.slf4j.Logger; @@ -948,7 +950,7 @@ public class AppState { boolean active) { List<RoleInstance> nodes = new ArrayList<RoleInstance>(); Collection<RoleInstance> allRoleInstances; - allRoleInstances = active? ownedContainers.values() : liveNodes.values(); + allRoleInstances = active ? ownedContainers.values() : liveNodes.values(); for (RoleInstance node : allRoleInstances) { if (node.roleId == roleId) { nodes.add(node); @@ -1574,7 +1576,7 @@ public class AppState { /** * Get the failure threshold for a specific role, falling back to * the global one if not - * @param roleStatus + * @param roleStatus role * @return the threshold for failures */ private int getFailureThresholdForRole(RoleStatus roleStatus) { @@ -1615,11 +1617,10 @@ public class AppState { String name = role.getName(); synchronized (role) { delta = role.getDelta(); - details = role.toString(); expected = role.getDesired(); } - log.info(details); + log.info("Reviewing {}", role); checkFailureThreshold(role); if (delta > 0) { @@ -1630,13 +1631,11 @@ public class AppState { Resource capability = recordFactory.newResource(); AMRMClient.ContainerRequest containerAsk = buildContainerResourceAndRequest(role, capability); - log.info("Container ask is {} and label = {}", containerAsk, containerAsk.getNodeLabelExpression()); - if (containerAsk.getCapability().getMemory() > - this.containerMaxMemory) { - log.warn( - "Memory requested: " + containerAsk.getCapability().getMemory() + - " > " + - this.containerMaxMemory); + log.info("Container ask is {} and label = {}", containerAsk, + containerAsk.getNodeLabelExpression()); + int askMemory = containerAsk.getCapability().getMemory(); + if (askMemory > this.containerMaxMemory) { + log.warn("Memory requested: {} > max of {}", askMemory, containerMaxMemory); } operations.add(new 
ContainerRequestOperation(containerAsk)); } @@ -1649,47 +1648,73 @@ public class AppState { //then pick some containers to kill int excess = -delta; - // get the nodes to release - int roleId = role.getKey(); - - // enum all active nodes that aren't being released - List<RoleInstance> containersToRelease = enumNodesWithRoleId(roleId, true); - - // cut all release-in-progress nodes - ListIterator<RoleInstance> li = containersToRelease.listIterator(); - while (li.hasNext()) { - RoleInstance next = li.next(); - if (next.released) { - li.remove(); + // how many requests are outstanding + int outstandingRequests = role.getRequested(); + if (outstandingRequests > 0) { + // outstanding requests. + int toCancel = Math.min(outstandingRequests, excess); + Priority p1 = + ContainerPriority.createPriority(role.getPriority(), true); + Priority p2 = + ContainerPriority.createPriority(role.getPriority(), false); + operations.add(new CancelRequestOperation(p1, p2, toCancel)); + role.cancel(toCancel); + excess -= toCancel; + assert excess >= 0 : "Attempted to cancel too many requests"; + log.info("Submitted {} cancellations, leaving {} to release", + toCancel, excess); + if (excess == 0) { + log.info("After cancelling requests, application is at desired size"); } } - // warn if the desired state can't be reaced - if (containersToRelease.size() < excess) { - log.warn("Not enough nodes to release...short of {} nodes", - containersToRelease.size() - excess); - } - - // ask the release selector to sort the targets - containersToRelease = containerReleaseSelector.sortCandidates( - roleId, - containersToRelease, - excess); - - //crop to the excess - List<RoleInstance> finalCandidates = (excess < containersToRelease.size()) - ? 
containersToRelease.subList(0, excess) - : containersToRelease; - + // after the cancellation there may be no excess + if (excess > 0) { + // get the nodes to release + int roleId = role.getKey(); + + // enum all active nodes that aren't being released + List<RoleInstance> containersToRelease = enumNodesWithRoleId(roleId, true); + + // cut all release-in-progress nodes + ListIterator<RoleInstance> li = containersToRelease.listIterator(); + while (li.hasNext()) { + RoleInstance next = li.next(); + if (next.released) { + li.remove(); + } + } - // then build up a release operation, logging each container as released - for (RoleInstance possible : finalCandidates) { - log.debug("Targeting for release: {}", possible); - containerReleaseSubmitted(possible.container); - operations.add(new ContainerReleaseOperation(possible.getId())); + // warn if the desired state can't be reached + int numberAvailableForRelease = containersToRelease.size(); + if (numberAvailableForRelease < excess) { + log.warn("Not enough nodes to release, have {} and need {} more", + numberAvailableForRelease, + excess - numberAvailableForRelease); + } + + // ask the release selector to sort the targets + containersToRelease = containerReleaseSelector.sortCandidates( + roleId, + containersToRelease, + excess); + + //crop to the excess + + List<RoleInstance> finalCandidates = (excess < numberAvailableForRelease) + ? 
containersToRelease.subList(0, excess) + : containersToRelease; + + + // then build up a release operation, logging each container as released + for (RoleInstance possible : finalCandidates) { + log.debug("Targeting for release: {}", possible); + containerReleaseSubmitted(possible.container); + operations.add(new ContainerReleaseOperation(possible.getId())); + } } - + } return operations; http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/a2c26612/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerPriority.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerPriority.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerPriority.java index 369a932..3cc2106 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerPriority.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerPriority.java @@ -18,10 +18,13 @@ package org.apache.slider.server.appmaster.state; +import com.google.common.base.Preconditions; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.util.Records; +import java.util.Locale; + /** * Class containing the logic to build/split container priorities into the * different fields used by Slider @@ -61,23 +64,47 @@ public final class ContainerPriority { } /** + * Does the priority have location + * @param priority priority index + * @return true if the priority has the location marker + */ + public static boolean hasLocation(int priority) { + return (priority ^ NOLOCATION ) != 0; + } + + /** * Map from a container to a role key by way of its priority * @param container container * @return role key */ public static int extractRole(Container container) { Priority priority = container.getPriority(); - assert priority != null; - return 
extractRole(priority.getPriority()); + return extractRole(priority); } /** - * Map from a container to a role key by way of its priority - * @param container container - * @return role key + * Priority record to role mapper + * @param priorityRecord priority record + * @return the role # */ public static int extractRole(Priority priorityRecord) { + Preconditions.checkNotNull(priorityRecord); return extractRole(priorityRecord.getPriority()); } + /** + * Convert a priority record to a string, extracting role and locality + * @param priorityRecord priority record. May be null + * @return a string value + */ + public static String toString(Priority priorityRecord) { + if (priorityRecord==null) { + return "(null)"; + } else { + return String.format(Locale.ENGLISH, + "role %d (locality=%b)", + extractRole(priorityRecord), + hasLocation(priorityRecord.getPriority())); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/a2c26612/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java index a112799..63b5931 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java @@ -32,10 +32,8 @@ import java.util.Map; */ public final class RoleStatus implements Cloneable { - private final String name; - /** * Role key in the container details stored in the AM, * currently mapped to priority @@ -45,7 +43,7 @@ public final class RoleStatus implements Cloneable { private final ProviderRole providerRole; private int desired, actual, requested, releasing; - private volatile int failed, started, startFailed, completed, totalRequested; + private int failed, started, 
startFailed, completed, totalRequested; /** * value to use when specifiying "no limit" for instances: {@value} @@ -71,7 +69,7 @@ public final class RoleStatus implements Cloneable { this.name = providerRole.name; this.key = providerRole.id; } - + public String getName() { return name; } @@ -101,26 +99,24 @@ public final class RoleStatus implements Cloneable { return 0 != (getPlacementPolicy() & PlacementPolicy.NO_DATA_LOCALITY); } - public int getDesired() { + public synchronized int getDesired() { return desired; } - public void setDesired(int desired) { + public synchronized void setDesired(int desired) { this.desired = desired; } - public int getActual() { + public synchronized int getActual() { return actual; } - public int incActual() { + public synchronized int incActual() { return ++actual; } - public int decActual() { - if (0 > --actual) { - actual = 0; - } + public synchronized int decActual() { + actual = Math.max(0, actual - 1); return actual; } @@ -133,29 +129,29 @@ public final class RoleStatus implements Cloneable { return ++requested; } - public synchronized int decRequested() { - if (0 > --requested) { - requested = 0; - } + public synchronized int cancel(int count) { + requested = Math.max(0, requested - count); return requested; } + + public synchronized int decRequested() { + return cancel(1); + } - public int getReleasing() { + public synchronized int getReleasing() { return releasing; } - public int incReleasing() { + public synchronized int incReleasing() { return ++releasing; } - public int decReleasing() { - if (0 > --releasing) { - releasing = 0; - } + public synchronized int decReleasing() { + releasing = Math.max(0, releasing - 1); return releasing; } - public int getFailed() { + public synchronized int getFailed() { return failed; } @@ -163,7 +159,7 @@ public final class RoleStatus implements Cloneable { * Reset the failure counts * @return the total number of failures up to this point */ - public int resetFailed() { + public 
synchronized int resetFailed() { int total = failed + startFailed; failed = 0; startFailed = 0; @@ -178,7 +174,7 @@ public final class RoleStatus implements Cloneable { * @return the number of failures * @param text text about the failure */ - public int noteFailed(boolean startupFailure, String text) { + public synchronized int noteFailed(boolean startupFailure, String text) { int current = ++failed; if (text != null) { failureMessage = text; @@ -190,38 +186,38 @@ public final class RoleStatus implements Cloneable { return current; } - public int getStartFailed() { + public synchronized int getStartFailed() { return startFailed; } - public void incStartFailed() { + public synchronized void incStartFailed() { startFailed++; } - public String getFailureMessage() { + public synchronized String getFailureMessage() { return failureMessage; } - public int getCompleted() { + public synchronized int getCompleted() { return completed; } - public void setCompleted(int completed) { + public synchronized void setCompleted(int completed) { this.completed = completed; } - public int incCompleted() { + public synchronized int incCompleted() { return completed ++; } - public int getStarted() { + public synchronized int getStarted() { return started; } - public void incStarted() { + public synchronized void incStarted() { started++; } - public int getTotalRequested() { + public synchronized int getTotalRequested() { return totalRequested; } @@ -233,9 +229,8 @@ public final class RoleStatus implements Cloneable { * 0 means "do nothing". */ public synchronized int getDelta() { - int inuse = actual + requested; + int inuse = getActualAndRequested(); //don't know how to view these. Are they in-use or not? - //inuse += releasing; int delta = desired - inuse; if (delta < 0) { //if we are releasing, remove the number that are already released. 
@@ -246,8 +241,16 @@ public final class RoleStatus implements Cloneable { return delta; } + /** + * Get count of actual and requested containers + * @return the size of the application when outstanding requests are included + */ + public synchronized int getActualAndRequested() { + return actual + requested; + } + @Override - public String toString() { + public synchronized String toString() { return "RoleStatus{" + "name='" + name + '\'' + ", key=" + key + @@ -267,13 +270,13 @@ public final class RoleStatus implements Cloneable { } @Override - public Object clone() throws CloneNotSupportedException { + public synchronized Object clone() throws CloneNotSupportedException { return super.clone(); } /** * Get the provider role - * @return + * @return the provider role */ public ProviderRole getProviderRole() { return providerRole; @@ -283,7 +286,7 @@ public final class RoleStatus implements Cloneable { * Build the statistics map from the current data * @return a map for use in statistics reports */ - public Map<String, Integer> buildStatistics() { + public synchronized Map<String, Integer> buildStatistics() { Map<String, Integer> stats = new HashMap<String, Integer>(); stats.put(StatusKeys.STATISTICS_CONTAINERS_ACTIVE_REQUESTS, getRequested()); stats.put(StatusKeys.STATISTICS_CONTAINERS_COMPLETED, getCompleted()); http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/a2c26612/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateRMOperations.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateRMOperations.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateRMOperations.groovy index f8e852e..e5ad4ae 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateRMOperations.groovy +++ 
b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateRMOperations.groovy @@ -25,7 +25,9 @@ import org.apache.slider.server.appmaster.model.mock.BaseMockAppStateTest import org.apache.slider.server.appmaster.model.mock.MockFactory import org.apache.slider.server.appmaster.model.mock.MockRMOperationHandler import org.apache.slider.server.appmaster.model.mock.MockRoles +import org.apache.slider.server.appmaster.model.mock.MockYarnEngine import org.apache.slider.server.appmaster.operations.AbstractRMOperation +import org.apache.slider.server.appmaster.operations.CancelRequestOperation import org.apache.slider.server.appmaster.operations.ContainerReleaseOperation import org.apache.slider.server.appmaster.operations.ContainerRequestOperation import org.apache.slider.server.appmaster.operations.RMOperationHandler @@ -63,32 +65,181 @@ class TestMockAppStateRMOperations extends BaseMockAppStateTest implements MockR public void testMockAddOp() throws Throwable { role0Status.desired = 1 List<AbstractRMOperation> ops = appState.reviewRequestAndReleaseNodes() - assert ops.size() == 1 + assertListLength(ops, 1) ContainerRequestOperation operation = (ContainerRequestOperation) ops[0] int priority = operation.request.priority.priority assert extractRole(priority) == MockFactory.PROVIDER_ROLE0.id RMOperationHandler handler = new MockRMOperationHandler() handler.execute(ops) - //tell the container its been allocated AbstractRMOperation op = handler.operations[0] assert op instanceof ContainerRequestOperation } + /** + * Test of a flex up and down op which verifies that outstanding + * requests are cancelled first. 
+ * <ol> + * <li>request 5 nodes, assert 5 request made</li> + * <li>allocate 1 of them</li> + * <li>flex cluster size to 3</li> + * <li>assert this generates 2 cancel requests</li> + * </ol> + */ + @Test + public void testRequestThenCancelOps() throws Throwable { + def role0 = role0Status + role0.desired = 5 + List<AbstractRMOperation> ops = appState.reviewRequestAndReleaseNodes() + assertListLength(ops, 5) + // now 5 outstanding requests. + assert role0.requested == 5 + + // allocate one + role0.incActual() + role0.decRequested() + assert role0.requested == 4 + + + // flex cluster to 3 + role0.desired = 3 + ops = appState.reviewRequestAndReleaseNodes() + + // expect a cancel operation from review + assertListLength(ops, 1) + assert ops[0] instanceof CancelRequestOperation + RMOperationHandler handler = new MockRMOperationHandler() + handler.availableToCancel = 4; + handler.execute(ops) + assert handler.availableToCancel == 2 + assert role0.requested == 2 + + // flex down one more + role0.desired = 2 + ops = appState.reviewRequestAndReleaseNodes() + assertListLength(ops, 1) + assert ops[0] instanceof CancelRequestOperation + handler.execute(ops) + assert handler.availableToCancel == 1 + assert role0.requested == 1 + + } + + @Test + public void testCancelNoActualContainers() throws Throwable { + def role0 = role0Status + role0.desired = 5 + List<AbstractRMOperation> ops = appState.reviewRequestAndReleaseNodes() + assertListLength(ops, 5) + // now 5 outstanding requests. 
+ assert role0.requested == 5 + role0.desired = 0 + ops = appState.reviewRequestAndReleaseNodes() + assertListLength(ops, 1) + CancelRequestOperation cancel = ops[0] as CancelRequestOperation + assert cancel.count == 5 + } + + + @Test + public void testFlexDownOutstandingRequests() throws Throwable { + // engine only has two nodes, so > 2 will be outstanding + engine = new MockYarnEngine(1, 2) + List<AbstractRMOperation> ops + // role: desired = 2, requested = 1, actual=1 + def role0 = role0Status + role0.desired = 4 + createAndSubmitNodes() + + assert role0.requested == 2 + assert role0.actual == 2 + // there are now two outstanding, two actual + // Release 3 and verify that the two + // cancellations were combined with a release + role0.desired = 1; + assert role0.delta == -3 + ops = appState.reviewRequestAndReleaseNodes() + assertListLength(ops, 2) + assert role0.requested == 0 + assert role0.releasing == 1 + } + + @Test + public void testCancelAllOutstandingRequests() throws Throwable { + + // role: desired = 2, requested = 1, actual=1 + def role0 = role0Status + role0.desired = 2 + role0.incRequested() + role0.incRequested() + List<AbstractRMOperation> ops + + // there are now two outstanding, two actual + // Release 3 and verify that the two + // cancellations were combined with a release + role0.desired = 0; + ops = appState.reviewRequestAndReleaseNodes() + assertListLength(ops, 1) + CancelRequestOperation cancel = ops[0] as CancelRequestOperation + assert cancel.getCount() == 2 + } + + + @Test + public void testFlexUpOutstandingRequests() throws Throwable { + + // role: desired = 2, requested = 1, actual=1 + def role0 = role0Status + role0.desired = 2 + role0.incActual(); + role0.incRequested() + + List<AbstractRMOperation> ops + + // flex up 2 nodes, yet expect only one node to be requested, + // as the outstanding request is taken into account + role0.desired = 4; + role0.incRequested() + + assert role0.actual == 1; + assert role0.requested == 2; + assert 
role0.actualAndRequested == 3; + assert role0.delta == 1 + ops = appState.reviewRequestAndReleaseNodes() + assertListLength(ops, 1) + assert ops[0] instanceof ContainerRequestOperation + assert role0.requested == 3 + } + + @Test + public void testFlexUpNoSpace() throws Throwable { + // engine only has two nodes, so > 2 will be outstanding + engine = new MockYarnEngine(1, 2) + List<AbstractRMOperation> ops + // role: desired = 2, requested = 1, actual=1 + def role0 = role0Status + role0.desired = 4 + createAndSubmitNodes() + + assert role0.requested == 2 + assert role0.actual == 2 + role0.desired = 8; + assert role0.delta == 4 + createAndSubmitNodes() + assert role0.requested == 6 + } + + @Test public void testAllocateReleaseOp() throws Throwable { role0Status.desired = 1 List<AbstractRMOperation> ops = appState.reviewRequestAndReleaseNodes() ContainerRequestOperation operation = (ContainerRequestOperation) ops[0] - AMRMClient.ContainerRequest request = operation.request - Container cont = engine.allocateContainer(request) - List<Container> allocated = [cont] - List<ContainerAssignment> assignments = []; - List<AbstractRMOperation> operations = [] - appState.onContainersAllocated(allocated, assignments, operations) - assert operations.size() == 0 - assert assignments.size() == 1 + def (ArrayList<ContainerAssignment> assignments, Container cont, AMRMClient.ContainerRequest request) = satisfyContainerRequest( + operation) + assertListLength(ops, 1) + assertListLength(assignments, 1) ContainerAssignment assigned = assignments[0] Container target = assigned.container assert target.id == cont.id @@ -104,13 +255,23 @@ class TestMockAppStateRMOperations extends BaseMockAppStateTest implements MockR //now release it by changing the role status role0Status.desired = 0 ops = appState.reviewRequestAndReleaseNodes() - assert ops.size() == 1 + assertListLength(ops, 1) assert ops[0] instanceof ContainerReleaseOperation ContainerReleaseOperation release = 
(ContainerReleaseOperation) ops[0] assert release.containerId == cont.id } + public List satisfyContainerRequest(ContainerRequestOperation operation) { + AMRMClient.ContainerRequest request = operation.request + Container cont = engine.allocateContainer(request) + List<Container> allocated = [cont] + List<ContainerAssignment> assignments = []; + List<AbstractRMOperation> operations = [] + appState.onContainersAllocated(allocated, assignments, operations) + return [assignments, cont, request] + } + @Test public void testComplexAllocation() throws Throwable { role0Status.desired = 1 @@ -121,8 +282,8 @@ class TestMockAppStateRMOperations extends BaseMockAppStateTest implements MockR List<ContainerAssignment> assignments = []; List<AbstractRMOperation> releases = [] appState.onContainersAllocated(allocations, assignments, releases) - assert releases.size() == 0 - assert assignments.size() == 4 + assertListLength(releases, 0) + assertListLength(assignments, 4) assignments.each { ContainerAssignment assigned -> Container target = assigned.container RoleInstance ri = roleInstance(assigned) @@ -136,7 +297,7 @@ class TestMockAppStateRMOperations extends BaseMockAppStateTest implements MockR assert engine.containerCount() == 4; role1Status.desired = 0 ops = appState.reviewRequestAndReleaseNodes() - assert ops.size() == 3 + assertListLength(ops, 3) allocations = engine.execute(ops) assert engine.containerCount() == 1; @@ -154,7 +315,7 @@ class TestMockAppStateRMOperations extends BaseMockAppStateTest implements MockR List<ContainerAssignment> assignments = []; List<AbstractRMOperation> releases = [] appState.onContainersAllocated(allocations, assignments, releases) - assert assignments.size() == 1 + assertListLength(assignments, 1) ContainerAssignment assigned = assignments[0] Container target = assigned.container RoleInstance ri = roleInstance(assigned) 
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/a2c26612/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockProviderService.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockProviderService.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockProviderService.groovy index 675aec5..fc23d54 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockProviderService.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockProviderService.groovy @@ -24,6 +24,7 @@ import org.apache.hadoop.service.LifecycleEvent import org.apache.hadoop.service.ServiceStateChangeListener import org.apache.hadoop.yarn.api.records.Container import org.apache.hadoop.yarn.api.records.ContainerId +import org.apache.hadoop.yarn.api.records.Priority import org.apache.hadoop.yarn.client.api.AMRMClient import org.apache.hadoop.registry.client.types.ServiceRecord import org.apache.slider.api.ClusterDescription @@ -269,6 +270,14 @@ class MockProviderService implements ProviderService { } @Override + int cancelContainerRequests( + Priority priority1, + Priority priority2, + int count) { + return 0 + } + + @Override void rebuildContainerDetails(List<Container> liveContainers, String applicationId, Map<Integer, ProviderRole> roleProviderMap) { } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/a2c26612/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockRMOperationHandler.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockRMOperationHandler.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockRMOperationHandler.groovy index 0fdba6b..297c597 100644 --- 
a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockRMOperationHandler.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockRMOperationHandler.groovy @@ -20,6 +20,7 @@ package org.apache.slider.server.appmaster.model.mock import groovy.util.logging.Slf4j import org.apache.hadoop.yarn.api.records.ContainerId +import org.apache.hadoop.yarn.api.records.Priority import org.apache.hadoop.yarn.client.api.AMRMClient import org.apache.slider.server.appmaster.operations.AbstractRMOperation import org.apache.slider.server.appmaster.operations.ContainerReleaseOperation @@ -30,6 +31,10 @@ import org.apache.slider.server.appmaster.operations.RMOperationHandler class MockRMOperationHandler extends RMOperationHandler { public List<AbstractRMOperation> operations = []; int requests, releases; + // number available to cancel + int availableToCancel = 0; + // count of cancelled values. This must be explicitly set + int cancelled @Override public void releaseAssignedContainer(ContainerId containerId) { @@ -45,9 +50,20 @@ class MockRMOperationHandler extends RMOperationHandler { requests++; } - /** - * clear the history - */ + @Override + int cancelContainerRequests( + Priority priority1, + Priority priority2, + int count) { + int releaseable = Math.min(count, availableToCancel) + availableToCancel -= releaseable; + cancelled += releaseable; + return releaseable; + } + +/** + * clear the history + */ public void clear() { operations.clear() releases = 0; http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/a2c26612/slider-core/src/test/groovy/org/apache/slider/test/SliderTestUtils.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/test/SliderTestUtils.groovy b/slider-core/src/test/groovy/org/apache/slider/test/SliderTestUtils.groovy index 61dfeb5..42ec9de 100644 --- 
a/slider-core/src/test/groovy/org/apache/slider/test/SliderTestUtils.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/test/SliderTestUtils.groovy @@ -85,19 +85,46 @@ class SliderTestUtils extends Assert { public static void assume(boolean condition, String message) { if (!condition) { - log.warn("Skipping test: {}", message) - Assume.assumeTrue(message, false); + skip(message) } } - + /** + * Equality size for a list + * @param left + * @param right + */ public static void assertListEquals(List left, List right) { - assert left.size() == right.size(); + String lval = collectionToString(left) + String rval = collectionToString(right) + String text = "comparing $lval to $rval" + assertEquals(text, left.size(), right.size()) for (int i = 0; i < left.size(); i++) { - assert left[0] == right[0] + assertEquals(text, left[i], right[i]) } } + + /** + * Assert a list has a given length + * @param list list + * @param size size to have + */ + public static void assertListLength(List list, int size) { + String lval = collectionToString(list) + assertEquals(lval, size, list.size()) + } + + + /** + * Stringify a collection with [ ] at either end + * @param collection collection + * @return string value + */ + public static String collectionToString(List collection) { + return "[" + SliderUtils.join(collection,", ", false) +"]" + } + /** * Assume that a string option is set and not equal to "" * @param conf configuration file