Repository: incubator-slider Updated Branches: refs/heads/develop 49e288137 -> fe6a72062
SLIDER-77 failure window resetting; docs needed to go with this Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/9a474833 Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/9a474833 Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/9a474833 Branch: refs/heads/develop Commit: 9a4748339cac3e5c19d97a9417621b7a874a0397 Parents: cebde98 Author: Steve Loughran <ste...@apache.org> Authored: Mon Aug 11 17:55:42 2014 +0100 Committer: Steve Loughran <ste...@apache.org> Committed: Mon Aug 11 17:55:42 2014 +0100 ---------------------------------------------------------------------- .../java/org/apache/slider/api/OptionKeys.java | 23 --- .../org/apache/slider/api/ResourceKeys.java | 82 ++++++++++ .../org/apache/slider/client/SliderClient.java | 3 - .../apache/slider/common/SliderXmlConfKeys.java | 13 -- .../server/appmaster/SliderAppMaster.java | 88 ++++++++--- .../server/appmaster/actions/ActionHalt.java | 7 +- .../appmaster/actions/ActionStartContainer.java | 7 +- .../appmaster/actions/ActionStopQueue.java | 15 +- .../appmaster/actions/ActionStopSlider.java | 15 +- .../server/appmaster/actions/AsyncAction.java | 16 +- .../actions/ProviderReportedContainerLoss.java | 7 +- .../actions/ProviderStartupCompleted.java | 7 +- .../server/appmaster/actions/QueueAccess.java | 31 +++- .../server/appmaster/actions/QueueExecutor.java | 2 +- .../server/appmaster/actions/QueueService.java | 107 +++++++++++-- .../appmaster/actions/RenewingAction.java | 35 +++-- .../appmaster/actions/ResetFailureWindow.java | 39 +++++ .../slider/server/appmaster/state/AppState.java | 66 ++++---- .../server/appmaster/state/RoleHistory.java | 2 +- .../server/appmaster/state/RoleStatus.java | 20 ++- .../workflow/WorkflowExecutorService.java | 2 +- .../server/appmaster/actions/TestActions.groovy | 40 +++-- .../TestMockAppStateContainerFailure.groovy | 44 +++++- 
.../model/mock/BaseMockAppStateTest.groovy | 14 +- .../failures/TestFailureThreshold.groovy | 150 ------------------- .../TestRegionServerFailureThreshold.groovy | 149 ++++++++++++++++++ 26 files changed, 687 insertions(+), 297 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/9a474833/slider-core/src/main/java/org/apache/slider/api/OptionKeys.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/api/OptionKeys.java b/slider-core/src/main/java/org/apache/slider/api/OptionKeys.java index 048fefa..006e280 100644 --- a/slider-core/src/main/java/org/apache/slider/api/OptionKeys.java +++ b/slider-core/src/main/java/org/apache/slider/api/OptionKeys.java @@ -69,29 +69,6 @@ public interface OptionKeys { String APPLICATION_NAME = "application.name"; /** - * Time in milliseconds before a container is considered long-lived. - * Shortlived containers are interpreted as a problem with the role - * and/or the host: {@value} - */ - String INTERNAL_CONTAINER_FAILURE_SHORTLIFE = "internal.container.failure.shortlife"; - - /** - * Default short life threshold: {@value} - */ - int DEFAULT_CONTAINER_FAILURE_SHORTLIFE = 60; - - /** - * maximum number of failed containers (in a single role) - * before the cluster is deemed to have failed {@value} - */ - String INTERNAL_CONTAINER_FAILURE_THRESHOLD = "internal.container.failure.threshold"; - - /** - * Default failure threshold: {@value} - */ - int DEFAULT_CONTAINER_FAILURE_THRESHOLD = 5; - - /** * delay for container startup:{@value} */ int DEFAULT_CONTAINER_STARTUP_DELAY = 5000; http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/9a474833/slider-core/src/main/java/org/apache/slider/api/ResourceKeys.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/api/ResourceKeys.java 
b/slider-core/src/main/java/org/apache/slider/api/ResourceKeys.java index 1c914cb..c2b78cc 100644 --- a/slider-core/src/main/java/org/apache/slider/api/ResourceKeys.java +++ b/slider-core/src/main/java/org/apache/slider/api/ResourceKeys.java @@ -20,6 +20,25 @@ package org.apache.slider.api; /** * These are the keys valid in resource options + * + /* + + Container failure window. + + The window is calculated in minutes as (days * 24 * 60 + hours * 60 + minutes) + + Every interval of this period after the AM is started/restarted becomes + the time period in which the CONTAINER_FAILURE_THRESHOLD value is calculated. + + After the window limit is reached, the failure counts are reset. This + is not a sliding window/moving average policy, simply a rule such as + "every six hours the failure count is reset" + + + <pre> + =========================================================================== + </pre> + */ public interface ResourceKeys { @@ -69,4 +88,67 @@ public interface ResourceKeys { * placement policy */ String COMPONENT_PLACEMENT_POLICY = "yarn.component.placement.policy"; + + + + /** + * Time in seconds before a container is considered long-lived. + * Shortlived containers are interpreted as a problem with the role + * and/or the host: {@value} + */ + String CONTAINER_FAILURE_SHORTLIFE = + "container.failure.shortlife"; + + /** + * Default short life threshold: {@value} + */ + int DEFAULT_CONTAINER_FAILURE_SHORTLIFE = 60; + + /** + * maximum number of failed containers (in a single role) + * before the cluster is deemed to have failed {@value} + */ + String CONTAINER_FAILURE_THRESHOLD = + "yarn.container.failure.threshold"; + + /** + * Minutes range of the container failure reset window. + * This is combined with the hour and day values to + * produce the full window. 
+ * {@value} + */ + String CONTAINER_FAILURE_WINDOW_MINUTES = + "yarn.container.failure.window.minutes"; + + + + /** + * Hours range of the container failure reset window + * This is combined with the minute and day values to + * produce the full window. + * {@value} + */ + String CONTAINER_FAILURE_WINDOW_HOURS = + "yarn.container.failure.window.hours"; + + + /** + * Days range of the container failure reset window + * This is combined with the hour and minute values to + * produce the full window. + * {@value} + */ + String CONTAINER_FAILURE_WINDOW_DAYS = + "yarn.container.failure.window.days"; + + int DEFAULT_CONTAINER_FAILURE_WINDOW_DAYS = 0; + int DEFAULT_CONTAINER_FAILURE_WINDOW_HOURS = 6; + int DEFAULT_CONTAINER_FAILURE_WINDOW_MINUTES = 0; + + + /** + * Default failure threshold: {@value} + */ + int DEFAULT_CONTAINER_FAILURE_THRESHOLD = 5; + } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/9a474833/slider-core/src/main/java/org/apache/slider/client/SliderClient.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/client/SliderClient.java b/slider-core/src/main/java/org/apache/slider/client/SliderClient.java index e1d083c..677caae 100644 --- a/slider-core/src/main/java/org/apache/slider/client/SliderClient.java +++ b/slider-core/src/main/java/org/apache/slider/client/SliderClient.java @@ -753,9 +753,6 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe return instanceDefinition; } - - - /** * http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/9a474833/slider-core/src/main/java/org/apache/slider/common/SliderXmlConfKeys.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/common/SliderXmlConfKeys.java b/slider-core/src/main/java/org/apache/slider/common/SliderXmlConfKeys.java index 3f16f25..1bbe9ae 100644 --- 
a/slider-core/src/main/java/org/apache/slider/common/SliderXmlConfKeys.java +++ b/slider-core/src/main/java/org/apache/slider/common/SliderXmlConfKeys.java @@ -84,19 +84,6 @@ public interface SliderXmlConfKeys { String KEY_AM_RESTART_LIMIT = "slider.yarn.restart.limit"; /** - * Default Limit on restarts for the AM - * {@value} - */ - int DEFAULT_AM_RESTART_LIMIT = 2; - - /** - * Flag which is set to indicate that security should be enabled - * when talking to this cluster. - */ - String KEY_SECURITY = - CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION; - - /** * queue name */ String KEY_YARN_QUEUE = "slider.yarn.queue"; http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/9a474833/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java index 52f7f8f..742a062 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java @@ -75,6 +75,7 @@ import org.apache.slider.common.tools.SliderVersionInfo; import org.apache.slider.core.build.InstanceIO; import org.apache.slider.core.conf.AggregateConf; import org.apache.slider.core.conf.ConfTree; +import org.apache.slider.core.conf.ConfTreeOperations; import org.apache.slider.core.conf.MapOperations; import org.apache.slider.core.exceptions.BadConfigException; import org.apache.slider.core.exceptions.SliderException; @@ -100,6 +101,8 @@ import org.apache.slider.server.appmaster.actions.ActionHalt; import org.apache.slider.server.appmaster.actions.QueueService; import org.apache.slider.server.appmaster.actions.ActionStopSlider; import org.apache.slider.server.appmaster.actions.AsyncAction; +import 
org.apache.slider.server.appmaster.actions.RenewingAction; +import org.apache.slider.server.appmaster.actions.ResetFailureWindow; import org.apache.slider.server.appmaster.operations.AsyncRMOperationHandler; import org.apache.slider.server.appmaster.operations.ProviderNotifyingOperationHandler; import org.apache.slider.server.appmaster.rpc.RpcBinder; @@ -150,6 +153,7 @@ import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; @@ -330,11 +334,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService private String agentAccessUrl; private CertificateManager certificateManager; - private WorkflowScheduledExecutorService<ScheduledExecutorService> - scheduledExecutors; - private WorkflowExecutorService<ExecutorService> - executorService; - + private WorkflowExecutorService<ExecutorService> executorService; private final QueueService actionQueues = new QueueService(); @@ -389,14 +389,6 @@ public class SliderAppMaster extends AbstractSliderLaunchedService //look at settings of Hadoop Auth, to pick up a problem seen once checkAndWarnForAuthTokenProblems(); - scheduledExecutors = - new WorkflowScheduledExecutorService<>( - "AmExecutor", - Executors.newScheduledThreadPool( - SCHEDULED_EXECUTOR_POOL_SIZE, - new ServiceThreadFactory("AmScheduledExecutor", true))); - addService(scheduledExecutors); - executorService = new WorkflowExecutorService<>("AmExecutor", Executors.newCachedThreadPool( new ServiceThreadFactory("AmExecutor", true))); @@ -938,6 +930,14 @@ public class SliderAppMaster extends AbstractSliderLaunchedService } /** + * Get the application state + * @return the application state + */ + public AppState getAppState() { + return appState; + } + + /** * Block until it is 
signalled that the AM is done */ private void waitForAMCompletionSignal() { @@ -1000,7 +1000,6 @@ public class SliderAppMaster extends AbstractSliderLaunchedService //stop any launches in progress launchService.stop(); - //now release all containers releaseAllContainers(); @@ -1008,8 +1007,8 @@ public class SliderAppMaster extends AbstractSliderLaunchedService // signal to the RM log.info("Application completed. Signalling finish to RM"); - //if there were failed containers and the app isn't already down as failing, it is now +/* int failedContainerCount = appState.getFailedCountainerCount(); if (failedContainerCount != 0 && appStatus == FinalApplicationStatus.SUCCEEDED) { @@ -1018,6 +1017,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService "Completed with exit code = " + exitCode + " - " + getContainerDiagnosticInfo(); success = false; } +*/ try { log.info("Unregistering AM status={} message={}", appStatus, appMessage); asyncRMClient.unregisterApplicationMaster(appStatus, appMessage, null); @@ -1140,19 +1140,63 @@ public class SliderAppMaster extends AbstractSliderLaunchedService * Implementation of cluster flexing. * It should be the only way that anything -even the AM itself on startup- * asks for nodes. 
+ * @param resources the resource tree * @return true if the any requests were made * @throws IOException */ - private boolean flexCluster(ConfTree updated) + private boolean flexCluster(ConfTree resources) throws IOException, SliderInternalStateException, BadConfigException { - appState.updateResourceDefinitions(updated); + appState.updateResourceDefinitions(resources); + appState.resetFailureCounts(); + // reset the scheduled window resetter...the values + // may have changed + + // ask for more containers if needed return reviewRequestAndReleaseNodes(); } /** + * Schedule the failure window + * @param resources + * @throws BadConfigException if the window is out of range + */ + private void scheduleFailureWindowResets(ConfTree resources) throws + BadConfigException { + ResetFailureWindow reset = new ResetFailureWindow(); + ConfTreeOperations ops = new ConfTreeOperations(resources); + MapOperations globals = ops.getGlobalOptions(); + int days = globals.getOptionInt(ResourceKeys.CONTAINER_FAILURE_WINDOW_DAYS, + ResourceKeys.DEFAULT_CONTAINER_FAILURE_WINDOW_DAYS); + int hours = globals.getOptionInt( + ResourceKeys.CONTAINER_FAILURE_WINDOW_HOURS, + ResourceKeys.DEFAULT_CONTAINER_FAILURE_WINDOW_HOURS); + int minutes = globals.getOptionInt( + ResourceKeys.CONTAINER_FAILURE_WINDOW_MINUTES, + ResourceKeys.DEFAULT_CONTAINER_FAILURE_WINDOW_MINUTES); + // range check + if (days < 0 || hours < 0 || minutes < 0) { + throw new BadConfigException("Failure window contains negative values:" + + "days=%d hours=%d, minutes=%d", + days, hours, minutes); + } + // calculate total time in minutes (60 per hour), schedule the reset if expected + int totalMinutes = days * 24 * 60 + hours * 60 + minutes; + if (totalMinutes > 0) { + log.info( + "Scheduling the failure window reset interval to every {} minutes", + totalMinutes); + RenewingAction<ResetFailureWindow> renew = new RenewingAction<>( + reset, totalMinutes, totalMinutes, TimeUnit.MINUTES, 0); + actionQueues.renewing("failures", renew); + } else { + 
log.warn("Failure window reset interval is not set"); + } + } + + /** * Look at where the current node state is -and whether it should be changed */ private synchronized boolean reviewRequestAndReleaseNodes() @@ -1255,7 +1299,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService SliderUtils.getCurrentUser(); String message = request.getMessage(); log.info("SliderAppMasterApi.stopCluster: {}", message); - queue(new ActionStopSlider(message, 1000)); + schedule(new ActionStopSlider(message, 1000, TimeUnit.MILLISECONDS)); return Messages.StopClusterResponseProto.getDefaultInstance(); } @@ -1267,8 +1311,8 @@ public class SliderAppMaster extends AbstractSliderLaunchedService String payload = request.getClusterSpec(); ConfTreeSerDeser confTreeSerDeser = new ConfTreeSerDeser(); - ConfTree updated = confTreeSerDeser.fromJson(payload); - boolean flexed = flexCluster(updated); + ConfTree updatedResources = confTreeSerDeser.fromJson(payload); + boolean flexed = flexCluster(updatedResources); return Messages.FlexClusterResponseProto.newBuilder().setResponse(flexed).build(); } @@ -1657,7 +1701,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService * @param action action to execute */ public void queue(AsyncAction action) { - actionQueues.actionQueue.add(action); + actionQueues.put(action); } /** @@ -1665,7 +1709,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService * @param action for delayed execution */ public void schedule(AsyncAction action) { - actionQueues.delayedActions.add(action); + actionQueues.schedule(action); } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/9a474833/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionHalt.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionHalt.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionHalt.java 
index 44337fc..b46a791 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionHalt.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionHalt.java @@ -20,6 +20,7 @@ package org.apache.slider.server.appmaster.actions; import org.apache.hadoop.util.ExitUtil; import org.apache.slider.server.appmaster.SliderAppMaster; +import org.apache.slider.server.appmaster.state.AppState; /** * Exit a JVM halt. @@ -33,14 +34,16 @@ public class ActionHalt extends AsyncAction { public ActionHalt( int status, String text, - int delay) { + long delay) { super("Halt", delay, ActionAttributes.HALTS_CLUSTER); this.status = status; this.text = text; } @Override - public void execute(SliderAppMaster appMaster, QueueAccess queueService) throws Exception { + public void execute(SliderAppMaster appMaster, + QueueAccess queueService, + AppState appState) throws Exception { ExitUtil.halt(status, text); } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/9a474833/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStartContainer.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStartContainer.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStartContainer.java index 1a86e5c..5efbbe3 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStartContainer.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStartContainer.java @@ -21,6 +21,7 @@ package org.apache.slider.server.appmaster.actions; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.slider.server.appmaster.SliderAppMaster; +import org.apache.slider.server.appmaster.state.AppState; import org.apache.slider.server.appmaster.state.RoleInstance; /** 
@@ -34,7 +35,7 @@ public class ActionStartContainer extends AsyncAction { private final RoleInstance instance; public ActionStartContainer(String name, - int delay, + long delay, Container container, ContainerLaunchContext ctx, RoleInstance instance) { @@ -45,7 +46,9 @@ public class ActionStartContainer extends AsyncAction { } @Override - public void execute(SliderAppMaster appMaster, QueueAccess queueService) throws Exception { + public void execute(SliderAppMaster appMaster, + QueueAccess queueService, + AppState appState) throws Exception { appMaster.startContainer(container, ctx, instance); } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/9a474833/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopQueue.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopQueue.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopQueue.java index 7a31f12..66a3961 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopQueue.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopQueue.java @@ -19,6 +19,7 @@ package org.apache.slider.server.appmaster.actions; import org.apache.slider.server.appmaster.SliderAppMaster; +import org.apache.slider.server.appmaster.state.AppState; import java.util.concurrent.TimeUnit; @@ -27,17 +28,25 @@ import java.util.concurrent.TimeUnit; */ public class ActionStopQueue extends AsyncAction { - public ActionStopQueue(int delay) { + public ActionStopQueue(long delay) { super("stop queue", delay); } - public ActionStopQueue(int delay, + public ActionStopQueue(long delay, TimeUnit timeUnit) { super("stop queue", delay, timeUnit); } + public ActionStopQueue(String name, + long delay, + TimeUnit timeUnit) { + super(name, delay, timeUnit); + } + @Override - public void execute(SliderAppMaster 
appMaster, QueueAccess queueService) throws Exception { + public void execute(SliderAppMaster appMaster, + QueueAccess queueService, + AppState appState) throws Exception { // no-op } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/9a474833/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopSlider.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopSlider.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopSlider.java index 24cad1c..f084383 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopSlider.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopSlider.java @@ -20,15 +20,26 @@ package org.apache.slider.server.appmaster.actions; import org.apache.slider.core.main.LauncherExitCodes; import org.apache.slider.server.appmaster.SliderAppMaster; +import org.apache.slider.server.appmaster.state.AppState; + +import java.util.concurrent.TimeUnit; public class ActionStopSlider extends AsyncAction { public ActionStopSlider(String message, - int delay) { + long delay) { super(message, delay, ActionAttributes.HALTS_CLUSTER); } + public ActionStopSlider(String name, + long delay, + TimeUnit timeUnit) { + super(name, delay, timeUnit, ActionAttributes.HALTS_CLUSTER); + } + @Override - public void execute(SliderAppMaster appMaster, QueueAccess queueService) throws Exception { + public void execute(SliderAppMaster appMaster, + QueueAccess queueService, + AppState appState) throws Exception { String message = name; SliderAppMaster.getLog().info("SliderAppMasterApi.stopCluster: {}", message); http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/9a474833/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/AsyncAction.java ---------------------------------------------------------------------- 
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/AsyncAction.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/AsyncAction.java index fed28e4..996390d 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/AsyncAction.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/AsyncAction.java @@ -20,6 +20,7 @@ package org.apache.slider.server.appmaster.actions; import org.apache.slider.common.tools.SliderUtils; import org.apache.slider.server.appmaster.SliderAppMaster; +import org.apache.slider.server.appmaster.state.AppState; import java.io.IOException; import java.util.Collections; @@ -43,12 +44,12 @@ public abstract class AsyncAction implements Delayed { } protected AsyncAction(String name, - int delayMillis) { + long delayMillis) { this(name, delayMillis, TimeUnit.MILLISECONDS); } protected AsyncAction(String name, - int delay, + long delay, TimeUnit timeUnit) { this.name = name; this.setNanos(convertAndOffset(delay, timeUnit)); @@ -56,7 +57,7 @@ public abstract class AsyncAction implements Delayed { } protected AsyncAction(String name, - int delay, + long delay, TimeUnit timeUnit, EnumSet<ActionAttributes> attrs) { this.name = name; @@ -65,7 +66,7 @@ public abstract class AsyncAction implements Delayed { } protected AsyncAction(String name, - int delay, + long delay, TimeUnit timeUnit, ActionAttributes... attributes) { this(name, delay, timeUnit); @@ -73,12 +74,12 @@ public abstract class AsyncAction implements Delayed { } protected AsyncAction(String name, - int delayMillis, + long delayMillis, ActionAttributes... 
attributes) { this(name, delayMillis, TimeUnit.MILLISECONDS); } - protected long convertAndOffset(int delay, TimeUnit timeUnit) { + protected long convertAndOffset(long delay, TimeUnit timeUnit) { return now() + TimeUnit.NANOSECONDS.convert(delay, timeUnit); } @@ -134,10 +135,11 @@ public abstract class AsyncAction implements Delayed { * Actual application * @param appMaster * @param queueService + * @param appState * @throws IOException */ public abstract void execute(SliderAppMaster appMaster, - QueueAccess queueService) throws Exception; + QueueAccess queueService, AppState appState) throws Exception; public long getNanos() { return nanos; http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/9a474833/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ProviderReportedContainerLoss.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ProviderReportedContainerLoss.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ProviderReportedContainerLoss.java index 6a2cc6b..2aa67bb 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ProviderReportedContainerLoss.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ProviderReportedContainerLoss.java @@ -20,6 +20,7 @@ package org.apache.slider.server.appmaster.actions; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.slider.server.appmaster.SliderAppMaster; +import org.apache.slider.server.appmaster.state.AppState; /** * Report container loss to the AM @@ -35,13 +36,15 @@ public class ProviderReportedContainerLoss extends AsyncAction { } public ProviderReportedContainerLoss( - ContainerId containerId, int delayMillis) { + ContainerId containerId, long delayMillis) { super("lost container " + containerId, delayMillis); this.containerId = containerId; } @Override - public void 
execute(SliderAppMaster appMaster, QueueAccess queueService) throws Exception { + public void execute(SliderAppMaster appMaster, + QueueAccess queueService, + AppState appState) throws Exception { appMaster.providerLostContainer(containerId); } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/9a474833/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ProviderStartupCompleted.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ProviderStartupCompleted.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ProviderStartupCompleted.java index 4e06f7a..4577025 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ProviderStartupCompleted.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ProviderStartupCompleted.java @@ -19,6 +19,7 @@ package org.apache.slider.server.appmaster.actions; import org.apache.slider.server.appmaster.SliderAppMaster; +import org.apache.slider.server.appmaster.state.AppState; public class ProviderStartupCompleted extends AsyncAction { @@ -26,12 +27,14 @@ public class ProviderStartupCompleted extends AsyncAction { super("ProviderStartupCompleted"); } - public ProviderStartupCompleted(int delayMillis) { + public ProviderStartupCompleted(long delayMillis) { super("ProviderStartupCompleted", delayMillis); } @Override - public void execute(SliderAppMaster appMaster, QueueAccess queueService) throws Exception { + public void execute(SliderAppMaster appMaster, + QueueAccess queueService, + AppState appState) throws Exception { appMaster.eventCallbackEvent(null); } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/9a474833/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueAccess.java ---------------------------------------------------------------------- diff --git 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueAccess.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueAccess.java index 160333d..cffaf5e 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueAccess.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueAccess.java @@ -34,5 +34,34 @@ public interface QueueAccess { * after its action time has been reached * @param action action to queue */ - void putDelayed(AsyncAction action); + void schedule(AsyncAction action); + + /** + * Remove an action from the queues. + * @param action action to remove + * @return true if the action was removed + */ + boolean remove(AsyncAction action); + + /** + * Add a named renewing action + * @param name name + * @param renewingAction wrapped action + */ + void renewing(String name, + RenewingAction<? extends AsyncAction> renewingAction); + + /** + * Look up a renewing action + * @param name name of the action + * @return the action or null if none was found + */ + RenewingAction<? extends AsyncAction> lookupRenewingAction(String name); + + /** + * Remove a renewing action + * @param name action name name of the action + * @return true if the action was found and removed. 
+ */ + boolean removeRenewingAction(String name); } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/9a474833/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueExecutor.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueExecutor.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueExecutor.java index bc2b260..149f784 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueExecutor.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueExecutor.java @@ -62,7 +62,7 @@ public class QueueExecutor implements Runnable { do { take = actionQueues.actionQueue.take(); log.debug("Executing {}", take); - take.execute(appMaster, actionQueues); + take.execute(appMaster, actionQueues, appMaster.getAppState()); } while (!(take instanceof ActionStopQueue)); log.info("Queue Executor run() stopped"); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/9a474833/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueService.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueService.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueService.java index 9e4e8d0..a48dcdd 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueService.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueService.java @@ -19,26 +19,67 @@ package org.apache.slider.server.appmaster.actions; -import org.apache.hadoop.service.AbstractService; +import com.google.common.annotations.VisibleForTesting; +import org.apache.slider.server.services.workflow.ServiceThreadFactory; +import 
org.apache.slider.server.services.workflow.WorkflowExecutorService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Map; import java.util.concurrent.BlockingDeque; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.DelayQueue; +import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; -public class QueueService extends AbstractService - implements Runnable, QueueAccess { +/** + * The Queue service provides immediate and scheduled queues, as well + * as an executor thread that moves queued actions from the scheduled + * queue to the immediate one. + * + * <p> + * This code to be revisited to see if all that was needed is the single scheduled + * queue, implicitly making actions immediate by giving them an execution + * time of "now". It would force having a sequence number to all actions, one + * which the queue would have to set from its (monotonic, thread-safe) counter + * on every submission, with a modified comparison operator. This would guarantee + * that earlier submissions were picked before later ones. + */ +public class QueueService extends WorkflowExecutorService<ExecutorService> +implements Runnable, QueueAccess { private static final Logger log = LoggerFactory.getLogger(QueueService.class); + public static final String NAME = "Action Queue"; - public final DelayQueue<AsyncAction> delayedActions = new DelayQueue<>(); - - public final BlockingDeque<AsyncAction> actionQueue = + /** + * Immediate actions. + * Package scoped for testing. + */ + @VisibleForTesting + final BlockingDeque<AsyncAction> actionQueue = new LinkedBlockingDeque<>(); + /** + * Actions to be scheduled in the future + * Package scoped for testing. + */ + @VisibleForTesting + final DelayQueue<AsyncAction> scheduledActions = new DelayQueue<>(); + + /** + * Map of renewing actions by name ... 
this is to allow them to + * be cancelled by name + */ + private final Map<String, RenewingAction<? extends AsyncAction>> renewingActions + = new ConcurrentHashMap<>(); + + /** + * Create a queue instance with a single thread executor + */ public QueueService() { - super("action queue"); + super(NAME, + ServiceThreadFactory.singleThreadExecutor(NAME, true)); } @Override @@ -48,9 +89,53 @@ public class QueueService extends AbstractService } @Override - public void putDelayed(AsyncAction action) { - log.debug("Delayed Queueing {}", action); - delayedActions.add(action); + public void schedule(AsyncAction action) { + log.debug("Scheduling {}", action); + scheduledActions.add(action); + } + + @Override + public boolean remove(AsyncAction action) { + boolean removedFromDelayQueue = scheduledActions.remove(action); + boolean removedFromActions = actionQueue.remove(action); + return removedFromActions || removedFromDelayQueue; + } + + @Override + public void renewing(String name, + RenewingAction<? extends AsyncAction> renewingAction) { + log.debug("Adding renewing Action \"{}\": {}", name, + renewingAction.getAction()); + if (removeRenewingAction(name)) { + log.debug("Removed predecessor action"); + } + renewingActions.put(name, renewingAction); + schedule(renewingAction); + } + + @Override + public RenewingAction<? extends AsyncAction> lookupRenewingAction(String name) { + return renewingActions.get(name); + } + + @Override + public boolean removeRenewingAction(String name) { + RenewingAction<? extends AsyncAction> action = renewingActions.remove(name); + return action != null && remove(action); + } + + /** + * Stop the service by scheduling an {@link ActionStopQueue} action + * ..if the processor thread is working this will propagate through + * and stop the queue handling after all other actions complete. 
+ * @throws Exception + */ + @Override + protected void serviceStop() throws Exception { + ActionStopQueue stopQueue = new ActionStopQueue("serviceStop: "+ this, + 0, TimeUnit.MILLISECONDS); + schedule(stopQueue); + super.serviceStop(); } /** @@ -64,7 +149,7 @@ public class QueueService extends AbstractService AsyncAction take; do { - take = delayedActions.take(); + take = scheduledActions.take(); log.debug("Propagating {}", take); actionQueue.put(take); } while (!(take instanceof ActionStopQueue)); http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/9a474833/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/RenewingAction.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/RenewingAction.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/RenewingAction.java index 30870bf..c62582f 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/RenewingAction.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/RenewingAction.java @@ -18,7 +18,9 @@ package org.apache.slider.server.appmaster.actions; +import com.google.common.base.Preconditions; import org.apache.slider.server.appmaster.SliderAppMaster; +import org.apache.slider.server.appmaster.state.AppState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,11 +32,11 @@ import java.util.concurrent.atomic.AtomicInteger; * can specify the number of times to run */ -public class RenewingAction extends AsyncAction{ +public class RenewingAction<A extends AsyncAction> extends AsyncAction { private static final Logger log = LoggerFactory.getLogger(RenewingAction.class); - private final AsyncAction action; - private final int interval; + private final A action; + private final long interval; private final TimeUnit timeUnit; public final AtomicInteger executionCount = new AtomicInteger(); public final int 
limit; @@ -48,11 +50,15 @@ public class RenewingAction extends AsyncAction{ * @param timeUnit time unit for all times * @param limit limit on the no. of executions. If 0 or less: no limit */ - public RenewingAction(AsyncAction action, - int initialDelay, - int interval, TimeUnit timeUnit, + public RenewingAction(A action, + long initialDelay, + long interval, + TimeUnit timeUnit, int limit) { super("renewing " + action.name, initialDelay, timeUnit, action.getAttrs()); + // slightly superfluous as the super init above checks these values...retained + // in case that code is ever changed + Preconditions.checkArgument(action != null, "null actions"); this.action = action; this.interval = interval; this.timeUnit = timeUnit; @@ -63,14 +69,17 @@ public class RenewingAction extends AsyncAction{ * Execute the inner action then reschedule ourselves * @param appMaster * @param queueService + * @param appState * @throws Exception */ @Override - public void execute(SliderAppMaster appMaster, QueueAccess queueService) + public void execute(SliderAppMaster appMaster, + QueueAccess queueService, + AppState appState) throws Exception { long exCount = executionCount.incrementAndGet(); log.debug("{}: Executing inner action count # {}", this, exCount); - action.execute(appMaster, queueService); + action.execute(appMaster, queueService, appState); boolean reschedule = true; if (limit > 0) { reschedule = limit > exCount; @@ -79,15 +88,19 @@ public class RenewingAction extends AsyncAction{ this.setNanos(convertAndOffset(interval, timeUnit)); log.debug("{}: rescheduling, new offset {} mS ", this, getDelay(TimeUnit.MILLISECONDS)); - queueService.putDelayed(this); + queueService.schedule(this); } } - public AsyncAction getAction() { + /** + * Get the action + * @return + */ + public A getAction() { return action; } - public int getInterval() { + public long getInterval() { return interval; } 
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/9a474833/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ResetFailureWindow.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ResetFailureWindow.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ResetFailureWindow.java new file mode 100644 index 0000000..28bcf55 --- /dev/null +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ResetFailureWindow.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.slider.server.appmaster.actions; + +import org.apache.slider.server.appmaster.SliderAppMaster; +import org.apache.slider.server.appmaster.state.AppState; + +/** + * Requests the AM to reset the failure window + */ +public class ResetFailureWindow extends AsyncAction { + + public ResetFailureWindow() { + super("ResetFailureWindow"); + } + + @Override + public void execute(SliderAppMaster appMaster, + QueueAccess queueService, + AppState appState) throws Exception { + appState.resetFailureCounts(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/9a474833/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java index 667fe99..fd5b3a5 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java @@ -36,7 +36,6 @@ import org.apache.slider.api.ClusterDescription; import org.apache.slider.api.ClusterDescriptionKeys; import org.apache.slider.api.ClusterDescriptionOperations; import org.apache.slider.api.ClusterNode; -import org.apache.slider.api.OptionKeys; import org.apache.slider.api.ResourceKeys; import org.apache.slider.api.RoleKeys; import org.apache.slider.api.StatusKeys; @@ -510,15 +509,16 @@ public class AppState { //set the livespan - MapOperations globalInternalOpts = - instanceDefinition.getInternalOperations().getGlobalOptions(); - startTimeThreshold = globalInternalOpts.getOptionInt( - OptionKeys.INTERNAL_CONTAINER_FAILURE_SHORTLIFE, - OptionKeys.DEFAULT_CONTAINER_FAILURE_SHORTLIFE); - - failureThreshold = globalInternalOpts.getOptionInt( - OptionKeys.INTERNAL_CONTAINER_FAILURE_THRESHOLD, - 
OptionKeys.DEFAULT_CONTAINER_FAILURE_THRESHOLD); + MapOperations globalResOpts = + instanceDefinition.getResourceOperations().getGlobalOptions(); + + startTimeThreshold = globalResOpts.getOptionInt( + ResourceKeys.CONTAINER_FAILURE_SHORTLIFE, + ResourceKeys.DEFAULT_CONTAINER_FAILURE_SHORTLIFE); + + failureThreshold = globalResOpts.getOptionInt( + ResourceKeys.CONTAINER_FAILURE_THRESHOLD, + ResourceKeys.DEFAULT_CONTAINER_FAILURE_THRESHOLD); initClusterStatus(); @@ -531,8 +531,7 @@ public class AppState { // any am config options to pick up - logServerURL = appmasterConfig.get(YarnConfiguration.YARN_LOG_SERVER_URL, - ""); + logServerURL = appmasterConfig.get(YarnConfiguration.YARN_LOG_SERVER_URL, ""); //mark as live applicationLive = true; } @@ -630,7 +629,7 @@ public class AppState { } /** - * The resource configuration is updated -review and update state + * The resource configuration is updated -review and update state. * @param resources updated resources specification */ public synchronized void updateResourceDefinitions(ConfTree resources) throws @@ -1137,11 +1136,14 @@ public class AppState { RoleInstance instance = getStartingNodes().remove(containerId); if (null != instance) { RoleStatus roleStatus = lookupRoleStatus(instance.roleId); + String text; if (null != thrown) { - instance.diagnostics = SliderUtils.stringify(thrown); + text = SliderUtils.stringify(thrown); + } else { + text = "container start failure"; } - roleStatus.noteFailed(null); - roleStatus.incStartFailed(); + instance.diagnostics = text; + roleStatus.noteFailed(true, null); getFailedNodes().put(containerId, instance); roleHistory.onNodeManagerContainerStartFailed(instance.container); } @@ -1258,12 +1260,7 @@ public class AppState { } else { message = String.format("Failure %s", containerId); } - roleStatus.noteFailed(message); - //have a look to see if it short lived - if (shortLived) { - roleStatus.incStartFailed(); - } - + roleStatus.noteFailed(shortLived, message); if (failedContainer != 
null) { roleHistory.onFailedContainer(failedContainer, shortLived); } @@ -1446,9 +1443,15 @@ public class AppState { } return allOperations; } - - public void checkFailureThreshold(RoleStatus role) throws - TriggerClusterTeardownException { + + /** + * Check the failure threshold for a role + * @param role role to examine + * @throws TriggerClusterTeardownException if the role + * has failed too many times + */ + private void checkFailureThreshold(RoleStatus role) + throws TriggerClusterTeardownException { int failures = role.getFailed(); if (failures > failureThreshold) { @@ -1463,6 +1466,17 @@ public class AppState { role.getFailureMessage()); } } + + /** + * Reset the failure counts of all roles + */ + public void resetFailureCounts() { + for (RoleStatus roleStatus : getRoleStatusMap().values()) { + int failed = roleStatus.resetFailed(); + log.debug("Resetting failure count of {}; was {}", roleStatus.getName(), + failed); + } + } /** * Look at the allocation status of one role, and trigger add/release @@ -1474,7 +1488,7 @@ public class AppState { * the internal state of the application is inconsistent. 
*/ @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") - public List<AbstractRMOperation> reviewOneRole(RoleStatus role) + private List<AbstractRMOperation> reviewOneRole(RoleStatus role) throws SliderInternalStateException, TriggerClusterTeardownException { List<AbstractRMOperation> operations = new ArrayList<>(); int delta; http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/9a474833/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java index e9da081..2338103 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java @@ -478,7 +478,7 @@ public class RoleHistory { } } if (nodeInstance == null) { - log.debug("No node selected for {}", role.getName()); + log.debug("No historical node found for {}", role.getName()); } return nodeInstance; } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/9a474833/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java index b3dddf6..898edce 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java @@ -45,7 +45,7 @@ public final class RoleStatus implements Cloneable { private final ProviderRole providerRole; private int desired, actual, requested, releasing; - private int failed, started, 
startFailed, completed, totalRequested; + private volatile int failed, started, startFailed, completed, totalRequested; private String failureMessage = ""; @@ -143,16 +143,32 @@ public final class RoleStatus implements Cloneable { } /** + * Reset the failure counts + * @return the total number of failures up to this point + */ + public int resetFailed() { + int total = failed + startFailed; + failed = 0; + startFailed = 0; + return total; + } + + /** * Note that a role failed, text will * be used in any diagnostics if an exception * is later raised. + * @param startupFailure * @param text text about the failure */ - public void noteFailed(String text) { + public void noteFailed(boolean startupFailure, String text) { failed++; if (text != null) { failureMessage = text; } + //have a look to see if it short lived + if (startupFailure) { + incStartFailed(); + } } public int getStartFailed() { http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/9a474833/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowExecutorService.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowExecutorService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowExecutorService.java index 85d5330..7409d32 100644 --- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowExecutorService.java +++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowExecutorService.java @@ -96,8 +96,8 @@ public class WorkflowExecutorService<E extends ExecutorService> extends Abstract */ @Override protected void serviceStop() throws Exception { - super.serviceStop(); stopExecutor(); + super.serviceStop(); } /** http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/9a474833/slider-core/src/test/groovy/org/apache/slider/server/appmaster/actions/TestActions.groovy 
---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/actions/TestActions.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/actions/TestActions.groovy index 1e22acd..7ee5af1 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/actions/TestActions.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/actions/TestActions.groovy @@ -22,6 +22,7 @@ import groovy.util.logging.Slf4j import org.apache.hadoop.conf.Configuration import org.apache.hadoop.service.ServiceOperations import org.apache.slider.server.appmaster.SliderAppMaster +import org.apache.slider.server.appmaster.state.AppState import org.apache.slider.server.services.workflow.ServiceThreadFactory import org.apache.slider.server.services.workflow.WorkflowExecutorService import org.junit.After @@ -91,7 +92,7 @@ class TestActions { long start = System.currentTimeMillis() ActionStopQueue stopAction = new ActionStopQueue(1000); - queues.delayedActions.add(stopAction); + queues.scheduledActions.add(stopAction); queues.run(); AsyncAction take = queues.actionQueue.take(); assert take == stopAction @@ -135,9 +136,9 @@ class TestActions { assert note2.compareTo(stop) < 0 assert note1.nanos < note2.nanos assert note2.nanos < stop.nanos - queues.putDelayed(note1) - queues.putDelayed(note2) - queues.putDelayed(stop) + queues.schedule(note1) + queues.schedule(note2) + queues.schedule(stop) // async to sync expected to run in order runQueuesToCompletion() assert note1.executed.get() @@ -146,7 +147,7 @@ class TestActions { public void runQueuesToCompletion() { queues.run(); - assert queues.delayedActions.empty + assert queues.scheduledActions.empty assert !queues.actionQueue.empty QueueExecutor ex = new QueueExecutor(queues) ex.run(); @@ -162,18 +163,34 @@ class TestActions { 100, TimeUnit.MILLISECONDS, 3) - queues.putDelayed(renewer); + queues.schedule(renewer); def 
stop = new ActionStopQueue(4, TimeUnit.SECONDS) - queues.putDelayed(stop); + queues.schedule(stop); // this runs all the delayed actions FIRST, so can't be used // to play tricks of renewing actions ahead of the stop action runQueuesToCompletion() assert renewer.executionCount == 1 assert note1.executionCount == 1 // assert the renewed item is back in - assert queues.delayedActions.contains(renewer) + assert queues.scheduledActions.contains(renewer) } + + @Test + public void testRenewingActionOperations() throws Throwable { + ActionNoteExecuted note1 = new ActionNoteExecuted("note1", 500) + RenewingAction renewer = new RenewingAction( + note1, + 100, + 100, + TimeUnit.MILLISECONDS, + 3) + queues.renewing("note", renewer) + assert queues.removeRenewingAction("note") + queues.stop() + queues.waitForServiceToStop(10000) + } + public class ActionNoteExecuted extends AsyncAction { public final AtomicBoolean executed = new AtomicBoolean(false); public final AtomicLong executionTimeNanos = new AtomicLong() @@ -184,11 +201,16 @@ class TestActions { } @Override - public void execute(SliderAppMaster appMaster, QueueAccess queueService) throws Exception { + public void execute( + SliderAppMaster appMaster, + QueueAccess queueService, + AppState appState) throws Exception { log.info("Executing $name"); executed.set(true); executionTimeNanos.set(System.nanoTime()) executionCount.incrementAndGet() + log.info(this.toString()) + synchronized (this) { this.notify(); } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/9a474833/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateContainerFailure.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateContainerFailure.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateContainerFailure.groovy index 
3a287bd..c0f41f4 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateContainerFailure.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateContainerFailure.groovy @@ -21,8 +21,12 @@ package org.apache.slider.server.appmaster.model.appstate import groovy.transform.CompileStatic import groovy.util.logging.Slf4j import org.apache.hadoop.yarn.api.records.ContainerId +import org.apache.slider.api.ResourceKeys +import org.apache.slider.core.conf.AggregateConf +import org.apache.slider.core.conf.MapOperations import org.apache.slider.core.exceptions.SliderException import org.apache.slider.core.exceptions.TriggerClusterTeardownException +import org.apache.slider.server.appmaster.actions.ResetFailureWindow import org.apache.slider.server.appmaster.model.mock.BaseMockAppStateTest import org.apache.slider.server.appmaster.model.mock.MockRoles import org.apache.slider.server.appmaster.model.mock.MockYarnEngine @@ -52,6 +56,15 @@ class TestMockAppStateContainerFailure extends BaseMockAppStateTest return new MockYarnEngine(8000, 4) } + @Override + AggregateConf buildInstanceDefinition() { + def aggregateConf = super.buildInstanceDefinition() + def globalOptions = aggregateConf.resourceOperations.globalOptions + globalOptions.put(ResourceKeys.CONTAINER_FAILURE_THRESHOLD, "10") + + return aggregateConf + } + @Test public void testShortLivedFail() throws Throwable { @@ -153,7 +166,7 @@ class TestMockAppStateContainerFailure extends BaseMockAppStateTest ContainerId cid = ids[0] log.info("$i instance $instances[0] $cid") assert cid - appState.onNodeManagerContainerStartFailed(cid, new SliderException("oops")) + appState.onNodeManagerContainerStartFailed(cid, new SliderException("failure #${i}")) AppState.NodeCompletionResult result = appState.onCompletedNode(containerStatus(cid)) assert result.containerFailed } @@ -163,4 +176,33 @@ class TestMockAppStateContainerFailure 
extends BaseMockAppStateTest } } + + @Test + public void testFailureWindow() throws Throwable { + + ResetFailureWindow resetter = new ResetFailureWindow(); + + // initial reset + resetter.execute(null, null, appState) + + role0Status.desired = 1 + for (int i = 0; i < 100; i++) { + resetter.execute(null, null, appState) + List<RoleInstance> instances = createAndSubmitNodes() + assert instances.size() == 1 + + List<ContainerId> ids = extractContainerIds(instances, 0) + + ContainerId cid = ids[0] + log.info("$i instance $instances[0] $cid") + assert cid + appState.onNodeManagerContainerStartFailed( + cid, + new SliderException("failure #${i}")) + AppState.NodeCompletionResult result = appState.onCompletedNode( + containerStatus(cid)) + assert result.containerFailed + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/9a474833/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/BaseMockAppStateTest.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/BaseMockAppStateTest.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/BaseMockAppStateTest.groovy index 504ca82..289b7c5 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/BaseMockAppStateTest.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/BaseMockAppStateTest.groovy @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.slider.common.tools.SliderFileSystem import org.apache.slider.common.tools.SliderUtils +import org.apache.slider.core.conf.AggregateConf import org.apache.slider.core.main.LauncherExitCodes import org.apache.slider.server.appmaster.operations.AbstractRMOperation import org.apache.slider.server.appmaster.state.* @@ -82,13 +83,22 @@ abstract class 
BaseMockAppStateTest extends SliderTestBase implements MockRoles appState = new AppState(new MockRecordFactory()) appState.setContainerLimits(RM_MAX_RAM, RM_MAX_CORES) appState.buildInstance( - factory.newInstanceDefinition(0, 0, 0), + buildInstanceDefinition(), new Configuration(), new Configuration(false), factory.ROLES, fs, historyPath, - null, null, new SimpleReleaseSelector()) + null, null, + new SimpleReleaseSelector()) + } + + /** + * Override point, define the instance definition + * @return + */ + public AggregateConf buildInstanceDefinition() { + factory.newInstanceDefinition(0, 0, 0) } abstract String getTestName(); http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/9a474833/slider-providers/hbase/slider-hbase-provider/src/test/groovy/org/apache/slider/providers/hbase/minicluster/failures/TestFailureThreshold.groovy ---------------------------------------------------------------------- diff --git a/slider-providers/hbase/slider-hbase-provider/src/test/groovy/org/apache/slider/providers/hbase/minicluster/failures/TestFailureThreshold.groovy b/slider-providers/hbase/slider-hbase-provider/src/test/groovy/org/apache/slider/providers/hbase/minicluster/failures/TestFailureThreshold.groovy deleted file mode 100644 index 0917f01..0000000 --- a/slider-providers/hbase/slider-hbase-provider/src/test/groovy/org/apache/slider/providers/hbase/minicluster/failures/TestFailureThreshold.groovy +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.slider.providers.hbase.minicluster.failures - -import groovy.transform.CompileStatic -import groovy.util.logging.Slf4j -import org.apache.hadoop.hbase.ClusterStatus -import org.apache.hadoop.yarn.api.records.ApplicationReport -import org.apache.hadoop.yarn.api.records.FinalApplicationStatus -import org.apache.slider.core.main.ServiceLauncher -import org.apache.slider.common.SliderExitCodes -import org.apache.slider.api.ClusterDescription -import org.apache.slider.api.OptionKeys -import org.apache.slider.core.exceptions.BadClusterStateException -import org.apache.slider.core.exceptions.ErrorStrings -import org.apache.slider.common.params.Arguments -import org.apache.slider.client.SliderClient -import org.apache.slider.providers.hbase.minicluster.HBaseMiniClusterTestBase -import org.junit.Test - -/** - * test that if a container is killed too many times, - * the AM stays down - */ -@CompileStatic -@Slf4j - -class TestFailureThreshold extends HBaseMiniClusterTestBase { - - @Test - public void testFailedRegionService() throws Throwable { - failureThresholdTestRun("", true, 2, 5) - } - - - - private void failureThresholdTestRun( - String testName, - boolean toKill, - int threshold, - int killAttempts) { - String action = toKill ? 
"kill" : "stop" - int regionServerCount = 2 - String clustername = createMiniCluster(testName, configuration, 1, 1, 1, true, true) - describe( - "Create a single region service cluster then " + action + " the RS"); - - //now launch the cluster - ServiceLauncher<SliderClient> launcher = createHBaseCluster( - clustername, - regionServerCount, - [ - Arguments.ARG_OPTION, OptionKeys.INTERNAL_CONTAINER_FAILURE_THRESHOLD, - Integer.toString(threshold) - ], - true, - true) - SliderClient client = launcher.service - addToTeardown(client); - ClusterDescription status = client.getClusterDescription(clustername) - - ClusterStatus clustat = basicHBaseClusterStartupSequence(client) - ClusterStatus hbaseStat - try { - for (restarts in 1..killAttempts) { - status = waitForWorkerInstanceCount( - client, - regionServerCount, - hbaseClusterStartupToLiveTime) - //get the hbase status -/* - hbaseStat = waitForHBaseRegionServerCount( - client, - clustername, - regionServerCount, - HBASE_CLUSTER_STARTUP_TO_LIVE_TIME) - - log.info("Initial cluster status : ${hbaseStatusToString(hbaseStat)}"); -*/ - describe("running processes") - lsJavaProcesses() - describe("about to " + action + " servers") - if (toKill) { - killAllRegionServers() - } else { - stopAllRegionServers() - } - - //sleep a bit - sleep(toKill ? 
15000 : 25000); - - describe("waiting for recovery") - - //and expect a recovery - if (restarts < threshold) { - - def restartTime = 1000 - status = waitForWorkerInstanceCount( - client, - regionServerCount, - restartTime) - hbaseStat = waitForHBaseRegionServerCount( - client, - clustername, - regionServerCount, - restartTime) - } else { - //expect the cluster to have failed - try { - def finalCD = client.getClusterDescription(clustername) - dumpClusterDescription("expected the AM to have failed", finalCD) - fail("AM had not failed after $restarts worker kills") - - } catch (BadClusterStateException e) { - assert e.toString().contains(ErrorStrings.E_APPLICATION_NOT_RUNNING) - assert e.exitCode == SliderExitCodes.EXIT_BAD_STATE - //success - break; - } - } - } - } catch (BadClusterStateException e) { - assert e.toString().contains(ErrorStrings.E_APPLICATION_NOT_RUNNING) - assert e.exitCode == SliderExitCodes.EXIT_BAD_STATE - } - ApplicationReport report = client.applicationReport - log.info(report.diagnostics) - assert report.finalApplicationStatus == FinalApplicationStatus.FAILED - assert report.diagnostics.contains(ErrorStrings.E_UNSTABLE_CLUSTER) - - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/9a474833/slider-providers/hbase/slider-hbase-provider/src/test/groovy/org/apache/slider/providers/hbase/minicluster/failures/TestRegionServerFailureThreshold.groovy ---------------------------------------------------------------------- diff --git a/slider-providers/hbase/slider-hbase-provider/src/test/groovy/org/apache/slider/providers/hbase/minicluster/failures/TestRegionServerFailureThreshold.groovy b/slider-providers/hbase/slider-hbase-provider/src/test/groovy/org/apache/slider/providers/hbase/minicluster/failures/TestRegionServerFailureThreshold.groovy new file mode 100644 index 0000000..32afb35 --- /dev/null +++ 
b/slider-providers/hbase/slider-hbase-provider/src/test/groovy/org/apache/slider/providers/hbase/minicluster/failures/TestRegionServerFailureThreshold.groovy @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.providers.hbase.minicluster.failures + +import groovy.transform.CompileStatic +import groovy.util.logging.Slf4j +import org.apache.hadoop.hbase.ClusterStatus +import org.apache.hadoop.yarn.api.records.ApplicationReport +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus +import org.apache.slider.api.ResourceKeys +import org.apache.slider.core.main.ServiceLauncher +import org.apache.slider.common.SliderExitCodes +import org.apache.slider.api.ClusterDescription +import org.apache.slider.core.exceptions.BadClusterStateException +import org.apache.slider.core.exceptions.ErrorStrings +import org.apache.slider.common.params.Arguments +import org.apache.slider.client.SliderClient +import org.apache.slider.providers.hbase.minicluster.HBaseMiniClusterTestBase +import org.junit.Test + +/** + * Test that killing the region server repeatedly, until the configured + * container failure threshold is reached, leaves the application FAILED + * with diagnostics containing ErrorStrings.E_UNSTABLE_CLUSTER. + */ +@CompileStatic +@Slf4j + +class TestRegionServerFailureThreshold extends
HBaseMiniClusterTestBase { + /** + * Run the failure threshold scenario in kill mode with a threshold of 2 + * and at most 5 kill attempts. + */ + @Test + public void testFailedRegionService() throws Throwable { + failureThresholdTestRun("", true, 2, 5) + } + /** + * Create a mini cluster and an HBase cluster with one region server, + * passing threshold as the CONTAINER_FAILURE_THRESHOLD resource option, + * then kill or stop the region servers up to killAttempts times. + * While the round number is below threshold the worker count is expected + * to recover; from then on the attempt to fetch the cluster description + * is expected to end in a BadClusterStateException. Finally the + * application report must have the final status FAILED and diagnostics + * containing ErrorStrings.E_UNSTABLE_CLUSTER. + * @param testName name passed to createMiniCluster + * @param toKill true to call killAllRegionServers, false to call stopAllRegionServers + * @param threshold value set for ResourceKeys.CONTAINER_FAILURE_THRESHOLD + * @param killAttempts upper bound of the kill or stop loop + */ + private void failureThresholdTestRun( + String testName, + boolean toKill, + int threshold, + int killAttempts) { + String action = toKill ? "kill" : "stop" + int regionServerCount = 1 + String clustername = createMiniCluster(testName, configuration, 1, 1, 1, true, true) + describe( + "Create a single region service HBase instance then " + action + " the RS"); + + //now launch the cluster with CONTAINER_FAILURE_THRESHOLD set to the threshold + ServiceLauncher<SliderClient> launcher = createHBaseCluster( + clustername, + regionServerCount, + [ + Arguments.ARG_RESOURCE_OPT, + ResourceKeys.CONTAINER_FAILURE_THRESHOLD, + Integer.toString(threshold) + ], + true, + true) + SliderClient client = launcher.service + addToTeardown(client); + ClusterDescription status = client.getClusterDescription(clustername) + + ClusterStatus clustat = basicHBaseClusterStartupSequence(client) + ClusterStatus hbaseStat + try { + for (restarts in 1..killAttempts) { + status = waitForWorkerInstanceCount( + client, + regionServerCount, + hbaseClusterStartupToLiveTime) + //get the hbase status (currently disabled) +/* + hbaseStat = waitForHBaseRegionServerCount( + client, + clustername, + regionServerCount, + HBASE_CLUSTER_STARTUP_TO_LIVE_TIME) + + log.info("Initial cluster status : ${hbaseStatusToString(hbaseStat)}"); +*/ + describe("running processes") + lsJavaProcesses() + describe("about to " + action + " servers") + if (toKill) { + killAllRegionServers() + } else { + stopAllRegionServers() + } + + //sleep a bit + sleep(toKill ?
15000 : 25000); + + describe("waiting for recovery") + + //and expect a recovery while still below the failure threshold + if (restarts < threshold) { + + def restartTime = 1000 + status = waitForWorkerInstanceCount( + client, + regionServerCount, + restartTime) + hbaseStat = waitForHBaseRegionServerCount( + client, + clustername, + regionServerCount, + restartTime) + } else { + //expect the cluster to have failed + try { + def finalCD = client.getClusterDescription(clustername) + dumpClusterDescription("expected the AM to have failed", finalCD) + fail("AM had not failed after $restarts worker kills") + + } catch (BadClusterStateException e) { + assert e.toString().contains(ErrorStrings.E_APPLICATION_NOT_RUNNING) + assert e.exitCode == SliderExitCodes.EXIT_BAD_STATE + //success: the application is reported as not running, so leave the loop + break; + } + } + } + } catch (BadClusterStateException e) { /* NOTE(review): presumably thrown by one of the wait calls once the application has stopped; confirm */ + assert e.toString().contains(ErrorStrings.E_APPLICATION_NOT_RUNNING) + assert e.exitCode == SliderExitCodes.EXIT_BAD_STATE + } + ApplicationReport report = client.applicationReport + log.info(report.diagnostics) + assert report.finalApplicationStatus == FinalApplicationStatus.FAILED + assert report.diagnostics.contains(ErrorStrings.E_UNSTABLE_CLUSTER) + + } + + +}