YARN-6966. NodeManager metrics may return wrong negative values when NM restarts. (Szilard Nemeth via Haibo Chen)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9d3c39e9 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9d3c39e9 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9d3c39e9 Branch: refs/heads/HADOOP-15461 Commit: 9d3c39e9dd88b8f32223c01328581bb68507d415 Parents: 3a9e25e Author: Haibo Chen <haiboc...@apache.org> Authored: Mon Jul 23 11:06:44 2018 -0700 Committer: Haibo Chen <haiboc...@apache.org> Committed: Mon Jul 23 11:07:24 2018 -0700 ---------------------------------------------------------------------- .../containermanager/ContainerManagerImpl.java | 2 +- .../scheduler/ContainerScheduler.java | 16 ++++-- .../recovery/NMLeveldbStateStoreService.java | 32 ++++++----- .../recovery/NMNullStateStoreService.java | 2 +- .../recovery/NMStateStoreService.java | 3 +- .../BaseContainerManagerTest.java | 2 +- .../TestContainerManagerRecovery.java | 57 ++++++++++++++++++++ .../TestContainerSchedulerRecovery.java | 46 +++++++++++----- .../metrics/TestNodeManagerMetrics.java | 4 +- .../recovery/NMMemoryStateStoreService.java | 16 +++++- .../TestNMLeveldbStateStoreService.java | 21 +++++++- 11 files changed, 163 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d3c39e9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java 
index ad63720..89bef8f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -496,7 +496,7 @@ public class ContainerManagerImpl extends CompositeService implements Container container = new ContainerImpl(getConfig(), dispatcher, launchContext, credentials, metrics, token, context, rcs); context.getContainers().put(token.getContainerID(), container); - containerScheduler.recoverActiveContainer(container, rcs.getStatus()); + containerScheduler.recoverActiveContainer(container, rcs); app.handle(new ApplicationContainerInitEvent(container)); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d3c39e9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java index 5cdcf41..a61b9d1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java @@ -44,6 +44,9 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.Contai import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService + .RecoveredContainerState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -259,11 +262,11 @@ public class ContainerScheduler extends AbstractService implements * @param rcs Recovered Container status */ public void recoverActiveContainer(Container container, - RecoveredContainerStatus rcs) { + RecoveredContainerState rcs) { ExecutionType execType = container.getContainerTokenIdentifier().getExecutionType(); - if (rcs == RecoveredContainerStatus.QUEUED - || rcs == RecoveredContainerStatus.PAUSED) { + if (rcs.getStatus() == RecoveredContainerStatus.QUEUED + || rcs.getStatus() == RecoveredContainerStatus.PAUSED) { if (execType == ExecutionType.GUARANTEED) { queuedGuaranteedContainers.put(container.getContainerId(), container); } else if (execType == ExecutionType.OPPORTUNISTIC) { @@ -274,10 +277,15 @@ public class ContainerScheduler extends AbstractService implements "UnKnown execution type received " + container.getContainerId() + ", execType " + execType); } - } else if (rcs == RecoveredContainerStatus.LAUNCHED) { + } else if (rcs.getStatus() == RecoveredContainerStatus.LAUNCHED) { runningContainers.put(container.getContainerId(), container); utilizationTracker.addContainerResources(container); } + if (rcs.getStatus() != RecoveredContainerStatus.COMPLETED + && rcs.getCapability() != null) { + metrics.launchedContainer(); + 
metrics.allocateContainer(rcs.getCapability()); + } } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d3c39e9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index 6f643b0..44f5e18 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings; import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.LeveldbIterator; import org.apache.hadoop.yarn.util.ConverterUtils; import org.fusesource.leveldbjni.JniDBFactory; @@ -237,7 +238,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { iter.seek(bytes(CONTAINERS_KEY_PREFIX)); while (iter.hasNext()) { - Entry<byte[],byte[]> entry = iter.peekNext(); + Entry<byte[], byte[]> entry = iter.peekNext(); String key = asString(entry.getKey()); if 
(!key.startsWith(CONTAINERS_KEY_PREFIX)) { break; @@ -299,6 +300,10 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { if (suffix.equals(CONTAINER_REQUEST_KEY_SUFFIX)) { rcs.startRequest = new StartContainerRequestPBImpl( StartContainerRequestProto.parseFrom(entry.getValue())); + ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils + .newContainerTokenIdentifier(rcs.startRequest.getContainerToken()); + rcs.capability = new ResourcePBImpl( + containerTokenIdentifier.getProto().getResource()); } else if (suffix.equals(CONTAINER_VERSION_KEY_SUFFIX)) { rcs.version = Integer.parseInt(asString(entry.getValue())); } else if (suffix.equals(CONTAINER_START_TIME_KEY_SUFFIX)) { @@ -382,24 +387,25 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { LOG.debug("storeContainer: containerId= " + idStr + ", startRequest= " + startRequest); } - String keyRequest = getContainerKey(idStr, CONTAINER_REQUEST_KEY_SUFFIX); - String keyVersion = getContainerVersionKey(idStr); - String keyStartTime = + final String keyVersion = getContainerVersionKey(idStr); + final String keyRequest = + getContainerKey(idStr, CONTAINER_REQUEST_KEY_SUFFIX); + final StartContainerRequestProto startContainerRequest = + ((StartContainerRequestPBImpl) startRequest).getProto(); + + final String keyStartTime = getContainerKey(idStr, CONTAINER_START_TIME_KEY_SUFFIX); + final String startTimeValue = Long.toString(startTime); + try { - WriteBatch batch = db.createWriteBatch(); - try { - batch.put(bytes(keyRequest), - ((StartContainerRequestPBImpl) startRequest).getProto(). 
- toByteArray()); - batch.put(bytes(keyStartTime), bytes(Long.toString(startTime))); + try (WriteBatch batch = db.createWriteBatch()) { + batch.put(bytes(keyRequest), startContainerRequest.toByteArray()); + batch.put(bytes(keyStartTime), bytes(startTimeValue)); if (containerVersion != 0) { batch.put(bytes(keyVersion), - bytes(Integer.toString(containerVersion))); + bytes(Integer.toString(containerVersion))); } db.write(batch); - } finally { - batch.close(); } } catch (DBException e) { markStoreUnHealthy(e); http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d3c39e9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java index f217f2f..dfad9cf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java @@ -73,7 +73,7 @@ public class NMNullStateStoreService extends NMStateStoreService { @Override public void storeContainer(ContainerId containerId, int version, - long startTime, StartContainerRequest startRequest) throws IOException { + long startTime, StartContainerRequest startRequest) { } @Override 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d3c39e9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java index 0ea0ef3..70decdb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java @@ -416,7 +416,8 @@ public abstract class NMStateStoreService extends AbstractService { * @throws IOException */ public abstract void storeContainer(ContainerId containerId, - int containerVersion, long startTime, StartContainerRequest startRequest) + int containerVersion, long startTime, + StartContainerRequest startRequest) throws IOException; /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d3c39e9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java index b31601c..493aa4c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java @@ -107,7 +107,7 @@ public abstract class BaseContainerManagerTest { protected static File remoteLogDir; protected static File tmpDir; - protected final NodeManagerMetrics metrics = NodeManagerMetrics.create(); + protected NodeManagerMetrics metrics = NodeManagerMetrics.create(); public BaseContainerManagerTest() throws UnsupportedFileSystemException { localFS = FileContext.getLocalFSFileContext(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d3c39e9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java index 0a834af..a144adf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java @@ -47,6 +47,7 @@ import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.net.ServerSocketUtil; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; @@ -106,6 +107,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.Contai import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerScheduler; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.server.nodemanager.metrics.TestNodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; @@ -401,6 +403,61 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest { } @Test + public void testNodeManagerMetricsRecovery() throws Exception { + conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true); + conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true); + + NMStateStoreService stateStore = new NMMemoryStateStoreService(); + stateStore.init(conf); + stateStore.start(); + Context context = createContext(conf, stateStore); + ContainerManagerImpl cm = createContainerManager(context, delSrvc); + cm.init(conf); + cm.start(); + metrics.addResource(Resource.newInstance(10240, 8)); + + // add an application by starting a container + ApplicationId appId = ApplicationId.newInstance(0, 1); + ApplicationAttemptId attemptId = 
ApplicationAttemptId.newInstance(appId, 1); + ContainerId cid = ContainerId.newContainerId(attemptId, 1); + Map<String, String> containerEnv = Collections.emptyMap(); + Map<String, ByteBuffer> serviceData = Collections.emptyMap(); + Map<String, LocalResource> localResources = Collections.emptyMap(); + List<String> commands = Arrays.asList("sleep 60s".split(" ")); + ContainerLaunchContext clc = ContainerLaunchContext.newInstance( + localResources, containerEnv, commands, serviceData, + null, null); + StartContainersResponse startResponse = startContainer(context, cm, cid, + clc, null, ContainerType.TASK); + assertTrue(startResponse.getFailedRequests().isEmpty()); + assertEquals(1, context.getApplications().size()); + Application app = context.getApplications().get(appId); + assertNotNull(app); + + // make sure the container reaches RUNNING state + waitForNMContainerState(cm, cid, + org.apache.hadoop.yarn.server.nodemanager + .containermanager.container.ContainerState.RUNNING); + TestNodeManagerMetrics.checkMetrics(1, 0, 0, 0, 0, 1, 1, 1, 9, 1, 7); + + // restart and verify metrics could be recovered + cm.stop(); + DefaultMetricsSystem.shutdown(); + metrics = NodeManagerMetrics.create(); + metrics.addResource(Resource.newInstance(10240, 8)); + TestNodeManagerMetrics.checkMetrics(0, 0, 0, 0, 0, 0, 0, 0, 10, 0, 8); + context = createContext(conf, stateStore); + cm = createContainerManager(context, delSrvc); + cm.init(conf); + cm.start(); + assertEquals(1, context.getApplications().size()); + app = context.getApplications().get(appId); + assertNotNull(app); + TestNodeManagerMetrics.checkMetrics(1, 0, 0, 0, 0, 1, 1, 1, 9, 1, 7); + cm.stop(); + } + + @Test public void testContainerResizeRecovery() throws Exception { conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true); conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true); 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d3c39e9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerRecovery.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerRecovery.java index 2ae8b97..6b3ac67 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerRecovery.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerRecovery.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler; import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.doNothing; @@ -31,6 +32,8 @@ import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService + .RecoveredContainerState; import 
org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus; import org.junit.After; import org.junit.Before; @@ -71,6 +74,13 @@ public class TestContainerSchedulerRecovery { private ContainerScheduler spy; + private RecoveredContainerState createRecoveredContainerState( + RecoveredContainerStatus status) { + RecoveredContainerState mockState = mock(RecoveredContainerState.class); + when(mockState.getStatus()).thenReturn(status); + return mockState; + } + @Before public void setUp() throws Exception { MockitoAnnotations.initMocks(this); spy = spy(tempContainerScheduler); @@ -94,7 +104,8 @@ public class TestContainerSchedulerRecovery { assertEquals(0, spy.getNumQueuedGuaranteedContainers()); assertEquals(0, spy.getNumQueuedOpportunisticContainers()); assertEquals(0, spy.getNumRunningContainers()); - RecoveredContainerStatus rcs = RecoveredContainerStatus.QUEUED; + RecoveredContainerState rcs = + createRecoveredContainerState(RecoveredContainerStatus.QUEUED); when(token.getExecutionType()).thenReturn(ExecutionType.GUARANTEED); when(container.getContainerTokenIdentifier()).thenReturn(token); spy.recoverActiveContainer(container, rcs); @@ -113,7 +124,8 @@ public class TestContainerSchedulerRecovery { assertEquals(0, spy.getNumQueuedGuaranteedContainers()); assertEquals(0, spy.getNumQueuedOpportunisticContainers()); assertEquals(0, spy.getNumRunningContainers()); - RecoveredContainerStatus rcs = RecoveredContainerStatus.QUEUED; + RecoveredContainerState rcs = + createRecoveredContainerState(RecoveredContainerStatus.QUEUED); when(token.getExecutionType()).thenReturn(ExecutionType.OPPORTUNISTIC); when(container.getContainerTokenIdentifier()).thenReturn(token); spy.recoverActiveContainer(container, rcs); @@ -132,7 +144,8 @@ public class TestContainerSchedulerRecovery { assertEquals(0, spy.getNumQueuedGuaranteedContainers()); assertEquals(0, spy.getNumQueuedOpportunisticContainers()); assertEquals(0, spy.getNumRunningContainers()); - 
RecoveredContainerStatus rcs = RecoveredContainerStatus.PAUSED; + RecoveredContainerState rcs = + createRecoveredContainerState(RecoveredContainerStatus.PAUSED); when(token.getExecutionType()).thenReturn(ExecutionType.GUARANTEED); when(container.getContainerTokenIdentifier()).thenReturn(token); spy.recoverActiveContainer(container, rcs); @@ -151,7 +164,8 @@ public class TestContainerSchedulerRecovery { assertEquals(0, spy.getNumQueuedGuaranteedContainers()); assertEquals(0, spy.getNumQueuedOpportunisticContainers()); assertEquals(0, spy.getNumRunningContainers()); - RecoveredContainerStatus rcs = RecoveredContainerStatus.PAUSED; + RecoveredContainerState rcs = + createRecoveredContainerState(RecoveredContainerStatus.PAUSED); when(token.getExecutionType()).thenReturn(ExecutionType.OPPORTUNISTIC); when(container.getContainerTokenIdentifier()).thenReturn(token); spy.recoverActiveContainer(container, rcs); @@ -170,7 +184,8 @@ public class TestContainerSchedulerRecovery { assertEquals(0, spy.getNumQueuedGuaranteedContainers()); assertEquals(0, spy.getNumQueuedOpportunisticContainers()); assertEquals(0, spy.getNumRunningContainers()); - RecoveredContainerStatus rcs = RecoveredContainerStatus.LAUNCHED; + RecoveredContainerState rcs = + createRecoveredContainerState(RecoveredContainerStatus.LAUNCHED); when(token.getExecutionType()).thenReturn(ExecutionType.GUARANTEED); when(container.getContainerTokenIdentifier()).thenReturn(token); spy.recoverActiveContainer(container, rcs); @@ -189,7 +204,8 @@ public class TestContainerSchedulerRecovery { assertEquals(0, spy.getNumQueuedGuaranteedContainers()); assertEquals(0, spy.getNumQueuedOpportunisticContainers()); assertEquals(0, spy.getNumRunningContainers()); - RecoveredContainerStatus rcs = RecoveredContainerStatus.LAUNCHED; + RecoveredContainerState rcs = + createRecoveredContainerState(RecoveredContainerStatus.LAUNCHED); when(token.getExecutionType()).thenReturn(ExecutionType.OPPORTUNISTIC); 
when(container.getContainerTokenIdentifier()).thenReturn(token); spy.recoverActiveContainer(container, rcs); @@ -208,7 +224,8 @@ public class TestContainerSchedulerRecovery { assertEquals(0, spy.getNumQueuedGuaranteedContainers()); assertEquals(0, spy.getNumQueuedOpportunisticContainers()); assertEquals(0, spy.getNumRunningContainers()); - RecoveredContainerStatus rcs = RecoveredContainerStatus.REQUESTED; + RecoveredContainerState rcs = + createRecoveredContainerState(RecoveredContainerStatus.REQUESTED); when(token.getExecutionType()).thenReturn(ExecutionType.GUARANTEED); when(container.getContainerTokenIdentifier()).thenReturn(token); spy.recoverActiveContainer(container, rcs); @@ -227,7 +244,8 @@ public class TestContainerSchedulerRecovery { assertEquals(0, spy.getNumQueuedGuaranteedContainers()); assertEquals(0, spy.getNumQueuedOpportunisticContainers()); assertEquals(0, spy.getNumRunningContainers()); - RecoveredContainerStatus rcs = RecoveredContainerStatus.REQUESTED; + RecoveredContainerState rcs = + createRecoveredContainerState(RecoveredContainerStatus.REQUESTED); when(token.getExecutionType()).thenReturn(ExecutionType.OPPORTUNISTIC); when(container.getContainerTokenIdentifier()).thenReturn(token); spy.recoverActiveContainer(container, rcs); @@ -246,7 +264,8 @@ public class TestContainerSchedulerRecovery { assertEquals(0, spy.getNumQueuedGuaranteedContainers()); assertEquals(0, spy.getNumQueuedOpportunisticContainers()); assertEquals(0, spy.getNumRunningContainers()); - RecoveredContainerStatus rcs = RecoveredContainerStatus.COMPLETED; + RecoveredContainerState rcs = + createRecoveredContainerState(RecoveredContainerStatus.COMPLETED); when(token.getExecutionType()).thenReturn(ExecutionType.GUARANTEED); when(container.getContainerTokenIdentifier()).thenReturn(token); spy.recoverActiveContainer(container, rcs); @@ -265,7 +284,8 @@ public class TestContainerSchedulerRecovery { assertEquals(0, spy.getNumQueuedGuaranteedContainers()); assertEquals(0, 
spy.getNumQueuedOpportunisticContainers()); assertEquals(0, spy.getNumRunningContainers()); - RecoveredContainerStatus rcs = RecoveredContainerStatus.COMPLETED; + RecoveredContainerState rcs = + createRecoveredContainerState(RecoveredContainerStatus.COMPLETED); when(token.getExecutionType()).thenReturn(ExecutionType.OPPORTUNISTIC); when(container.getContainerTokenIdentifier()).thenReturn(token); spy.recoverActiveContainer(container, rcs); @@ -284,7 +304,8 @@ public class TestContainerSchedulerRecovery { assertEquals(0, spy.getNumQueuedGuaranteedContainers()); assertEquals(0, spy.getNumQueuedOpportunisticContainers()); assertEquals(0, spy.getNumRunningContainers()); - RecoveredContainerStatus rcs = RecoveredContainerStatus.QUEUED; + RecoveredContainerState rcs = + createRecoveredContainerState(RecoveredContainerStatus.QUEUED); when(container.getContainerTokenIdentifier()).thenReturn(token); spy.recoverActiveContainer(container, rcs); assertEquals(0, spy.getNumQueuedGuaranteedContainers()); @@ -302,7 +323,8 @@ public class TestContainerSchedulerRecovery { assertEquals(0, spy.getNumQueuedGuaranteedContainers()); assertEquals(0, spy.getNumQueuedOpportunisticContainers()); assertEquals(0, spy.getNumRunningContainers()); - RecoveredContainerStatus rcs = RecoveredContainerStatus.PAUSED; + RecoveredContainerState rcs = + createRecoveredContainerState(RecoveredContainerStatus.PAUSED); when(container.getContainerTokenIdentifier()).thenReturn(token); spy.recoverActiveContainer(container, rcs); assertEquals(0, spy.getNumQueuedGuaranteedContainers()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d3c39e9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/metrics/TestNodeManagerMetrics.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/metrics/TestNodeManagerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/metrics/TestNodeManagerMetrics.java index d21e7ad..c5f80ba 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/metrics/TestNodeManagerMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/metrics/TestNodeManagerMetrics.java @@ -113,8 +113,8 @@ public class TestNodeManagerMetrics { assertGauge("AvailableVCores", 19, rb); } - private void checkMetrics(int launched, int completed, int failed, int killed, - int initing, int running, int allocatedGB, + public static void checkMetrics(int launched, int completed, int failed, + int killed, int initing, int running, int allocatedGB, int allocatedContainers, int availableGB, int allocatedVCores, int availableVCores) { MetricsRecordBuilder rb = getMetrics("NodeManagerMetrics"); http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d3c39e9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java index b67d11f..c5428d1 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java @@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; @@ -45,6 +46,9 @@ import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings; + +import org.apache.hadoop.yarn.server.utils.BuilderUtils; + public class NMMemoryStateStoreService extends NMStateStoreService { private Map<ApplicationId, ContainerManagerApplicationProto> apps; private Map<ContainerId, RecoveredContainerState> containerStates; @@ -132,11 +136,19 @@ public class NMMemoryStateStoreService extends NMStateStoreService { @Override public synchronized void storeContainer(ContainerId containerId, - int version, long startTime, StartContainerRequest startRequest) - throws IOException { + int version, long startTime, StartContainerRequest startRequest) { RecoveredContainerState rcs = new RecoveredContainerState(); rcs.startRequest = startRequest; rcs.version = version; + try { + ContainerTokenIdentifier containerTokenIdentifier = 
BuilderUtils + .newContainerTokenIdentifier(startRequest.getContainerToken()); + rcs.capability = + new ResourcePBImpl(containerTokenIdentifier.getProto().getResource()); + } catch (IOException e) { + throw new RuntimeException(e); + } + rcs.setStartTime(startTime); containerStates.put(containerId, rcs); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d3c39e9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java index 265b3e6..c8c07d1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java @@ -238,7 +238,9 @@ public class TestNMLeveldbStateStoreService { ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 4); ContainerId containerId = ContainerId.newContainerId(appAttemptId, 5); - StartContainerRequest containerReq = createContainerRequest(containerId); + Resource containerResource = Resource.newInstance(1024, 2); + StartContainerRequest containerReq = + createContainerRequest(containerId, containerResource); // store a container and verify recovered long containerStartTime = System.currentTimeMillis(); @@ -260,6 +262,7 @@ public class 
TestNMLeveldbStateStoreService { assertEquals(false, rcs.getKilled()); assertEquals(containerReq, rcs.getStartRequest()); assertTrue(rcs.getDiagnostics().isEmpty()); + assertEquals(containerResource, rcs.getCapability()); // store a new container record without StartContainerRequest ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 6); @@ -279,6 +282,7 @@ public class TestNMLeveldbStateStoreService { assertEquals(false, rcs.getKilled()); assertEquals(containerReq, rcs.getStartRequest()); assertTrue(rcs.getDiagnostics().isEmpty()); + assertEquals(containerResource, rcs.getCapability()); // launch the container, add some diagnostics, and verify recovered StringBuilder diags = new StringBuilder(); @@ -294,6 +298,7 @@ public class TestNMLeveldbStateStoreService { assertEquals(false, rcs.getKilled()); assertEquals(containerReq, rcs.getStartRequest()); assertEquals(diags.toString(), rcs.getDiagnostics()); + assertEquals(containerResource, rcs.getCapability()); // pause the container, and verify recovered stateStore.storeContainerPaused(containerId); @@ -395,7 +400,17 @@ public class TestNMLeveldbStateStoreService { } private StartContainerRequest createContainerRequest( + ContainerId containerId, Resource res) { + return createContainerRequestInternal(containerId, res); + } + + private StartContainerRequest createContainerRequest( ContainerId containerId) { + return createContainerRequestInternal(containerId, null); + } + + private StartContainerRequest createContainerRequestInternal(ContainerId + containerId, Resource res) { LocalResource lrsrc = LocalResource.newInstance( URL.newInstance("hdfs", "somehost", 12345, "/some/path/to/rsrc"), LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 123L, @@ -421,6 +436,10 @@ public class TestNMLeveldbStateStoreService { localResources, env, containerCmds, serviceData, containerTokens, acls); Resource containerRsrc = Resource.newInstance(1357, 3); + + if (res != null) { + containerRsrc = res; + } 
ContainerTokenIdentifier containerTokenId = new ContainerTokenIdentifier(containerId, "host", "user", containerRsrc, 9876543210L, 42, 2468, Priority.newInstance(7), --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org