Repository: hadoop Updated Branches: refs/heads/HDFS-7240 ce23d9adf -> 1f74cb2f1
YARN-5015. Support sliding window retry capability for container restart. (Chandni Singh via wangda) Change-Id: I07addd3e4ba8d98456ee2ff1d5c540a38fe61dea Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a5b27b3c Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a5b27b3c Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a5b27b3c Branch: refs/heads/HDFS-7240 Commit: a5b27b3c678ad2f5cb8dbfa1b60ef5cd365f8bde Parents: 9714fc1 Author: Wangda Tan <wan...@apache.org> Authored: Tue Mar 13 17:55:17 2018 -0700 Committer: Wangda Tan <wan...@apache.org> Committed: Tue Mar 13 17:55:17 2018 -0700 ---------------------------------------------------------------------- .../yarn/api/records/ContainerRetryContext.java | 21 ++- .../src/main/proto/yarn_protos.proto | 1 + .../distributedshell/ApplicationMaster.java | 10 +- .../applications/distributedshell/Client.java | 7 + .../impl/pb/ContainerRetryContextPBImpl.java | 15 ++ .../container/ContainerImpl.java | 86 +++++----- .../container/SlidingWindowRetryPolicy.java | 165 +++++++++++++++++++ .../recovery/NMLeveldbStateStoreService.java | 24 +++ .../recovery/NMNullStateStoreService.java | 5 + .../recovery/NMStateStoreService.java | 21 +++ .../container/TestContainer.java | 37 +++++ .../container/TestSlidingWindowRetryPolicy.java | 77 +++++++++ .../recovery/NMMemoryStateStoreService.java | 9 + .../TestNMLeveldbStateStoreService.java | 16 ++ 14 files changed, 451 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/a5b27b3c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryContext.java index ef8bd17..7fb0036 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryContext.java @@ -49,6 +49,13 @@ import java.util.Set; * </li> * <li><em>retryInterval</em> specifies delaying some time before relaunch * container, the unit is millisecond.</li> + * <li> + * <em>failuresValidityInterval</em>: default value is -1. + * When failuresValidityInterval in milliseconds is set to {@literal >} 0, + * the failure number will not take failures which happen out of the + * failuresValidityInterval into failure count. If failure count + * reaches to <em>maxRetries</em>, the container will be failed. + * </li> * </ul> */ @Public @@ -63,16 +70,25 @@ public abstract class ContainerRetryContext { @Unstable public static ContainerRetryContext newInstance( ContainerRetryPolicy retryPolicy, Set<Integer> errorCodes, - int maxRetries, int retryInterval) { + int maxRetries, int retryInterval, long failuresValidityInterval) { ContainerRetryContext containerRetryContext = Records.newRecord(ContainerRetryContext.class); containerRetryContext.setRetryPolicy(retryPolicy); containerRetryContext.setErrorCodes(errorCodes); containerRetryContext.setMaxRetries(maxRetries); containerRetryContext.setRetryInterval(retryInterval); + containerRetryContext.setFailuresValidityInterval(failuresValidityInterval); return containerRetryContext; } + @Private + @Unstable + public static ContainerRetryContext newInstance( + ContainerRetryPolicy retryPolicy, Set<Integer> errorCodes, + int maxRetries, int retryInterval) { + return newInstance(retryPolicy, errorCodes, maxRetries, retryInterval, -1); + } + public abstract ContainerRetryPolicy getRetryPolicy(); public abstract void setRetryPolicy(ContainerRetryPolicy retryPolicy); public abstract Set<Integer> getErrorCodes(); @@ -81,4 +97,7 @@ public abstract class ContainerRetryContext { public abstract void setMaxRetries(int maxRetries); public abstract int getRetryInterval(); public abstract void setRetryInterval(int retryInterval); + public abstract long getFailuresValidityInterval(); + public abstract void setFailuresValidityInterval( + long failuresValidityInterval); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a5b27b3c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 6ca800a..5e200dc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -754,6 +754,7 @@ message ContainerRetryContextProto { repeated int32 error_codes = 2; optional int32 max_retries = 3 [default = 0]; optional int32 retry_interval = 4 [default = 0]; + optional int64 failures_validity_interval = 5 [default = -1]; } enum ContainerRetryPolicyProto { http://git-wip-us.apache.org/repos/asf/hadoop/blob/a5b27b3c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index b2e3f41..5c10775 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -308,6 +308,7 @@ public class ApplicationMaster { private Set<Integer> containerRetryErrorCodes = null; private int containerMaxRetries = 0; private int containrRetryInterval = 0; + private long containerFailuresValidityInterval = -1; // Timeline domain ID private String domainId = null; @@ -471,6 +472,9 @@ public class ApplicationMaster { "If container could retry, it specifies max retires"); opts.addOption("container_retry_interval", true, "Interval between each retry, unit is milliseconds"); + opts.addOption("container_failures_validity_interval", true, + "Failures which are out of the time window will not be added to" + + " the number of container retry attempts"); opts.addOption("placement_spec", true, "Placement specification"); opts.addOption("debug", false, "Dump out debug information"); @@ -661,7 +665,8 @@ public class ApplicationMaster { cliParser.getOptionValue("container_max_retries", "0")); containrRetryInterval = Integer.parseInt(cliParser.getOptionValue( "container_retry_interval", "0")); - + containerFailuresValidityInterval = Long.parseLong( + cliParser.getOptionValue("container_failures_validity_interval", "-1")); if (!YarnConfiguration.timelineServiceEnabled(conf)) { timelineClient = null; timelineV2Client = null; @@ -1385,7 +1390,8 @@ public class ApplicationMaster { ContainerRetryContext containerRetryContext = ContainerRetryContext.newInstance( containerRetryPolicy, containerRetryErrorCodes, - containerMaxRetries, containrRetryInterval); + containerMaxRetries, containrRetryInterval, + containerFailuresValidityInterval); ContainerLaunchContext ctx = ContainerLaunchContext.newInstance( localResources, myShellEnv, commands, null, allTokens.duplicate(), null, containerRetryContext); http://git-wip-us.apache.org/repos/asf/hadoop/blob/a5b27b3c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index 06f0fd2..d6a753a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -373,6 +373,9 @@ public class Client { "If container could retry, it specifies max retires"); opts.addOption("container_retry_interval", true, "Interval between each retry, unit is milliseconds"); + opts.addOption("container_failures_validity_interval", true, + "Failures which are out of the time window will not be added to" + + " the number of container retry attempts"); opts.addOption("docker_client_config", true, "The docker client configuration path. The scheme should be supplied" + " (i.e. file:// or hdfs://)." @@ -579,6 +582,10 @@ public class Client { containerRetryOptions.add("--container_retry_interval " + cliParser.getOptionValue("container_retry_interval")); } + if (cliParser.hasOption("container_failures_validity_interval")) { + containerRetryOptions.add("--container_failures_validity_interval " + + cliParser.getOptionValue("container_failures_validity_interval")); + } if (cliParser.hasOption("flow_name")) { flowName = cliParser.getOptionValue("flow_name"); http://git-wip-us.apache.org/repos/asf/hadoop/blob/a5b27b3c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerRetryContextPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerRetryContextPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerRetryContextPBImpl.java index a5ef70d..a01d783 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerRetryContextPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerRetryContextPBImpl.java @@ -165,6 +165,21 @@ public class ContainerRetryContextPBImpl extends ContainerRetryContext { builder.setRetryInterval(retryInterval); } + @Override + public long getFailuresValidityInterval() { + ContainerRetryContextProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasFailuresValidityInterval()) { + return -1; + } + return p.getFailuresValidityInterval(); + } + + @Override + public void setFailuresValidityInterval(long failuresValidityInterval) { + maybeInitBuilder(); + builder.setFailuresValidityInterval(failuresValidityInterval); + } + private ContainerRetryPolicyProto convertToProtoFormat( ContainerRetryPolicy containerRetryPolicy) { return ProtoUtils.convertToProtoFormat(containerRetryPolicy); http://git-wip-us.apache.org/repos/asf/hadoop/blob/a5b27b3c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index 751beff..2115100 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -167,9 +167,11 @@ public class ContainerImpl implements Container { private long containerLaunchStartTime; private ContainerMetrics containerMetrics; private static Clock clock = SystemClock.getInstance(); + private ContainerRetryContext containerRetryContext; - // remaining retries to relaunch container if needed - private int remainingRetryAttempts; + private SlidingWindowRetryPolicy.RetryContext windowRetryContext; + private SlidingWindowRetryPolicy retryPolicy; + private String workDir; private String logDir; private String host; @@ -246,7 +248,10 @@ public class ContainerImpl implements Container { // Configure the Retry Context this.containerRetryContext = configureRetryContext( conf, launchContext, this.containerId); - this.remainingRetryAttempts = this.containerRetryContext.getMaxRetries(); + this.windowRetryContext = new SlidingWindowRetryPolicy + .RetryContext(containerRetryContext); + this.retryPolicy = new SlidingWindowRetryPolicy(clock); + stateMachine = stateMachineFactory.make(this, ContainerState.NEW, context.getContainerStateTransitionListener()); this.context = context; @@ -289,7 +294,9 @@ public class ContainerImpl implements Container { this.recoveredAsKilled = rcs.getKilled(); this.diagnostics.append(rcs.getDiagnostics()); this.version = rcs.getVersion(); - this.remainingRetryAttempts = rcs.getRemainingRetryAttempts(); + this.windowRetryContext.setRemainingRetries( + rcs.getRemainingRetryAttempts()); + this.windowRetryContext.setRestartTimes(rcs.getRestartTimes()); this.workDir = rcs.getWorkDir(); this.logDir = rcs.getLogDir(); this.resourceMappings = rcs.getResourceMappings(); @@ -1591,27 +1598,15 @@ public class ContainerImpl implements Container { if (exitEvent.getDiagnosticInfo() != null) { if (container.containerRetryContext.getRetryPolicy() != ContainerRetryPolicy.NEVER_RETRY) { - int n = container.containerRetryContext.getMaxRetries() - - container.remainingRetryAttempts; - container.addDiagnostics("Diagnostic message from attempt " - + n + " : ", "\n"); + container.addDiagnostics("Diagnostic message from attempt : \n"); } container.addDiagnostics(exitEvent.getDiagnosticInfo() + "\n"); } if (container.shouldRetry(container.exitCode)) { - if (container.remainingRetryAttempts > 0) { - container.remainingRetryAttempts--; - try { - container.stateStore.storeContainerRemainingRetryAttempts( - container.getContainerId(), container.remainingRetryAttempts); - } catch (IOException e) { - LOG.warn( - "Unable to update remainingRetryAttempts in state store for " - + container.getContainerId(), e); - } - } - doRelaunch(container, container.remainingRetryAttempts, + container.storeRetryContext(); + doRelaunch(container, + container.windowRetryContext.getRemainingRetries(), container.containerRetryContext.getRetryInterval()); return ContainerState.RELAUNCHING; } else if (container.canRollback()) { @@ -1671,29 +1666,14 @@ public class ContainerImpl implements Container { @Override public boolean shouldRetry(int errorCode) { - return shouldRetry(errorCode, containerRetryContext, - remainingRetryAttempts); - } - - public static boolean shouldRetry(int errorCode, - ContainerRetryContext retryContext, int remainingRetryAttempts) { if (errorCode == ExitCode.SUCCESS.getExitCode() || errorCode == ExitCode.FORCE_KILLED.getExitCode() || errorCode == ExitCode.TERMINATED.getExitCode()) { return false; } - - ContainerRetryPolicy retryPolicy = retryContext.getRetryPolicy(); - if (retryPolicy == ContainerRetryPolicy.RETRY_ON_ALL_ERRORS - || (retryPolicy == ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODES - && retryContext.getErrorCodes() != null - && retryContext.getErrorCodes().contains(errorCode))) { - return remainingRetryAttempts > 0 - || remainingRetryAttempts == ContainerRetryContext.RETRY_FOREVER; - } - - return false; + return retryPolicy.shouldRetry(windowRetryContext, errorCode); } + /** * Transition to EXITED_WITH_FAILURE */ @@ -1729,9 +1709,9 @@ public class ContainerImpl implements Container { container.containerRetryContext = configureRetryContext(container.context.getConf(), container.launchContext, container.containerId); - // Reset the retry attempts since its a fresh start - container.remainingRetryAttempts = - container.containerRetryContext.getMaxRetries(); + container.windowRetryContext = new SlidingWindowRetryPolicy + .RetryContext(container.containerRetryContext); + container.retryPolicy = new SlidingWindowRetryPolicy(clock); container.resourceSet = container.reInitContext.mergedResourceSet(container.resourceSet); @@ -2209,4 +2189,30 @@ public class ContainerImpl implements Container { container.getContainerId().toString()); deletionService.delete(deletionTask); } + + private void storeRetryContext() { + if (windowRetryContext.getRestartTimes() != null) { + try { + stateStore.storeContainerRestartTimes(containerId, + windowRetryContext.getRestartTimes()); + } catch (IOException e) { + LOG.warn( + "Unable to update finishTimeForRetryAttempts in state store for " + + containerId, e); + } + } + try { + stateStore.storeContainerRemainingRetryAttempts(containerId, + windowRetryContext.getRemainingRetries()); + } catch (IOException e) { + LOG.warn( + "Unable to update remainingRetryAttempts in state store for " + + containerId, e); + } + } + + @VisibleForTesting + SlidingWindowRetryPolicy getRetryPolicy() { + return retryPolicy; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a5b27b3c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/SlidingWindowRetryPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/SlidingWindowRetryPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/SlidingWindowRetryPolicy.java new file mode 100644 index 0000000..0208879 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/SlidingWindowRetryPolicy.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.container; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.api.records.ContainerRetryContext; +import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy; +import org.apache.hadoop.yarn.util.Clock; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +/** + * <p>Sliding window retry policy for relaunching a + * <code>Container</code> in Yarn.</p> + */ +@InterfaceStability.Unstable +public class SlidingWindowRetryPolicy { + + private Clock clock; + + public SlidingWindowRetryPolicy(Clock clock) { + this.clock = Preconditions.checkNotNull(clock); + } + + public boolean shouldRetry(RetryContext retryContext, + int errorCode) { + ContainerRetryContext containerRC = retryContext + .containerRetryContext; + Preconditions.checkNotNull(containerRC, "container retry context null"); + ContainerRetryPolicy retryPolicy = containerRC.getRetryPolicy(); + if (retryPolicy == ContainerRetryPolicy.RETRY_ON_ALL_ERRORS + || (retryPolicy == ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODES + && containerRC.getErrorCodes() != null + && containerRC.getErrorCodes().contains(errorCode))) { + if (containerRC.getMaxRetries() == ContainerRetryContext.RETRY_FOREVER) { + return true; + } + int pendingRetries = calculatePendingRetries(retryContext); + updateRetryContext(retryContext, pendingRetries); + return pendingRetries > 0; + } + return false; + } + + /** + * Calculates the pending number of retries. + * <p> + * When failuresValidityInterval is > 0, it also removes time entries from + * <code>restartTimes</code> which are outside the validity interval. + * + * @return the pending retries. + */ + private int calculatePendingRetries(RetryContext retryContext) { + ContainerRetryContext containerRC = + retryContext.containerRetryContext; + if (containerRC.getFailuresValidityInterval() > 0) { + Iterator<Long> iterator = retryContext.getRestartTimes().iterator(); + long currentTime = clock.getTime(); + while (iterator.hasNext()) { + long restartTime = iterator.next(); + if (currentTime - restartTime + > containerRC.getFailuresValidityInterval()) { + iterator.remove(); + } else { + break; + } + } + return containerRC.getMaxRetries() - + retryContext.getRestartTimes().size(); + } else { + return retryContext.getRemainingRetries(); + } + } + + /** + * Updates remaining retries and the restart time when + * required in the retryContext. + */ + private void updateRetryContext(RetryContext retryContext, + int pendingRetries) { + retryContext.setRemainingRetries(pendingRetries - 1); + if (retryContext.containerRetryContext.getFailuresValidityInterval() + > 0) { + retryContext.getRestartTimes().add(clock.getTime()); + } + } + + /** + * Sets the clock. + * @param clock clock + */ + public void setClock(Clock clock) { + this.clock = Preconditions.checkNotNull(clock); + } + + /** + * Sliding window container retry context. + * <p> + * Besides {@link ContainerRetryContext}, it also provide details such as: + * <ul> + * <li> + * <em>remainingRetries</em>: specifies the number of pending retries. It is + * initially set to <code>containerRetryContext.maxRetries</code>. + * </li> + * <li> + * <em>restartTimes</em>: when + * <code>containerRetryContext.failuresValidityInterval</code> is set, + * then this records the times when the container is set to restart. + * </li> + * </ul> + */ + static class RetryContext { + + private final ContainerRetryContext containerRetryContext; + private List<Long> restartTimes = new ArrayList<>(); + private int remainingRetries; + + RetryContext(ContainerRetryContext containerRetryContext) { + this.containerRetryContext = Preconditions + .checkNotNull(containerRetryContext); + this.remainingRetries = containerRetryContext.getMaxRetries(); + } + + ContainerRetryContext getContainerRetryContext() { + return containerRetryContext; + } + + int getRemainingRetries() { + return remainingRetries; + } + + void setRemainingRetries(int remainingRetries) { + this.remainingRetries = remainingRetries; + } + + List<Long> getRestartTimes() { + return restartTimes; + } + + void setRestartTimes(List<Long> restartTimes) { + if (restartTimes != null) { + this.restartTimes.clear(); + this.restartTimes.addAll(restartTimes); + } + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a5b27b3c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index 0f659d9..bf4c0ad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -127,6 +127,8 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { private static final String CONTAINER_EXIT_CODE_KEY_SUFFIX = "/exitcode"; private static final String CONTAINER_REMAIN_RETRIES_KEY_SUFFIX = "/remainingRetryAttempts"; + private static final String CONTAINER_RESTART_TIMES_SUFFIX = + "/restartTimes"; private static final String CONTAINER_WORK_DIR_KEY_SUFFIX = "/workdir"; private static final String CONTAINER_LOG_DIR_KEY_SUFFIX = "/logdir"; @@ -338,6 +340,16 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { } else if (suffix.equals(CONTAINER_REMAIN_RETRIES_KEY_SUFFIX)) { rcs.setRemainingRetryAttempts( Integer.parseInt(asString(entry.getValue()))); + } else if (suffix.equals(CONTAINER_RESTART_TIMES_SUFFIX)) { + String value = asString(entry.getValue()); + // parse the string format of List<Long>, e.g. [34, 21, 22] + String[] unparsedRestartTimes = + value.substring(1, value.length() - 1).split(", "); + List<Long> restartTimes = new ArrayList<>(); + for (String restartTime : unparsedRestartTimes) { + restartTimes.add(Long.parseLong(restartTime)); + } + rcs.setRestartTimes(restartTimes); } else if (suffix.equals(CONTAINER_WORK_DIR_KEY_SUFFIX)) { rcs.setWorkDir(asString(entry.getValue())); } else if (suffix.equals(CONTAINER_LOG_DIR_KEY_SUFFIX)) { @@ -582,6 +594,18 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { } @Override + public void storeContainerRestartTimes(ContainerId containerId, + List<Long> restartTimes) throws IOException { + String key = CONTAINERS_KEY_PREFIX + containerId.toString() + + CONTAINER_RESTART_TIMES_SUFFIX; + try { + db.put(bytes(key), bytes(restartTimes.toString())); + } catch (DBException e) { + throw new IOException(e); + } + } + + @Override public void storeContainerWorkDir(ContainerId containerId, String workDir) throws IOException { String key = CONTAINERS_KEY_PREFIX + containerId.toString() http://git-wip-us.apache.org/repos/asf/hadoop/blob/a5b27b3c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java index 78137bb..f217f2f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java @@ -120,6 +120,11 @@ public class NMNullStateStoreService extends NMStateStoreService { } @Override + public void storeContainerRestartTimes(ContainerId containerId, + List<Long> restartTimes) throws IOException { + } + + @Override public void storeContainerWorkDir(ContainerId containerId, String workDir) throws IOException { } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a5b27b3c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java index f9b86bf..0ea0ef3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java @@ -98,6 +98,7 @@ public abstract class NMStateStoreService extends AbstractService { StartContainerRequest startRequest; Resource capability; private int remainingRetryAttempts = ContainerRetryContext.RETRY_INVALID; + private List<Long> restartTimes; private String workDir; private String logDir; int version; @@ -150,6 +151,15 @@ public abstract class NMStateStoreService extends AbstractService { this.remainingRetryAttempts = retryAttempts; } + public List<Long> getRestartTimes() { + return restartTimes; + } + + public void setRestartTimes( + List<Long> restartTimes) { + this.restartTimes = restartTimes; + } + public String getWorkDir() { return workDir; } @@ -177,6 +187,7 @@ public abstract class NMStateStoreService extends AbstractService { .append(", Capability: ").append(getCapability()) .append(", StartRequest: ").append(getStartRequest()) .append(", RemainingRetryAttempts: ").append(remainingRetryAttempts) + .append(", RestartTimes: ").append(restartTimes) .append(", WorkDir: ").append(workDir) .append(", LogDir: ").append(logDir) .toString(); @@ -487,6 +498,16 @@ public abstract class NMStateStoreService extends AbstractService { ContainerId containerId, int remainingRetryAttempts) throws IOException; /** + * Record restart times for a container. + * @param containerId + * @param restartTimes + * @throws IOException + */ + public abstract void storeContainerRestartTimes( + ContainerId containerId, List<Long> restartTimes) + throws IOException; + + /** * Record working directory for a container. * @param containerId the container ID * @param workDir the working directory http://git-wip-us.apache.org/repos/asf/hadoop/blob/a5b27b3c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java index c32ff1a..1a263ee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java @@ -105,6 +105,7 @@ import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.util.ControlledClock; import org.junit.Assert; import org.junit.Test; import org.mockito.ArgumentMatcher; @@ -1109,6 +1110,38 @@ public class TestContainer { } } + @Test + public void testContainerRetryFailureValidityInterval() throws Exception { + ContainerRetryContext containerRetryContext = ContainerRetryContext + .newInstance(ContainerRetryPolicy.RETRY_ON_ALL_ERRORS, null, 1, 0, 10); + WrappedContainer wc = null; + try { + wc = new WrappedContainer(25, 314159265358980L, 4200, "test", + containerRetryContext); + ControlledClock clock = new ControlledClock(); + wc.getRetryPolicy().setClock(clock); + wc.initContainer(); + wc.localizeResources(); + wc.launchContainer(); + wc.containerFailed(12); + assertEquals(ContainerState.RUNNING, wc.c.getContainerState()); + clock.setTime(20); + wc.containerFailed(12); + assertEquals(ContainerState.RUNNING, wc.c.getContainerState()); + clock.setTime(40); + wc.containerFailed(12); + assertEquals(ContainerState.RUNNING, wc.c.getContainerState()); + clock.setTime(45); + wc.containerFailed(12); + assertEquals(ContainerState.EXITED_WITH_FAILURE, + wc.c.getContainerState()); + } finally { + if (wc != null) { + wc.finished(); + } + } + } + private void verifyCleanupCall(WrappedContainer wc) throws Exception { ResourcesReleasedMatcher matchesReq = new ResourcesReleasedMatcher(wc.localResources, EnumSet.of( @@ -1574,5 +1607,9 @@ public class TestContainer { public String getDiagnostics() { return c.cloneAndGetContainerStatus().getDiagnostics(); } + + public SlidingWindowRetryPolicy getRetryPolicy() { + return ((ContainerImpl)c).getRetryPolicy(); + } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a5b27b3c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestSlidingWindowRetryPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestSlidingWindowRetryPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestSlidingWindowRetryPolicy.java new file mode 100644 index 0000000..04889a9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestSlidingWindowRetryPolicy.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.container; + +import org.apache.hadoop.yarn.api.records.ContainerRetryContext; +import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy; +import org.apache.hadoop.yarn.util.ControlledClock; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Tests for {@link SlidingWindowRetryPolicy}. + */ +public class TestSlidingWindowRetryPolicy { + + private ControlledClock clock; + private SlidingWindowRetryPolicy retryPolicy; + + @Before + public void setup() { + clock = new ControlledClock(); + retryPolicy = new SlidingWindowRetryPolicy(clock); + } + + @Test + public void testNeverRetry() { + ContainerRetryContext retryContext = + ContainerRetryContext.NEVER_RETRY_CONTEXT; + Assert.assertFalse("never retry", retryPolicy.shouldRetry( + new SlidingWindowRetryPolicy.RetryContext(retryContext), 12)); + } + + @Test + public void testAlwaysRetry() { + ContainerRetryContext retryContext = ContainerRetryContext.newInstance( + ContainerRetryPolicy.RETRY_ON_ALL_ERRORS, null, -1, + 0, 10); + Assert.assertTrue("always retry", retryPolicy.shouldRetry( + new SlidingWindowRetryPolicy.RetryContext(retryContext), 12)); + } + + @Test + public void testFailuresValidityInterval() { + ContainerRetryContext retryContext = ContainerRetryContext + .newInstance(ContainerRetryPolicy.RETRY_ON_ALL_ERRORS, null, 1, 0, 10); + SlidingWindowRetryPolicy.RetryContext windowRetryContext = + new SlidingWindowRetryPolicy.RetryContext(retryContext); + Assert.assertTrue("retry 1", + retryPolicy.shouldRetry(windowRetryContext, 12)); + clock.setTime(20); + Assert.assertTrue("retry 2", + retryPolicy.shouldRetry(windowRetryContext, 12)); + clock.setTime(40); + Assert.assertTrue("retry 3", + retryPolicy.shouldRetry(windowRetryContext, 12)); + clock.setTime(45); + Assert.assertFalse("retry failed", + retryPolicy.shouldRetry(windowRetryContext, 12)); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a5b27b3c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java index 3dca367..b67d11f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java @@ -121,6 +121,7 @@ public class NMMemoryStateStoreService extends NMStateStoreService { rcsCopy.startRequest = rcs.startRequest; rcsCopy.capability = rcs.capability; rcsCopy.setRemainingRetryAttempts(rcs.getRemainingRetryAttempts()); + rcsCopy.setRestartTimes(rcs.getRestartTimes()); rcsCopy.setWorkDir(rcs.getWorkDir()); rcsCopy.setLogDir(rcs.getLogDir()); rcsCopy.setResourceMappings(rcs.getResourceMappings()); @@ -213,6 +214,14 @@ public class NMMemoryStateStoreService extends NMStateStoreService { } @Override + public void storeContainerRestartTimes( + ContainerId containerId, List<Long> restartTimes) + throws IOException { + RecoveredContainerState rcs = getRecoveredContainerState(containerId); + rcs.setRestartTimes(restartTimes); + } + + @Override public void storeContainerWorkDir(ContainerId containerId, String workDir) throws IOException { RecoveredContainerState rcs = getRecoveredContainerState(containerId); http://git-wip-us.apache.org/repos/asf/hadoop/blob/a5b27b3c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java index de667d1..c270199 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java @@ -371,6 +371,7 @@ public class TestNMLeveldbStateStoreService { assertEquals("/test/workdir", rcs.getWorkDir()); assertEquals("/test/logdir", rcs.getLogDir()); + validateRetryAttempts(containerId); // remove the container and verify not recovered stateStore.removeContainer(containerId); restartStateStore(); @@ -378,6 +379,21 @@ public class TestNMLeveldbStateStoreService { assertTrue(recoveredContainers.isEmpty()); } + private void validateRetryAttempts(ContainerId containerId) + throws IOException { + // store finishTimeForRetryAttempts + List<Long> finishTimeForRetryAttempts = Arrays.asList(1462700529039L, + 1462700529050L, 1462700529120L); + stateStore.storeContainerRestartTimes(containerId, + finishTimeForRetryAttempts); + restartStateStore(); + RecoveredContainerState rcs = stateStore.loadContainersState().get(0); + List<Long> recoveredRestartTimes = rcs.getRestartTimes(); + assertEquals(1462700529039L, (long)recoveredRestartTimes.get(0)); + assertEquals(1462700529050L, (long)recoveredRestartTimes.get(1)); + assertEquals(1462700529120L, (long)recoveredRestartTimes.get(2)); + } + private StartContainerRequest createContainerRequest( ContainerId containerId) { LocalResource lrsrc = LocalResource.newInstance( --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org