This is an automated email from the ASF dual-hosted git repository. jhung pushed a commit to branch YARN-8200 in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/YARN-8200 by this push: new ea7b3d4 YARN-9180. Port YARN-7033 NM recovery of assigned resources to branch-2 ea7b3d4 is described below commit ea7b3d463fe4aff7c7b4d336a8db0674093e85e4 Author: Jonathan Hung <jh...@linkedin.com> AuthorDate: Fri Feb 1 15:20:50 2019 -0800 YARN-9180. Port YARN-7033 NM recovery of assigned resources to branch-2 --- .../containermanager/container/Container.java | 7 + .../containermanager/container/ContainerImpl.java | 13 ++ .../container/ResourceMappings.java | 124 ++++++++++++++++ .../recovery/NMLeveldbStateStoreService.java | 42 ++++++ .../recovery/NMNullStateStoreService.java | 7 + .../nodemanager/recovery/NMStateStoreService.java | 23 +++ .../TestContainerManagerRecovery.java | 163 +++++++++++++++------ .../recovery/NMMemoryStateStoreService.java | 14 ++ .../recovery/TestNMLeveldbStateStoreService.java | 122 ++++++++++----- .../server/nodemanager/webapp/MockContainer.java | 6 + 10 files changed, 436 insertions(+), 85 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java index b9d1e31..b5e3aa1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java @@ -98,4 +98,11 @@ public interface Container extends EventHandler<ContainerEvent> { void sendPauseEvent(String description); Priority getPriority(); + + /** + * Get assigned resource mappings to the 
container. + * + * @return Resource Mappings of the container + */ + ResourceMappings getResourceMappings(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index 4675716..e6c7bce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -188,6 +188,7 @@ public class ContainerImpl implements Container { private boolean recoveredAsKilled = false; private Context context; private ResourceSet resourceSet; + private ResourceMappings resourceMappings; public ContainerImpl(Configuration conf, Dispatcher dispatcher, ContainerLaunchContext launchContext, Credentials creds, @@ -245,6 +246,7 @@ public class ContainerImpl implements Container { stateMachine = stateMachineFactory.make(this, ContainerState.NEW, context.getContainerStateTransitionListener()); this.resourceSet = new ResourceSet(); + this.resourceMappings = new ResourceMappings(); } private static ContainerRetryContext configureRetryContext( @@ -285,6 +287,7 @@ public class ContainerImpl implements Container { this.remainingRetryAttempts = rcs.getRemainingRetryAttempts(); this.workDir = rcs.getWorkDir(); this.logDir = rcs.getLogDir(); + this.resourceMappings = rcs.getResourceMappings(); } private static final ContainerDiagnosticsUpdateTransition UPDATE_DIAGNOSTICS_TRANSITION = @@ -2172,4 +2175,14 @@ public class ContainerImpl implements Container { public 
Priority getPriority() { return containerTokenIdentifier.getPriority(); } + + /** + * Get assigned resource mappings to the container. + * + * @return Resource Mappings of the container + */ + @Override + public ResourceMappings getResourceMappings() { + return resourceMappings; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ResourceMappings.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ResourceMappings.java new file mode 100644 index 0000000..d673341 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ResourceMappings.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.container; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.io.IOUtils; + +/** + * This class is used to store the resources assigned to a single container, + * keyed by resource type. + * + * Assigned resources could be a list of Strings. + * + * For example, a container can be assigned: + * "numa": ["numa0"] + * "gpu": ["0", "1", "2", "3"] + * "fpga": ["1", "3"] + * + * This will be used for NM restart container recovery. + */ +public class ResourceMappings { + + private Map<String, AssignedResources> assignedResourcesMap = new HashMap<>(); + + /** + * Get the resources assigned for the given resource type. + * @param resourceType resource type to look up + * @return list of assigned resources, or an empty list if none are stored + */ + public List<Serializable> getAssignedResources(String resourceType) { + AssignedResources ar = assignedResourcesMap.get(resourceType); + if (null == ar) { + return Collections.emptyList(); + } + return ar.getAssignedResources(); + } + + /** + * Adds the resources for a given resource type. + * + * @param resourceType Resource Type + * @param assigned Assigned resources to add + */ + public void addAssignedResources(String resourceType, + AssignedResources assigned) { + assignedResourcesMap.put(resourceType, assigned); + } + + /** + * Stores resources assigned to a container for a given resource type. 
+ */ + public static class AssignedResources implements Serializable { + private static final long serialVersionUID = -1059491941955757926L; + private List<Serializable> resources = Collections.emptyList(); + + public List<Serializable> getAssignedResources() { + return Collections.unmodifiableList(resources); + } + + public void updateAssignedResources(List<Serializable> list) { + this.resources = new ArrayList<>(list); + } + + @SuppressWarnings("unchecked") + public static AssignedResources fromBytes(byte[] bytes) + throws IOException { + ObjectInputStream ois = null; + List<Serializable> resources; + try { + ByteArrayInputStream bis = new ByteArrayInputStream(bytes); + ois = new ObjectInputStream(bis); + resources = (List<Serializable>) ois.readObject(); + } catch (ClassNotFoundException e) { + throw new IOException(e); + } finally { + IOUtils.closeQuietly(ois); + } + AssignedResources ar = new AssignedResources(); + ar.updateAssignedResources(resources); + return ar; + } + + public byte[] toBytes() throws IOException { + ObjectOutputStream oos = null; + byte[] bytes; + try { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + oos = new ObjectOutputStream(bos); + oos.writeObject(resources); + bytes = bos.toByteArray(); + } finally { + IOUtils.closeQuietly(oos); + } + return bytes; + } + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index 129fa8f..6aec1be 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; +import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -43,6 +44,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainerRequestPBImpl; @@ -62,6 +64,7 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainerRequestProto import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.ContainerTokenIdentifierProto; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings; import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -148,6 +151,9 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { private static final String AMRMPROXY_KEY_PREFIX = "AMRMProxy/"; + private static final String CONTAINER_ASSIGNED_RESOURCES_KEY_SUFFIX = + "/assignedResources_"; + private static final byte[] EMPTY_VALUE = new byte[0]; private DB db; @@ -309,6 +315,13 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { rcs.setWorkDir(asString(entry.getValue())); } else if (suffix.equals(CONTAINER_LOG_DIR_KEY_SUFFIX)) { rcs.setLogDir(asString(entry.getValue())); + } else if 
(suffix.startsWith(CONTAINER_ASSIGNED_RESOURCES_KEY_SUFFIX)) { + String resourceType = suffix.substring( + CONTAINER_ASSIGNED_RESOURCES_KEY_SUFFIX.length()); + ResourceMappings.AssignedResources assignedResources = + ResourceMappings.AssignedResources.fromBytes(entry.getValue()); + rcs.getResourceMappings().addAssignedResources(resourceType, + assignedResources); } else { LOG.warn("the container " + containerId + " will be killed because of the unknown key " + key @@ -1166,6 +1179,35 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { } } + @Override + public void storeAssignedResources(ContainerId containerId, + String resourceType, List<Serializable> assignedResources) + throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("storeAssignedResources: containerId=" + containerId + + ", assignedResources=" + StringUtils.join(",", assignedResources)); + } + + String keyResChng = CONTAINERS_KEY_PREFIX + containerId.toString() + + CONTAINER_ASSIGNED_RESOURCES_KEY_SUFFIX + resourceType; + try { + WriteBatch batch = db.createWriteBatch(); + try { + ResourceMappings.AssignedResources res = + new ResourceMappings.AssignedResources(); + res.updateAssignedResources(assignedResources); + + // New value will overwrite old values for the same key + batch.put(bytes(keyResChng), res.toBytes()); + db.write(batch); + } finally { + batch.close(); + } + } catch (DBException e) { + throw new IOException(e); + } + } + @SuppressWarnings("deprecation") private void cleanupDeprecatedFinishedApps() { try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java index aaf6fb2..6e3707b 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.nodemanager.recovery; import java.io.IOException; +import java.io.Serializable; import java.util.List; import org.apache.hadoop.conf.Configuration; @@ -267,6 +268,12 @@ public class NMNullStateStoreService extends NMStateStoreService { } @Override + public void storeAssignedResources(ContainerId containerId, + String resourceType, List<Serializable> assignedResources) + throws IOException { + } + + @Override protected void initStorage(Configuration conf) throws IOException { } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java index 1cdbd27..a929fe2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.nodemanager.recovery; import java.io.IOException; +import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -43,6 +44,7 @@ import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.Localize import 
org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.records.MasterKey; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings; @Private @Unstable @@ -90,6 +92,7 @@ public abstract class NMStateStoreService extends AbstractService { private RecoveredContainerType recoveryType = RecoveredContainerType.RECOVER; private long startTime; + private ResourceMappings resMappings = new ResourceMappings(); public RecoveredContainerStatus getStatus() { return status; @@ -174,6 +177,14 @@ public abstract class NMStateStoreService extends AbstractService { public void setRecoveryType(RecoveredContainerType recoveryType) { this.recoveryType = recoveryType; } + + public ResourceMappings getResourceMappings() { + return resMappings; + } + + public void setResourceMappings(ResourceMappings mappings) { + this.resMappings = mappings; + } } public static class LocalResourceTrackerState { @@ -718,6 +729,18 @@ public abstract class NMStateStoreService extends AbstractService { public abstract void removeAMRMProxyAppContext(ApplicationAttemptId attempt) throws IOException; + /** + * Store the assigned resources to a container. 
+ * + * @param containerId Container Id + * @param resourceType Resource Type + * @param assignedResources Assigned resources + * @throws IOException if fails + */ + public abstract void storeAssignedResources(ContainerId containerId, + String resourceType, List<Serializable> assignedResources) + throws IOException; + protected abstract void initStorage(Configuration conf) throws IOException; protected abstract void startStorage() throws IOException; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java index 8980a49..6241055 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java @@ -31,6 +31,7 @@ import static org.mockito.Mockito.verify; import java.io.File; import java.io.IOException; import java.io.PrintWriter; +import java.io.Serializable; import java.nio.ByteBuffer; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; @@ -91,6 +92,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; @@ -110,6 +112,7 @@ import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerIn import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -457,7 +460,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest { NMStateStoreService stateStore = new NMMemoryStateStoreService(); stateStore.init(conf); stateStore.start(); - Context context = createContext(conf, stateStore); + context = createContext(conf, stateStore); ContainerManagerImpl cm = createContainerManager(context, delSrvc); ((NMContext) context).setContainerManager(cm); cm.init(conf); @@ -467,55 +470,12 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest { ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1); ContainerId cid = ContainerId.newContainerId(attemptId, 1); - Map<String, String> containerEnv = new HashMap<>(); - setFlowContext(containerEnv, "app_name1", appId); - Map<String, ByteBuffer> serviceData = Collections.emptyMap(); - Credentials containerCreds = new Credentials(); - DataOutputBuffer dob = new DataOutputBuffer(); - containerCreds.writeTokenStorageToStream(dob); - ByteBuffer containerTokens = ByteBuffer.wrap(dob.getData(), 0, - dob.getLength()); - Map<ApplicationAccessType, String> acls = Collections.emptyMap(); - File tmpDir = new File("target", - this.getClass().getSimpleName() + 
"-tmpDir"); - File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile"); - PrintWriter fileWriter = new PrintWriter(scriptFile); - if (Shell.WINDOWS) { - fileWriter.println("@ping -n 100 127.0.0.1 >nul"); - } else { - fileWriter.write("\numask 0"); - fileWriter.write("\nexec sleep 100"); - } - fileWriter.close(); - FileContext localFS = FileContext.getLocalFSFileContext(); - URL resource_alpha = - URL.fromPath(localFS - .makeQualified(new Path(scriptFile.getAbsolutePath()))); - LocalResource rsrc_alpha = RecordFactoryProvider - .getRecordFactory(null).newRecordInstance(LocalResource.class); - rsrc_alpha.setResource(resource_alpha); - rsrc_alpha.setSize(-1); - rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION); - rsrc_alpha.setType(LocalResourceType.FILE); - rsrc_alpha.setTimestamp(scriptFile.lastModified()); - String destinationFile = "dest_file"; - Map<String, LocalResource> localResources = new HashMap<>(); - localResources.put(destinationFile, rsrc_alpha); - List<String> commands = - Arrays.asList(Shell.getRunScriptCommand(scriptFile)); - ContainerLaunchContext clc = ContainerLaunchContext.newInstance( - localResources, containerEnv, commands, serviceData, - containerTokens, acls); - StartContainersResponse startResponse = startContainer( - context, cm, cid, clc, null); - assertTrue(startResponse.getFailedRequests().isEmpty()); - assertEquals(1, context.getApplications().size()); + + commonLaunchContainer(appId, cid, cm); + Application app = context.getApplications().get(appId); assertNotNull(app); - // make sure the container reaches RUNNING state - waitForNMContainerState(cm, cid, - org.apache.hadoop.yarn.server.nodemanager - .containermanager.container.ContainerState.RUNNING); + Resource targetResource = Resource.newInstance(2048, 2); ContainerUpdateResponse updateResponse = updateContainers(context, cm, cid, targetResource); @@ -539,6 +499,62 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest { } @Test + public 
void testResourceMappingRecoveryForContainer() throws Exception { + conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true); + conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true); + NMStateStoreService stateStore = new NMMemoryStateStoreService(); + stateStore.init(conf); + stateStore.start(); + context = createContext(conf, stateStore); + ContainerManagerImpl cm = createContainerManager(context, delSrvc); + ((NMContext) context).setContainerManager(cm); + cm.init(conf); + cm.start(); + + // add an application by starting a container + ApplicationId appId = ApplicationId.newInstance(0, 1); + ApplicationAttemptId attemptId = + ApplicationAttemptId.newInstance(appId, 1); + ContainerId cid = ContainerId.newContainerId(attemptId, 1); + + commonLaunchContainer(appId, cid, cm); + + Application app = context.getApplications().get(appId); + assertNotNull(app); + + // store resource mapping of the container + List<Serializable> gpuResources = + Arrays.<Serializable>asList("1", "2", "3"); + stateStore.storeAssignedResources(cid, "gpu", gpuResources); + List<Serializable> numaResources = Arrays.<Serializable>asList("numa1"); + stateStore.storeAssignedResources(cid, "numa", numaResources); + List<Serializable> fpgaResources = + Arrays.<Serializable>asList("fpga1", "fpga2"); + stateStore.storeAssignedResources(cid, "fpga", fpgaResources); + + cm.stop(); + context = createContext(conf, stateStore); + cm = createContainerManager(context); + ((NMContext) context).setContainerManager(cm); + cm.init(conf); + cm.start(); + assertEquals(1, context.getApplications().size()); + app = context.getApplications().get(appId); + assertNotNull(app); + + Container nmContainer = context.getContainers().get(cid); + Assert.assertNotNull(nmContainer); + ResourceMappings resourceMappings = nmContainer.getResourceMappings(); + List<Serializable> assignedResource = resourceMappings + .getAssignedResources("gpu"); + Assert.assertTrue(assignedResource.equals(gpuResources)); + 
Assert.assertTrue( + resourceMappings.getAssignedResources("numa").equals(numaResources)); + Assert.assertTrue( + resourceMappings.getAssignedResources("fpga").equals(fpgaResources)); + } + + @Test public void testContainerCleanupOnShutdown() throws Exception { ApplicationId appId = ApplicationId.newInstance(0, 1); ApplicationAttemptId attemptId = @@ -610,6 +626,57 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest { verify(cm, never()).handle(isA(CMgrCompletedAppsEvent.class)); } + private void commonLaunchContainer(ApplicationId appId, ContainerId cid, + ContainerManagerImpl cm) throws Exception { + Map<String, String> containerEnv = new HashMap<>(); + setFlowContext(containerEnv, "app_name1", appId); + Map<String, ByteBuffer> serviceData = Collections.emptyMap(); + Credentials containerCreds = new Credentials(); + DataOutputBuffer dob = new DataOutputBuffer(); + containerCreds.writeTokenStorageToStream(dob); + ByteBuffer containerTokens = ByteBuffer.wrap(dob.getData(), 0, + dob.getLength()); + Map<ApplicationAccessType, String> acls = Collections.emptyMap(); + File tmpDir = new File("target", + this.getClass().getSimpleName() + "-tmpDir"); + File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile"); + PrintWriter fileWriter = new PrintWriter(scriptFile); + if (Shell.WINDOWS) { + fileWriter.println("@ping -n 100 127.0.0.1 >nul"); + } else { + fileWriter.write("\numask 0"); + fileWriter.write("\nexec sleep 100"); + } + fileWriter.close(); + FileContext localFS = FileContext.getLocalFSFileContext(); + URL resource_alpha = + URL.fromPath(localFS + .makeQualified(new Path(scriptFile.getAbsolutePath()))); + LocalResource rsrc_alpha = RecordFactoryProvider + .getRecordFactory(null).newRecordInstance(LocalResource.class); + rsrc_alpha.setResource(resource_alpha); + rsrc_alpha.setSize(-1); + rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION); + rsrc_alpha.setType(LocalResourceType.FILE); + 
rsrc_alpha.setTimestamp(scriptFile.lastModified()); + String destinationFile = "dest_file"; + Map<String, LocalResource> localResources = new HashMap<>(); + localResources.put(destinationFile, rsrc_alpha); + List<String> commands = + Arrays.asList(Shell.getRunScriptCommand(scriptFile)); + ContainerLaunchContext clc = ContainerLaunchContext.newInstance( + localResources, containerEnv, commands, serviceData, + containerTokens, acls); + StartContainersResponse startResponse = startContainer( + context, cm, cid, clc, null); + assertTrue(startResponse.getFailedRequests().isEmpty()); + assertEquals(1, context.getApplications().size()); + // make sure the container reaches RUNNING state + waitForNMContainerState(cm, cid, + org.apache.hadoop.yarn.server.nodemanager + .containermanager.container.ContainerState.RUNNING); + } + private ContainerManagerImpl createContainerManager(Context context, DeletionService delSrvc) { return new ContainerManagerImpl(context, exec, delSrvc, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java index 0e46234..5d424ad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.nodemanager.recovery; import java.io.IOException; +import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -42,6 +43,7 @@ import 
org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDelet import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -124,6 +126,7 @@ public class NMMemoryStateStoreService extends NMStateStoreService { rcsCopy.setRemainingRetryAttempts(rcs.getRemainingRetryAttempts()); rcsCopy.setWorkDir(rcs.getWorkDir()); rcsCopy.setLogDir(rcs.getLogDir()); + rcsCopy.setResourceMappings(rcs.getResourceMappings()); result.add(rcsCopy); } return result; @@ -511,6 +514,17 @@ public class NMMemoryStateStoreService extends NMStateStoreService { amrmProxyState.getAppContexts().remove(attempt); } + @Override + public void storeAssignedResources(ContainerId containerId, + String resourceType, List<Serializable> assignedResources) + throws IOException { + ResourceMappings.AssignedResources ar = + new ResourceMappings.AssignedResources(); + ar.updateAssignedResources(assignedResources); + containerStates.get(containerId).getResourceMappings() + .addAssignedResources(resourceType, ar); + } + private static class TrackerState { Map<Path, LocalResourceProto> inProgressMap = new HashMap<Path, LocalResourceProto>(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java index a507938..270b8af 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java @@ -32,6 +32,7 @@ import static org.mockito.Mockito.verify; import java.io.File; import java.io.IOException; +import java.io.Serializable; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; @@ -1003,46 +1004,12 @@ public class TestNMLeveldbStateStoreService { .loadContainersState(); assertTrue(recoveredContainers.isEmpty()); - // create a container request ApplicationId appId = ApplicationId.newInstance(1234, 3); ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 4); ContainerId containerId = ContainerId.newContainerId(appAttemptId, 5); - LocalResource lrsrc = LocalResource.newInstance( - URL.newInstance("hdfs", "somehost", 12345, "/some/path/to/rsrc"), - LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 123L, - 1234567890L); - Map<String, LocalResource> localResources = - new HashMap<String, LocalResource>(); - localResources.put("rsrc", lrsrc); - Map<String, String> env = new HashMap<String, String>(); - env.put("somevar", "someval"); - List<String> containerCmds = new ArrayList<String>(); - containerCmds.add("somecmd"); - containerCmds.add("somearg"); - Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>(); - serviceData.put("someservice", - ByteBuffer.wrap(new byte[] { 0x1, 0x2, 0x3 })); - ByteBuffer containerTokens = ByteBuffer - .wrap(new byte[] { 0x7, 0x8, 0x9, 0xa }); - Map<ApplicationAccessType, String> acls = - new HashMap<ApplicationAccessType, String>(); - acls.put(ApplicationAccessType.VIEW_APP, "viewuser"); - acls.put(ApplicationAccessType.MODIFY_APP, "moduser"); - ContainerLaunchContext clc = 
ContainerLaunchContext.newInstance( - localResources, env, containerCmds, - serviceData, containerTokens, acls); - Resource containerRsrc = Resource.newInstance(1357, 3); - ContainerTokenIdentifier containerTokenId = new ContainerTokenIdentifier( - containerId, "host", "user", containerRsrc, 9876543210L, 42, 2468, - Priority.newInstance(7), 13579); - Token containerToken = Token.newInstance(containerTokenId.getBytes(), - ContainerTokenIdentifier.KIND.toString(), "password".getBytes(), - "tokenservice"); - StartContainerRequest containerReq = StartContainerRequest.newInstance(clc, - containerToken); - - stateStore.storeContainer(containerId, 0, 0, containerReq); + StartContainerRequest startContainerRequest = storeMockContainer( + containerId); // add a invalid key byte[] invalidKey = ("ContainerManager/containers/" @@ -1055,7 +1022,7 @@ public class TestNMLeveldbStateStoreService { assertEquals(RecoveredContainerStatus.REQUESTED, rcs.getStatus()); assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode()); assertEquals(false, rcs.getKilled()); - assertEquals(containerReq, rcs.getStartRequest()); + assertEquals(startContainerRequest, rcs.getStartRequest()); assertTrue(rcs.getDiagnostics().isEmpty()); assertEquals(RecoveredContainerType.KILL, rcs.getRecoveryType()); // assert unknown keys are cleaned up finally @@ -1163,6 +1130,87 @@ public class TestNMLeveldbStateStoreService { } } + @Test + public void testStateStoreForResourceMapping() throws IOException { + // test empty when no state + List<RecoveredContainerState> recoveredContainers = stateStore + .loadContainersState(); + assertTrue(recoveredContainers.isEmpty()); + + ApplicationId appId = ApplicationId.newInstance(1234, 3); + ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, + 4); + ContainerId containerId = ContainerId.newContainerId(appAttemptId, 5); + storeMockContainer(containerId); + + // Store ResourceMapping + stateStore.storeAssignedResources(containerId, "gpu", + 
Arrays.<Serializable>asList("1", "2", "3")); + // This will overwrite above + List<Serializable> gpuRes1 = Arrays.<Serializable>asList("1", "2", "4"); + stateStore.storeAssignedResources(containerId, "gpu", gpuRes1); + List<Serializable> fpgaRes = + Arrays.<Serializable>asList("3", "4", "5", "6"); + stateStore.storeAssignedResources(containerId, "fpga", fpgaRes); + List<Serializable> numaRes = Arrays.<Serializable>asList("numa1"); + stateStore.storeAssignedResources(containerId, "numa", numaRes); + + // add a invalid key + restartStateStore(); + recoveredContainers = stateStore.loadContainersState(); + assertEquals(1, recoveredContainers.size()); + RecoveredContainerState rcs = recoveredContainers.get(0); + List<Serializable> res = rcs.getResourceMappings() + .getAssignedResources("gpu"); + Assert.assertTrue(res.equals(gpuRes1)); + + res = rcs.getResourceMappings().getAssignedResources("fpga"); + Assert.assertTrue(res.equals(fpgaRes)); + + res = rcs.getResourceMappings().getAssignedResources("numa"); + Assert.assertTrue(res.equals(numaRes)); + } + + private StartContainerRequest storeMockContainer(ContainerId containerId) + throws IOException { + // create a container request + LocalResource lrsrc = LocalResource.newInstance( + URL.newInstance("hdfs", "somehost", 12345, "/some/path/to/rsrc"), + LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 123L, + 1234567890L); + Map<String, LocalResource> localResources = + new HashMap<String, LocalResource>(); + localResources.put("rsrc", lrsrc); + Map<String, String> env = new HashMap<String, String>(); + env.put("somevar", "someval"); + List<String> containerCmds = new ArrayList<String>(); + containerCmds.add("somecmd"); + containerCmds.add("somearg"); + Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>(); + serviceData.put("someservice", + ByteBuffer.wrap(new byte[] { 0x1, 0x2, 0x3 })); + ByteBuffer containerTokens = ByteBuffer + .wrap(new byte[] { 0x7, 0x8, 0x9, 0xa }); + 
Map<ApplicationAccessType, String> acls = + new HashMap<ApplicationAccessType, String>(); + acls.put(ApplicationAccessType.VIEW_APP, "viewuser"); + acls.put(ApplicationAccessType.MODIFY_APP, "moduser"); + ContainerLaunchContext clc = ContainerLaunchContext.newInstance( + localResources, env, containerCmds, + serviceData, containerTokens, acls); + Resource containerRsrc = Resource.newInstance(1357, 3); + ContainerTokenIdentifier containerTokenId = new ContainerTokenIdentifier( + containerId, "host", "user", containerRsrc, 9876543210L, 42, 2468, + Priority.newInstance(7), 13579); + Token containerToken = Token.newInstance(containerTokenId.getBytes(), + ContainerTokenIdentifier.KIND.toString(), "password".getBytes(), + "tokenservice"); + StartContainerRequest containerReq = StartContainerRequest.newInstance(clc, + containerToken); + stateStore.storeContainer(containerId, 0, 0, containerReq); + return containerReq; + } + private static class NMTokenSecretManagerForTest extends BaseNMTokenSecretManager { public MasterKey generateKey() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java index b9c6fff..29c2038 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java @@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceSet; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -242,4 +243,9 @@ public class MockContainer implements Container { public long getContainerStartTime() { return 0; } + + @Override + public ResourceMappings getResourceMappings() { + return null; + } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org