Repository: hadoop Updated Branches: refs/heads/branch-2 a47820d72 -> 0101973db
YARN-5049. Extend NMStateStore to save queued container information. (Konstantinos Karanasos via asuresh) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0101973d Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0101973d Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0101973d Branch: refs/heads/branch-2 Commit: 0101973dbf0ef9ba0f3e58be66aebf01b1586c92 Parents: a47820d Author: Arun Suresh <asur...@apache.org> Authored: Wed May 11 19:10:17 2016 -0700 Committer: Arun Suresh <asur...@apache.org> Committed: Fri Jul 14 13:57:22 2017 -0700 ---------------------------------------------------------------------- .../containermanager/ContainerManagerImpl.java | 18 +++++++++++- .../scheduler/ContainerScheduler.java | 19 +++++++------ .../recovery/NMLeveldbStateStoreService.java | 29 ++++++++++++++++++-- .../recovery/NMNullStateStoreService.java | 4 +++ .../recovery/NMStateStoreService.java | 9 ++++++ .../recovery/NMMemoryStateStoreService.java | 6 ++++ .../TestNMLeveldbStateStoreService.java | 12 ++++++++ 7 files changed, 85 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/0101973d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 095ce6d..14f30f4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -376,7 +376,6 @@ public class ContainerManagerImpl extends CompositeService implements app.handle(new ApplicationInitEvent(appId, acls, logAggregationContext)); } - @SuppressWarnings("unchecked") private void recoverContainer(RecoveredContainerState rcs) throws IOException { StartContainerRequest req = rcs.getStartRequest(); @@ -405,6 +404,7 @@ public class ContainerManagerImpl extends CompositeService implements "Due to invalid StateStore info container was killed" + " during recovery")); } + recoverActiveContainer(launchContext, token, rcs); } else { if (rcs.getStatus() != RecoveredContainerStatus.COMPLETED) { LOG.warn(containerId + " has no corresponding application!"); @@ -414,6 +414,22 @@ public class ContainerManagerImpl extends CompositeService implements } } + /** + * Recover a running container. + */ + @SuppressWarnings("unchecked") + protected void recoverActiveContainer( + ContainerLaunchContext launchContext, ContainerTokenIdentifier token, + RecoveredContainerState rcs) throws IOException { + Credentials credentials = YarnServerSecurityUtils.parseCredentials( + launchContext); + Container container = new ContainerImpl(getConfig(), dispatcher, + launchContext, credentials, metrics, token, context, rcs); + context.getContainers().put(token.getContainerID(), container); + dispatcher.getEventHandler().handle(new ApplicationContainerInitEvent( + container)); + } + private void waitForRecoveredContainers() throws InterruptedException { final int sleepMsec = 100; int waitIterations = 100; http://git-wip-us.apache.org/repos/asf/hadoop/blob/0101973d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java index 5c96d55..11a8f3f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java @@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -263,15 +264,15 @@ public class ContainerScheduler extends AbstractService implements "Opportunistic container queue is full."); } } -// if (isQueued) { -// try { -// this.context.getNMStateStore().storeContainerQueued( -// container.getContainerId()); -// } catch (IOException e) { -// LOG.warn("Could not store container [" + container.getContainerId() -// + "] state. The Container has been queued.", e); -// } -// } + if (isQueued) { + try { + this.context.getNMStateStore().storeContainerQueued( + container.getContainerId()); + } catch (IOException e) { + LOG.warn("Could not store container [" + container.getContainerId() + + "] state. The Container has been queued.", e); + } + } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0101973d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index 29b82a8..f1e47c7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -85,7 +85,10 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { private static final String DB_NAME = "yarn-nm-state"; private static final String DB_SCHEMA_VERSION_KEY = "nm-schema-version"; - private static final Version CURRENT_VERSION_INFO = Version.newInstance(3, 0); + // Set to 1.1 by YARN-5049 + // Set to 1.2 by YARN-6127 + private static final Version CURRENT_VERSION_INFO = Version + .newInstance(1, 2); private static final String DELETION_TASK_KEY_PREFIX = "DeletionService/deltask_"; @@ -112,6 +115,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { private static final String CONTAINER_VERSION_KEY_SUFFIX = "/version"; private static final String CONTAINER_DIAGS_KEY_SUFFIX = "/diagnostics"; private static final String CONTAINER_LAUNCHED_KEY_SUFFIX = "/launched"; + private static final String CONTAINER_QUEUED_KEY_SUFFIX = "/queued"; private static final String CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX = "/resourceChanged"; private static final String CONTAINER_KILLED_KEY_SUFFIX = "/killed"; @@ -256,8 +260,13 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { rcs.version = Integer.parseInt(asString(entry.getValue())); } else if (suffix.equals(CONTAINER_DIAGS_KEY_SUFFIX)) { rcs.diagnostics = asString(entry.getValue()); - } else if (suffix.equals(CONTAINER_LAUNCHED_KEY_SUFFIX)) { + } else if (suffix.equals(CONTAINER_QUEUED_KEY_SUFFIX)) { if (rcs.status == RecoveredContainerStatus.REQUESTED) { + rcs.status = RecoveredContainerStatus.QUEUED; + } + } else if (suffix.equals(CONTAINER_LAUNCHED_KEY_SUFFIX)) { + if ((rcs.status == RecoveredContainerStatus.REQUESTED) + || (rcs.status == RecoveredContainerStatus.QUEUED)) { rcs.status = RecoveredContainerStatus.LAUNCHED; } } else if (suffix.equals(CONTAINER_KILLED_KEY_SUFFIX)) { @@ -322,6 +331,21 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { } @Override + public void storeContainerQueued(ContainerId containerId) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("storeContainerQueued: containerId=" + containerId); + } + + String key = CONTAINERS_KEY_PREFIX + containerId.toString() + + CONTAINER_QUEUED_KEY_SUFFIX; + try { + db.put(bytes(key), EMPTY_VALUE); + } catch (DBException e) { + throw new IOException(e); + } + } + + @Override public void storeContainerDiagnostics(ContainerId containerId, StringBuilder diagnostics) throws IOException { if (LOG.isDebugEnabled()) { @@ -464,6 +488,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { batch.delete(bytes(keyPrefix + CONTAINER_REQUEST_KEY_SUFFIX)); batch.delete(bytes(keyPrefix + CONTAINER_DIAGS_KEY_SUFFIX)); batch.delete(bytes(keyPrefix + CONTAINER_LAUNCHED_KEY_SUFFIX)); + batch.delete(bytes(keyPrefix + CONTAINER_QUEUED_KEY_SUFFIX)); batch.delete(bytes(keyPrefix + CONTAINER_KILLED_KEY_SUFFIX)); batch.delete(bytes(keyPrefix + CONTAINER_EXIT_CODE_KEY_SUFFIX)); List<String> unknownKeysForContainer = http://git-wip-us.apache.org/repos/asf/hadoop/blob/0101973d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java index 545cb74..96c3f9e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java @@ -75,6 +75,10 @@ public class NMNullStateStoreService extends NMStateStoreService { } @Override + public void storeContainerQueued(ContainerId containerId) throws IOException { + } + + @Override public void storeContainerDiagnostics(ContainerId containerId, StringBuilder diagnostics) throws IOException { } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0101973d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java index 02bf186..9f87279 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java @@ -69,6 +69,7 @@ public abstract class NMStateStoreService extends AbstractService { public enum RecoveredContainerStatus { REQUESTED, + QUEUED, LAUNCHED, COMPLETED } @@ -372,6 +373,14 @@ public abstract class NMStateStoreService extends AbstractService { throws IOException; /** + * Record that a container has been queued at the NM + * @param containerId the container ID + * @throws IOException + */ + public abstract void storeContainerQueued(ContainerId containerId) + throws IOException; + + /** * Record that a container has been launched * @param containerId the container ID * @throws IOException http://git-wip-us.apache.org/repos/asf/hadoop/blob/0101973d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java index 5a48e2f..0e03994 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java @@ -134,6 +134,12 @@ public class NMMemoryStateStoreService extends NMStateStoreService { } @Override + public void storeContainerQueued(ContainerId containerId) throws IOException { + RecoveredContainerState rcs = getRecoveredContainerState(containerId); + rcs.status = RecoveredContainerStatus.QUEUED; + } + + @Override public synchronized void storeContainerDiagnostics(ContainerId containerId, StringBuilder diagnostics) throws IOException { RecoveredContainerState rcs = getRecoveredContainerState(containerId); http://git-wip-us.apache.org/repos/asf/hadoop/blob/0101973d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java index 2e7e8ef..0133156 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java @@ -259,6 +259,18 @@ public class TestNMLeveldbStateStoreService { // check whether the new container record is discarded assertEquals(1, recoveredContainers.size()); + // queue the container, and verify recovered + stateStore.storeContainerQueued(containerId); + restartStateStore(); + recoveredContainers = stateStore.loadContainersState(); + assertEquals(1, recoveredContainers.size()); + rcs = recoveredContainers.get(0); + assertEquals(RecoveredContainerStatus.QUEUED, rcs.getStatus()); + assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode()); + assertEquals(false, rcs.getKilled()); + assertEquals(containerReq, rcs.getStartRequest()); + assertTrue(rcs.getDiagnostics().isEmpty()); + // launch the container, add some diagnostics, and verify recovered StringBuilder diags = new StringBuilder(); stateStore.storeContainerLaunched(containerId); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org