YARN-7275. NM Statestore cleanup for Container updates. (Kartheek Muthyala via asuresh)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a50be1b8 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a50be1b8 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a50be1b8 Branch: refs/heads/YARN-1011 Commit: a50be1b8f432f50c940b66d12c7de87b95ea47c0 Parents: 8dbc890 Author: Arun Suresh <asur...@apache.org> Authored: Mon Oct 16 13:08:52 2017 -0700 Committer: Arun Suresh <asur...@apache.org> Committed: Mon Oct 16 13:12:15 2017 -0700 ---------------------------------------------------------------------- .../containermanager/ContainerManagerImpl.java | 9 +++ .../container/ContainerEventType.java | 4 +- .../container/ContainerImpl.java | 43 +++++++++----- .../launcher/ContainersLauncher.java | 2 - .../launcher/RecoverPausedContainerLaunch.java | 38 +++++++----- .../launcher/RecoveredContainerLaunch.java | 2 +- .../scheduler/ContainerScheduler.java | 31 ++++++++++ .../scheduler/ContainerSchedulerEventType.java | 3 +- .../recovery/NMLeveldbStateStoreService.java | 62 ++++++++++++++------ .../recovery/NMNullStateStoreService.java | 6 +- .../recovery/NMStateStoreService.java | 10 ++-- .../TestContainerManagerRecovery.java | 4 ++ .../recovery/NMMemoryStateStoreService.java | 18 ++++-- .../TestNMLeveldbStateStoreService.java | 16 +++-- 14 files changed, 179 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/a50be1b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 38eb636..7d5525a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager; import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ByteString; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.UpdateContainerTokenEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -367,6 +368,13 @@ public class ContainerManagerImpl extends CompositeService implements } recoverContainer(rcs); } + + //Dispatching the RECOVERY_COMPLETED event through the dispatcher + //so that all the paused, scheduled and queued containers will + //be scheduled for execution on availability of resources. + dispatcher.getEventHandler().handle( + new ContainerSchedulerEvent(null, + ContainerSchedulerEventType.RECOVERY_COMPLETED)); } else { LOG.info("Not a recoverable state store. 
Nothing to recover."); } @@ -480,6 +488,7 @@ public class ContainerManagerImpl extends CompositeService implements Container container = new ContainerImpl(getConfig(), dispatcher, launchContext, credentials, metrics, token, context, rcs); context.getContainers().put(token.getContainerID(), container); + containerScheduler.recoverActiveContainer(container, rcs.getStatus()); app.handle(new ApplicationContainerInitEvent(container)); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a50be1b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java index e28b37d..75e32e4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java @@ -46,5 +46,7 @@ public enum ContainerEventType { CONTAINER_RESUMED, // Producer: ContainerScheduler - CONTAINER_TOKEN_UPDATED + CONTAINER_TOKEN_UPDATED, + + RECOVER_PAUSED_CONTAINER } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a50be1b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index df83789..17b24b4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -362,6 +362,9 @@ public class ContainerImpl implements Container { // From SCHEDULED State .addTransition(ContainerState.SCHEDULED, ContainerState.RUNNING, ContainerEventType.CONTAINER_LAUNCHED, new LaunchTransition()) + .addTransition(ContainerState.SCHEDULED, ContainerState.PAUSED, + ContainerEventType.RECOVER_PAUSED_CONTAINER, + new RecoveredContainerTransition()) .addTransition(ContainerState.SCHEDULED, ContainerState.EXITED_WITH_FAILURE, ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, new ExitedWithFailureTransition(true)) @@ -952,7 +955,10 @@ public class ContainerImpl implements Container { if (recoveredStatus == RecoveredContainerStatus.LAUNCHED) { // try to recover a container that was previously launched launcherEvent = ContainersLauncherEventType.RECOVER_CONTAINER; + } else if (recoveredStatus == RecoveredContainerStatus.PAUSED) { + launcherEvent = ContainersLauncherEventType.RECOVER_PAUSED_CONTAINER; } + containerLaunchStartTime = clock.getTime(); dispatcher.getEventHandler().handle( new ContainersLauncherEvent(this, launcherEvent)); @@ -963,9 +969,6 @@ public class ContainerImpl implements Container 
{ @SuppressWarnings("unchecked") // dispatcher not typed private void sendScheduleEvent() { if (recoveredStatus == RecoveredContainerStatus.PAUSED) { - // Recovery is not supported for paused container so we raise the - // launch event which will proceed to kill the paused container instead - // of raising the schedule event. ContainersLauncherEventType launcherEvent; launcherEvent = ContainersLauncherEventType.RECOVER_PAUSED_CONTAINER; dispatcher.getEventHandler() @@ -1060,17 +1063,15 @@ public class ContainerImpl implements Container { UpdateContainerTokenEvent updateEvent = (UpdateContainerTokenEvent)event; // Update the container token container.setContainerTokenIdentifier(updateEvent.getUpdatedToken()); - if (updateEvent.isResourceChange()) { - try { - // Persist change in the state store. - container.context.getNMStateStore().storeContainerResourceChanged( - container.containerId, - container.getContainerTokenIdentifier().getVersion(), - container.getResource()); - } catch (IOException e) { - LOG.warn("Could not store container [" + container.containerId - + "] resource change..", e); - } + + try { + // Persist change in the state store. 
+ container.context.getNMStateStore() + .storeContainerUpdateToken(container.containerId, + container.getContainerTokenIdentifier()); + } catch (IOException e) { + LOG.warn("Could not store container [" + container.containerId + + "] update..", e); } } } @@ -1115,6 +1116,8 @@ public class ContainerImpl implements Container { if (container.recoveredStatus == RecoveredContainerStatus.COMPLETED) { container.sendFinishedEvents(); return ContainerState.DONE; + } else if (container.recoveredStatus == RecoveredContainerStatus.QUEUED) { + return ContainerState.SCHEDULED; } else if (container.recoveredAsKilled && container.recoveredStatus == RecoveredContainerStatus.REQUESTED) { // container was killed but never launched @@ -1474,6 +1477,18 @@ public class ContainerImpl implements Container { } /** + * Transition from SCHEDULED state to PAUSED state on recovery + */ + static class RecoveredContainerTransition extends ContainerTransition { + @SuppressWarnings("unchecked") + @Override + public void transition(ContainerImpl container, ContainerEvent event) { + container.sendContainerMonitorStartEvent(); + container.wasLaunched = true; + } + } + + /** * Transition from RUNNING or KILLING state to * EXITED_WITH_SUCCESS state upon EXITED_WITH_SUCCESS message. 
*/ http://git-wip-us.apache.org/repos/asf/hadoop/blob/a50be1b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java index 9f6ef74..cfd5d6a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java @@ -140,8 +140,6 @@ public class ContainersLauncher extends AbstractService running.put(containerId, launch); break; case RECOVER_PAUSED_CONTAINER: - // Recovery for paused containers is not supported, thus here - // we locate any paused containers, and terminate them. 
app = context.getApplications().get( containerId.getApplicationAttemptId().getApplicationId()); launch = new RecoverPausedContainerLaunch(context, getConfig(), http://git-wip-us.apache.org/repos/asf/hadoop/blob/a50be1b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoverPausedContainerLaunch.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoverPausedContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoverPausedContainerLaunch.java index 14cab9a..761fe3b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoverPausedContainerLaunch.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoverPausedContainerLaunch.java @@ -30,7 +30,7 @@ import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.*; -import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext; +import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReacquisitionContext; import java.io.File; import java.io.IOException; @@ -66,6 +66,8 @@ public class RecoverPausedContainerLaunch extends ContainerLaunch { 
containerId.getApplicationAttemptId().getApplicationId().toString(); String containerIdStr = containerId.toString(); + dispatcher.getEventHandler().handle(new ContainerEvent(containerId, + ContainerEventType.RECOVER_PAUSED_CONTAINER)); boolean notInterrupted = true; try { File pidFile = locatePidFile(appIdStr, containerIdStr); @@ -73,16 +75,17 @@ public class RecoverPausedContainerLaunch extends ContainerLaunch { String pidPathStr = pidFile.getPath(); pidFilePath = new Path(pidPathStr); exec.activateContainer(containerId, pidFilePath); - exec.signalContainer(new ContainerSignalContext.Builder() - .setContainer(container) - .setUser(container.getUser()) - .setSignal(ContainerExecutor.Signal.KILL) - .build()); + retCode = exec.reacquireContainer( + new ContainerReacquisitionContext.Builder() + .setContainer(container) + .setUser(container.getUser()) + .setContainerId(containerId) + .build()); } else { LOG.warn("Unable to locate pid file for container " + containerIdStr); } - } catch (InterruptedIOException e) { + } catch (InterruptedException | InterruptedIOException e) { LOG.warn("Interrupted while waiting for exit code from " + containerId); notInterrupted = false; } catch (IOException e) { @@ -100,14 +103,21 @@ public class RecoverPausedContainerLaunch extends ContainerLaunch { } } - LOG.warn("Recovered container exited with a non-zero exit code " - + retCode); - this.dispatcher.getEventHandler().handle(new ContainerExitEvent( - containerId, - ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, retCode, - "Container exited with a non-zero exit code " + retCode)); + if (retCode != 0) { + LOG.warn("Recovered container exited with a non-zero exit code " + + retCode); + this.dispatcher.getEventHandler().handle(new ContainerExitEvent( + containerId, + ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, retCode, + "Container exited with a non-zero exit code " + retCode)); + return retCode; + } - return retCode; + LOG.info("Recovered container " + containerId + " 
succeeded"); + dispatcher.getEventHandler().handle( + new ContainerEvent(containerId, + ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS)); + return 0; } private File locatePidFile(String appIdStr, String containerIdStr) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/a50be1b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoveredContainerLaunch.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoveredContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoveredContainerLaunch.java index 17ddd77..a3ccf00 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoveredContainerLaunch.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoveredContainerLaunch.java @@ -72,7 +72,7 @@ public class RecoveredContainerLaunch extends ContainerLaunch { String containerIdStr = containerId.toString(); dispatcher.getEventHandler().handle(new ContainerEvent(containerId, - ContainerEventType.CONTAINER_LAUNCHED)); + ContainerEventType.RECOVER_PAUSED_CONTAINER)); boolean notInterrupted = true; try { http://git-wip-us.apache.org/repos/asf/hadoop/blob/a50be1b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java index e436822..76da37c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java @@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.Contai import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -168,6 +169,8 @@ public class ContainerScheduler extends AbstractService implements case SHED_QUEUED_CONTAINERS: shedQueuedOpportunisticContainers(); break; + case RECOVERY_COMPLETED: + startPendingContainers(maxOppQueueLength <= 0); default: LOG.error("Unknown event arrived at ContainerScheduler: " + event.toString()); @@ -219,6 +222,34 @@ public class ContainerScheduler extends AbstractService implements } /** + * Populates auxiliary data structures used by the ContainerScheduler on + * recovery. 
+ * @param container container recovered + * @param rcs Recovered Container status + */ + public void recoverActiveContainer(Container container, + RecoveredContainerStatus rcs) { + ExecutionType execType = + container.getContainerTokenIdentifier().getExecutionType(); + if (rcs == RecoveredContainerStatus.QUEUED + || rcs == RecoveredContainerStatus.PAUSED) { + if (execType == ExecutionType.GUARANTEED) { + queuedGuaranteedContainers.put(container.getContainerId(), container); + } else if (execType == ExecutionType.OPPORTUNISTIC) { + queuedOpportunisticContainers + .put(container.getContainerId(), container); + } else { + LOG.error( + "UnKnown execution type received " + container.getContainerId() + + ", execType " + execType); + } + } else if (rcs == RecoveredContainerStatus.LAUNCHED) { + runningContainers.put(container.getContainerId(), container); + utilizationTracker.addContainerResources(container); + } + } + + /** * Return number of queued containers. * @return Number of queued containers. 
*/ http://git-wip-us.apache.org/repos/asf/hadoop/blob/a50be1b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java index a9cbf74..294eddf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java @@ -27,5 +27,6 @@ public enum ContainerSchedulerEventType { UPDATE_CONTAINER, // Producer: Node HB response - RM has asked to shed the queue SHED_QUEUED_CONTAINERS, - CONTAINER_PAUSED + CONTAINER_PAUSED, + RECOVERY_COMPLETED } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a50be1b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index 2f9c0a7..c361d00 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -20,6 +20,9 @@ package org.apache.hadoop.yarn.server.nodemanager.recovery; import static org.fusesource.leveldbjni.JniDBFactory.asString; import static org.fusesource.leveldbjni.JniDBFactory.bytes; + +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.slf4j.LoggerFactory; import java.io.File; @@ -47,11 +50,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainerRequestP import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto; @@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.Deletion import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; import 
org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainerRequestProto; +import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.ContainerTokenIdentifierProto; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings; @@ -120,8 +121,8 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { private static final String CONTAINER_LAUNCHED_KEY_SUFFIX = "/launched"; private static final String CONTAINER_QUEUED_KEY_SUFFIX = "/queued"; private static final String CONTAINER_PAUSED_KEY_SUFFIX = "/paused"; - private static final String CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX = - "/resourceChanged"; + private static final String CONTAINER_UPDATE_TOKEN_SUFFIX = + "/updateToken"; private static final String CONTAINER_KILLED_KEY_SUFFIX = "/killed"; private static final String CONTAINER_EXIT_CODE_KEY_SUFFIX = "/exitcode"; private static final String CONTAINER_REMAIN_RETRIES_KEY_SUFFIX = @@ -290,9 +291,17 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { } else if (suffix.equals(CONTAINER_EXIT_CODE_KEY_SUFFIX)) { rcs.status = RecoveredContainerStatus.COMPLETED; rcs.exitCode = Integer.parseInt(asString(entry.getValue())); - } else if (suffix.equals(CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX)) { - rcs.capability = new ResourcePBImpl( - ResourceProto.parseFrom(entry.getValue())); + } else if (suffix.equals(CONTAINER_UPDATE_TOKEN_SUFFIX)) { + ContainerTokenIdentifierProto tokenIdentifierProto = + ContainerTokenIdentifierProto.parseFrom(entry.getValue()); + Token currentToken = rcs.getStartRequest().getContainerToken(); + Token updatedToken = Token + .newInstance(tokenIdentifierProto.toByteArray(), + ContainerTokenIdentifier.KIND.toString(), + currentToken.getPassword().array(), 
currentToken.getService()); + rcs.startRequest.setContainerToken(updatedToken); + rcs.capability = new ResourcePBImpl(tokenIdentifierProto.getResource()); + rcs.version = tokenIdentifierProto.getVersion(); } else if (suffix.equals(CONTAINER_REMAIN_RETRIES_KEY_SUFFIX)) { rcs.setRemainingRetryAttempts( Integer.parseInt(asString(entry.getValue()))); @@ -374,6 +383,21 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { } } + private void removeContainerQueued(ContainerId containerId) + throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("removeContainerQueued: containerId=" + containerId); + } + + String key = CONTAINERS_KEY_PREFIX + containerId.toString() + + CONTAINER_QUEUED_KEY_SUFFIX; + try { + db.delete(bytes(key)); + } catch (DBException e) { + throw new IOException(e); + } + } + @Override public void storeContainerPaused(ContainerId containerId) throws IOException { if (LOG.isDebugEnabled()) { @@ -429,6 +453,8 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { LOG.debug("storeContainerLaunched: containerId=" + containerId); } + // Removing the container if queued for backward compatibility reasons + removeContainerQueued(containerId); String key = CONTAINERS_KEY_PREFIX + containerId.toString() + CONTAINER_LAUNCHED_KEY_SUFFIX; try { @@ -439,24 +465,25 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { } @Override - public void storeContainerResourceChanged(ContainerId containerId, - int containerVersion, Resource capability) throws IOException { + public void storeContainerUpdateToken(ContainerId containerId, + ContainerTokenIdentifier containerTokenIdentifier) throws IOException { if (LOG.isDebugEnabled()) { - LOG.debug("storeContainerResourceChanged: containerId=" + containerId - + ", capability=" + capability); + LOG.debug("storeContainerUpdateToken: containerId=" + containerId); } - String keyResChng = CONTAINERS_KEY_PREFIX + containerId.toString() - + 
CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX; + String keyUpdateToken = CONTAINERS_KEY_PREFIX + containerId.toString() + + CONTAINER_UPDATE_TOKEN_SUFFIX; String keyVersion = CONTAINERS_KEY_PREFIX + containerId.toString() + CONTAINER_VERSION_KEY_SUFFIX; + try { WriteBatch batch = db.createWriteBatch(); try { // New value will overwrite old values for the same key - batch.put(bytes(keyResChng), - ProtoUtils.convertToProtoFormat(capability).toByteArray()); - batch.put(bytes(keyVersion), bytes(Integer.toString(containerVersion))); + batch.put(bytes(keyUpdateToken), + containerTokenIdentifier.getProto().toByteArray()); + batch.put(bytes(keyVersion), + bytes(Integer.toString(containerTokenIdentifier.getVersion()))); db.write(batch); } finally { batch.close(); @@ -552,6 +579,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { batch.delete(bytes(keyPrefix + CONTAINER_PAUSED_KEY_SUFFIX)); batch.delete(bytes(keyPrefix + CONTAINER_KILLED_KEY_SUFFIX)); batch.delete(bytes(keyPrefix + CONTAINER_EXIT_CODE_KEY_SUFFIX)); + batch.delete(bytes(keyPrefix + CONTAINER_UPDATE_TOKEN_SUFFIX)); List<String> unknownKeysForContainer = containerUnknownKeySuffixes .removeAll(containerId); for (String unknownKeySuffix : unknownKeysForContainer) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/a50be1b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java index d1d0696..ca6d018 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java @@ -28,12 +28,12 @@ import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.records.MasterKey; // The state store to use when state isn't being stored @@ -99,8 +99,8 @@ public class NMNullStateStoreService extends NMStateStoreService { } @Override - public void storeContainerResourceChanged(ContainerId containerId, - int version, Resource capability) throws IOException { + public void storeContainerUpdateToken(ContainerId containerId, + ContainerTokenIdentifier containerTokenIdentifier) throws IOException { } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/a50be1b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java index 999d2d9..5e2b8a5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java @@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.Containe import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings; @@ -429,14 +430,13 @@ public abstract class NMStateStoreService extends AbstractService { throws IOException; /** - * Record that a container resource has been changed + * Record that a container has been updated * @param containerId the container ID - * @param containerVersion the container version - * @param capability the container resource capability + * @param containerTokenIdentifier container token identifier * @throws IOException */ - public abstract void storeContainerResourceChanged(ContainerId containerId, - int containerVersion, Resource capability) throws 
IOException; + public abstract void storeContainerUpdateToken(ContainerId containerId, + ContainerTokenIdentifier containerTokenIdentifier) throws IOException; /** * Record that a container has completed http://git-wip-us.apache.org/repos/asf/hadoop/blob/a50be1b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java index 5ec0ae6..0e629d4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java @@ -405,6 +405,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest { stateStore.start(); context = createContext(conf, stateStore); ContainerManagerImpl cm = createContainerManager(context, delSrvc); + ((NMContext) context).setContainerManager(cm); cm.init(conf); cm.start(); // add an application by starting a container @@ -430,6 +431,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest { cm.stop(); context = createContext(conf, stateStore); cm = createContainerManager(context); + ((NMContext) context).setContainerManager(cm); cm.init(conf); cm.start(); assertEquals(1, 
context.getApplications().size()); @@ -448,6 +450,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest { stateStore.start(); context = createContext(conf, stateStore); ContainerManagerImpl cm = createContainerManager(context, delSrvc); + ((NMContext) context).setContainerManager(cm); cm.init(conf); cm.start(); @@ -473,6 +476,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest { cm.stop(); context = createContext(conf, stateStore); cm = createContainerManager(context); + ((NMContext) context).setContainerManager(cm); cm.init(conf); cm.start(); assertEquals(1, context.getApplications().size()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/a50be1b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java index 59a225a..eb222cd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java @@ -33,12 +33,13 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; -import 
org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings; @@ -175,12 +176,17 @@ public class NMMemoryStateStoreService extends NMStateStoreService { } @Override - public synchronized void storeContainerResourceChanged( - ContainerId containerId, int version, Resource capability) - throws IOException { + public void storeContainerUpdateToken(ContainerId containerId, + ContainerTokenIdentifier containerTokenIdentifier) throws IOException { RecoveredContainerState rcs = getRecoveredContainerState(containerId); - rcs.capability = capability; - rcs.version = version; + rcs.capability = containerTokenIdentifier.getResource(); + rcs.version = containerTokenIdentifier.getVersion(); + Token currentToken = rcs.getStartRequest().getContainerToken(); + Token updatedToken = Token + .newInstance(containerTokenIdentifier.getBytes(), + ContainerTokenIdentifier.KIND.toString(), + currentToken.getPassword().array(), currentToken.getService()); + rcs.startRequest.setContainerToken(updatedToken); } @Override 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a50be1b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java index 8c13356..1ff2119 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java @@ -307,13 +307,18 @@ public class TestNMLeveldbStateStoreService { assertEquals(1, recoveredContainers.size()); // increase the container size, and verify recovered - stateStore.storeContainerResourceChanged(containerId, 2, - Resource.newInstance(2468, 4)); + ContainerTokenIdentifier updateTokenIdentifier = + new ContainerTokenIdentifier(containerId, "host", "user", + Resource.newInstance(2468, 4), 9876543210L, 42, 2468, + Priority.newInstance(7), 13579); + + stateStore + .storeContainerUpdateToken(containerId, updateTokenIdentifier); restartStateStore(); recoveredContainers = stateStore.loadContainersState(); assertEquals(1, recoveredContainers.size()); rcs = recoveredContainers.get(0); - assertEquals(2, rcs.getVersion()); + assertEquals(0, rcs.getVersion()); assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus()); assertEquals(ContainerExitStatus.INVALID, 
rcs.getExitCode()); assertEquals(false, rcs.getKilled()); @@ -330,7 +335,9 @@ public class TestNMLeveldbStateStoreService { assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus()); assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode()); assertTrue(rcs.getKilled()); - assertEquals(containerReq, rcs.getStartRequest()); + ContainerTokenIdentifier tokenReadFromRequest = BuilderUtils + .newContainerTokenIdentifier(rcs.getStartRequest().getContainerToken()); + assertEquals(updateTokenIdentifier, tokenReadFromRequest); assertEquals(diags.toString(), rcs.getDiagnostics()); // add yet more diags, mark container completed, and verify recovered @@ -344,7 +351,6 @@ public class TestNMLeveldbStateStoreService { assertEquals(RecoveredContainerStatus.COMPLETED, rcs.getStatus()); assertEquals(21, rcs.getExitCode()); assertTrue(rcs.getKilled()); - assertEquals(containerReq, rcs.getStartRequest()); assertEquals(diags.toString(), rcs.getDiagnostics()); // store remainingRetryAttempts, workDir and logDir --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org