This is an automated email from the ASF dual-hosted git repository.

jhung pushed a commit to branch YARN-8200
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/YARN-8200 by this push:
     new ea7b3d4  YARN-9180. Port YARN-7033 NM recovery of assigned resources 
to branch-2
ea7b3d4 is described below

commit ea7b3d463fe4aff7c7b4d336a8db0674093e85e4
Author: Jonathan Hung <jh...@linkedin.com>
AuthorDate: Fri Feb 1 15:20:50 2019 -0800

    YARN-9180. Port YARN-7033 NM recovery of assigned resources to branch-2
---
 .../containermanager/container/Container.java      |   7 +
 .../containermanager/container/ContainerImpl.java  |  13 ++
 .../container/ResourceMappings.java                | 124 ++++++++++++++++
 .../recovery/NMLeveldbStateStoreService.java       |  42 ++++++
 .../recovery/NMNullStateStoreService.java          |   7 +
 .../nodemanager/recovery/NMStateStoreService.java  |  23 +++
 .../TestContainerManagerRecovery.java              | 163 +++++++++++++++------
 .../recovery/NMMemoryStateStoreService.java        |  14 ++
 .../recovery/TestNMLeveldbStateStoreService.java   | 122 ++++++++++-----
 .../server/nodemanager/webapp/MockContainer.java   |   6 +
 10 files changed, 436 insertions(+), 85 deletions(-)

diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
index b9d1e31..b5e3aa1 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
@@ -98,4 +98,11 @@ public interface Container extends 
EventHandler<ContainerEvent> {
   void sendPauseEvent(String description);
 
   Priority getPriority();
+
+  /**
+   * Get assigned resource mappings to the container.
+   *
+   * @return Resource Mappings of the container
+   */
+  ResourceMappings getResourceMappings();
 }
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index 4675716..e6c7bce 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -188,6 +188,7 @@ public class ContainerImpl implements Container {
   private boolean recoveredAsKilled = false;
   private Context context;
   private ResourceSet resourceSet;
+  private ResourceMappings resourceMappings;
 
   public ContainerImpl(Configuration conf, Dispatcher dispatcher,
       ContainerLaunchContext launchContext, Credentials creds,
@@ -245,6 +246,7 @@ public class ContainerImpl implements Container {
     stateMachine = stateMachineFactory.make(this, ContainerState.NEW,
         context.getContainerStateTransitionListener());
     this.resourceSet = new ResourceSet();
+    this.resourceMappings = new ResourceMappings();
   }
 
   private static ContainerRetryContext configureRetryContext(
@@ -285,6 +287,7 @@ public class ContainerImpl implements Container {
     this.remainingRetryAttempts = rcs.getRemainingRetryAttempts();
     this.workDir = rcs.getWorkDir();
     this.logDir = rcs.getLogDir();
+    this.resourceMappings = rcs.getResourceMappings();
   }
 
   private static final ContainerDiagnosticsUpdateTransition 
UPDATE_DIAGNOSTICS_TRANSITION =
@@ -2172,4 +2175,14 @@ public class ContainerImpl implements Container {
   public Priority getPriority() {
     return containerTokenIdentifier.getPriority();
   }
+
+  /**
+   * Get assigned resource mappings to the container.
+   *
+   * @return Resource Mappings of the container
+   */
+  @Override
+  public ResourceMappings getResourceMappings() {
+    return resourceMappings;
+  }
 }
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ResourceMappings.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ResourceMappings.java
new file mode 100644
index 0000000..d673341
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ResourceMappings.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.io.IOUtils;
+
+/**
+ * This class is used to store assigned resource to a single container by
+ * resource types.
+ *
+ * Assigned resource could be list of String
+ *
+ * For example, we can assign container to:
+ * "numa": ["numa0"]
+ * "gpu": ["0", "1", "2", "3"]
+ * "fpga": ["1", "3"]
+ *
+ * This will be used for NM restart container recovery.
+ */
+public class ResourceMappings {
+
+  private Map<String, AssignedResources> assignedResourcesMap = new 
HashMap<>();
+
+  /**
+   * Get all resource mappings.
+   * @param resourceType resourceType
+   * @return map of resource mapping
+   */
+  public List<Serializable> getAssignedResources(String resourceType) {
+    AssignedResources ar = assignedResourcesMap.get(resourceType);
+    if (null == ar) {
+      return Collections.emptyList();
+    }
+    return ar.getAssignedResources();
+  }
+
+  /**
+   * Adds the resources for a given resource type.
+   *
+   * @param resourceType Resource Type
+   * @param assigned Assigned resources to add
+   */
+  public void addAssignedResources(String resourceType,
+      AssignedResources assigned) {
+    assignedResourcesMap.put(resourceType, assigned);
+  }
+
+  /**
+   * Stores resources assigned to a container for a given resource type.
+   */
+  public static class AssignedResources implements Serializable {
+    private static final long serialVersionUID = -1059491941955757926L;
+    private List<Serializable> resources = Collections.emptyList();
+
+    public List<Serializable> getAssignedResources() {
+      return Collections.unmodifiableList(resources);
+    }
+
+    public void updateAssignedResources(List<Serializable> list) {
+      this.resources = new ArrayList<>(list);
+    }
+
+    @SuppressWarnings("unchecked")
+    public static AssignedResources fromBytes(byte[] bytes)
+        throws IOException {
+      ObjectInputStream ois = null;
+      List<Serializable> resources;
+      try {
+        ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+        ois = new ObjectInputStream(bis);
+        resources = (List<Serializable>) ois.readObject();
+      } catch (ClassNotFoundException e) {
+        throw new IOException(e);
+      } finally {
+        IOUtils.closeQuietly(ois);
+      }
+      AssignedResources ar = new AssignedResources();
+      ar.updateAssignedResources(resources);
+      return ar;
+    }
+
+    public byte[] toBytes() throws IOException {
+      ObjectOutputStream oos = null;
+      byte[] bytes;
+      try {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        oos = new ObjectOutputStream(bos);
+        oos.writeObject(resources);
+        bytes = bos.toByteArray();
+      } finally {
+        IOUtils.closeQuietly(oos);
+      }
+      return bytes;
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
index 129fa8f..6aec1be 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -43,6 +44,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 import 
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainerRequestPBImpl;
@@ -62,6 +64,7 @@ import 
org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainerRequestProto
 import 
org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.ContainerTokenIdentifierProto;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
 import org.apache.hadoop.yarn.server.records.Version;
 import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -148,6 +151,9 @@ public class NMLeveldbStateStoreService extends 
NMStateStoreService {
 
   private static final String AMRMPROXY_KEY_PREFIX = "AMRMProxy/";
 
+  private static final String CONTAINER_ASSIGNED_RESOURCES_KEY_SUFFIX =
+      "/assignedResources_";
+
   private static final byte[] EMPTY_VALUE = new byte[0];
 
   private DB db;
@@ -309,6 +315,13 @@ public class NMLeveldbStateStoreService extends 
NMStateStoreService {
         rcs.setWorkDir(asString(entry.getValue()));
       } else if (suffix.equals(CONTAINER_LOG_DIR_KEY_SUFFIX)) {
         rcs.setLogDir(asString(entry.getValue()));
+      } else if (suffix.startsWith(CONTAINER_ASSIGNED_RESOURCES_KEY_SUFFIX)) {
+        String resourceType = suffix.substring(
+            CONTAINER_ASSIGNED_RESOURCES_KEY_SUFFIX.length());
+        ResourceMappings.AssignedResources assignedResources =
+            ResourceMappings.AssignedResources.fromBytes(entry.getValue());
+        rcs.getResourceMappings().addAssignedResources(resourceType,
+            assignedResources);
       } else {
         LOG.warn("the container " + containerId
             + " will be killed because of the unknown key " + key
@@ -1166,6 +1179,35 @@ public class NMLeveldbStateStoreService extends 
NMStateStoreService {
     }
   }
 
+  @Override
+  public void storeAssignedResources(ContainerId containerId,
+      String resourceType, List<Serializable> assignedResources)
+      throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("storeAssignedResources: containerId=" + containerId
+          + ", assignedResources=" + StringUtils.join(",", assignedResources));
+    }
+
+    String keyResChng = CONTAINERS_KEY_PREFIX + containerId.toString()
+        + CONTAINER_ASSIGNED_RESOURCES_KEY_SUFFIX + resourceType;
+    try {
+      WriteBatch batch = db.createWriteBatch();
+      try {
+        ResourceMappings.AssignedResources res =
+            new ResourceMappings.AssignedResources();
+        res.updateAssignedResources(assignedResources);
+
+        // New value will overwrite old values for the same key
+        batch.put(bytes(keyResChng), res.toBytes());
+        db.write(batch);
+      } finally {
+        batch.close();
+      }
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
   @SuppressWarnings("deprecation")
   private void cleanupDeprecatedFinishedApps() {
     try {
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
index aaf6fb2..6e3707b 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.nodemanager.recovery;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
@@ -267,6 +268,12 @@ public class NMNullStateStoreService extends 
NMStateStoreService {
   }
 
   @Override
+  public void storeAssignedResources(ContainerId containerId,
+      String resourceType, List<Serializable> assignedResources)
+      throws IOException {
+  }
+
+  @Override
   protected void initStorage(Configuration conf) throws IOException {
   }
 
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
index 1cdbd27..a929fe2 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.nodemanager.recovery;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -43,6 +44,7 @@ import 
org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.Localize
 import 
org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
 
 @Private
 @Unstable
@@ -90,6 +92,7 @@ public abstract class NMStateStoreService extends 
AbstractService {
     private RecoveredContainerType recoveryType =
         RecoveredContainerType.RECOVER;
     private long startTime;
+    private ResourceMappings resMappings = new ResourceMappings();
 
     public RecoveredContainerStatus getStatus() {
       return status;
@@ -174,6 +177,14 @@ public abstract class NMStateStoreService extends 
AbstractService {
     public void setRecoveryType(RecoveredContainerType recoveryType) {
       this.recoveryType = recoveryType;
     }
+
+    public ResourceMappings getResourceMappings() {
+      return resMappings;
+    }
+
+    public void setResourceMappings(ResourceMappings mappings) {
+      this.resMappings = mappings;
+    }
   }
 
   public static class LocalResourceTrackerState {
@@ -718,6 +729,18 @@ public abstract class NMStateStoreService extends 
AbstractService {
   public abstract void removeAMRMProxyAppContext(ApplicationAttemptId attempt)
       throws IOException;
 
+  /**
+   * Store the assigned resources to a container.
+   *
+   * @param containerId Container Id
+   * @param resourceType Resource Type
+   * @param assignedResources Assigned resources
+   * @throws IOException if fails
+   */
+  public abstract void storeAssignedResources(ContainerId containerId,
+      String resourceType, List<Serializable> assignedResources)
+      throws IOException;
+
   protected abstract void initStorage(Configuration conf) throws IOException;
 
   protected abstract void startStorage() throws IOException;
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
index 8980a49..6241055 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
@@ -31,6 +31,7 @@ import static org.mockito.Mockito.verify;
 import java.io.File;
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
@@ -91,6 +92,7 @@ import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
@@ -110,6 +112,7 @@ import 
org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerIn
 import 
org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -457,7 +460,7 @@ public class TestContainerManagerRecovery extends 
BaseContainerManagerTest {
     NMStateStoreService stateStore = new NMMemoryStateStoreService();
     stateStore.init(conf);
     stateStore.start();
-    Context context = createContext(conf, stateStore);
+    context = createContext(conf, stateStore);
     ContainerManagerImpl cm = createContainerManager(context, delSrvc);
     ((NMContext) context).setContainerManager(cm);
     cm.init(conf);
@@ -467,55 +470,12 @@ public class TestContainerManagerRecovery extends 
BaseContainerManagerTest {
     ApplicationAttemptId attemptId =
         ApplicationAttemptId.newInstance(appId, 1);
     ContainerId cid = ContainerId.newContainerId(attemptId, 1);
-    Map<String, String> containerEnv = new HashMap<>();
-    setFlowContext(containerEnv, "app_name1", appId);
-    Map<String, ByteBuffer> serviceData = Collections.emptyMap();
-    Credentials containerCreds = new Credentials();
-    DataOutputBuffer dob = new DataOutputBuffer();
-    containerCreds.writeTokenStorageToStream(dob);
-    ByteBuffer containerTokens = ByteBuffer.wrap(dob.getData(), 0,
-        dob.getLength());
-    Map<ApplicationAccessType, String> acls = Collections.emptyMap();
-    File tmpDir = new File("target",
-        this.getClass().getSimpleName() + "-tmpDir");
-    File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile");
-    PrintWriter fileWriter = new PrintWriter(scriptFile);
-    if (Shell.WINDOWS) {
-      fileWriter.println("@ping -n 100 127.0.0.1 >nul");
-    } else {
-      fileWriter.write("\numask 0");
-      fileWriter.write("\nexec sleep 100");
-    }
-    fileWriter.close();
-    FileContext localFS = FileContext.getLocalFSFileContext();
-    URL resource_alpha =
-        URL.fromPath(localFS
-            .makeQualified(new Path(scriptFile.getAbsolutePath())));
-    LocalResource rsrc_alpha = RecordFactoryProvider
-        .getRecordFactory(null).newRecordInstance(LocalResource.class);
-    rsrc_alpha.setResource(resource_alpha);
-    rsrc_alpha.setSize(-1);
-    rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION);
-    rsrc_alpha.setType(LocalResourceType.FILE);
-    rsrc_alpha.setTimestamp(scriptFile.lastModified());
-    String destinationFile = "dest_file";
-    Map<String, LocalResource> localResources = new HashMap<>();
-    localResources.put(destinationFile, rsrc_alpha);
-    List<String> commands =
-        Arrays.asList(Shell.getRunScriptCommand(scriptFile));
-    ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
-        localResources, containerEnv, commands, serviceData,
-        containerTokens, acls);
-    StartContainersResponse startResponse = startContainer(
-        context, cm, cid, clc, null);
-    assertTrue(startResponse.getFailedRequests().isEmpty());
-    assertEquals(1, context.getApplications().size());
+
+    commonLaunchContainer(appId, cid, cm);
+
     Application app = context.getApplications().get(appId);
     assertNotNull(app);
-    // make sure the container reaches RUNNING state
-    waitForNMContainerState(cm, cid,
-        org.apache.hadoop.yarn.server.nodemanager
-            .containermanager.container.ContainerState.RUNNING);
+
     Resource targetResource = Resource.newInstance(2048, 2);
     ContainerUpdateResponse updateResponse =
         updateContainers(context, cm, cid, targetResource);
@@ -539,6 +499,62 @@ public class TestContainerManagerRecovery extends 
BaseContainerManagerTest {
   }
 
   @Test
+  public void testResourceMappingRecoveryForContainer() throws Exception {
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true);
+    NMStateStoreService stateStore = new NMMemoryStateStoreService();
+    stateStore.init(conf);
+    stateStore.start();
+    context = createContext(conf, stateStore);
+    ContainerManagerImpl cm = createContainerManager(context, delSrvc);
+    ((NMContext) context).setContainerManager(cm);
+    cm.init(conf);
+    cm.start();
+
+    // add an application by starting a container
+    ApplicationId appId = ApplicationId.newInstance(0, 1);
+    ApplicationAttemptId attemptId =
+        ApplicationAttemptId.newInstance(appId, 1);
+    ContainerId cid = ContainerId.newContainerId(attemptId, 1);
+
+    commonLaunchContainer(appId, cid, cm);
+
+    Application app = context.getApplications().get(appId);
+    assertNotNull(app);
+
+    // store resource mapping of the container
+    List<Serializable> gpuResources =
+        Arrays.<Serializable>asList("1", "2", "3");
+    stateStore.storeAssignedResources(cid, "gpu", gpuResources);
+    List<Serializable> numaResources = Arrays.<Serializable>asList("numa1");
+    stateStore.storeAssignedResources(cid, "numa", numaResources);
+    List<Serializable> fpgaResources =
+        Arrays.<Serializable>asList("fpga1", "fpga2");
+    stateStore.storeAssignedResources(cid, "fpga", fpgaResources);
+
+    cm.stop();
+    context = createContext(conf, stateStore);
+    cm = createContainerManager(context);
+    ((NMContext) context).setContainerManager(cm);
+    cm.init(conf);
+    cm.start();
+    assertEquals(1, context.getApplications().size());
+    app = context.getApplications().get(appId);
+    assertNotNull(app);
+
+    Container nmContainer = context.getContainers().get(cid);
+    Assert.assertNotNull(nmContainer);
+    ResourceMappings resourceMappings = nmContainer.getResourceMappings();
+    List<Serializable> assignedResource = resourceMappings
+        .getAssignedResources("gpu");
+    Assert.assertTrue(assignedResource.equals(gpuResources));
+    Assert.assertTrue(
+        resourceMappings.getAssignedResources("numa").equals(numaResources));
+    Assert.assertTrue(
+        resourceMappings.getAssignedResources("fpga").equals(fpgaResources));
+  }
+
+  @Test
   public void testContainerCleanupOnShutdown() throws Exception {
     ApplicationId appId = ApplicationId.newInstance(0, 1);
     ApplicationAttemptId attemptId =
@@ -610,6 +626,57 @@ public class TestContainerManagerRecovery extends 
BaseContainerManagerTest {
     verify(cm, never()).handle(isA(CMgrCompletedAppsEvent.class));
   }
 
+  private void commonLaunchContainer(ApplicationId appId, ContainerId cid,
+      ContainerManagerImpl cm) throws Exception {
+    Map<String, String> containerEnv = new HashMap<>();
+    setFlowContext(containerEnv, "app_name1", appId);
+    Map<String, ByteBuffer> serviceData = Collections.emptyMap();
+    Credentials containerCreds = new Credentials();
+    DataOutputBuffer dob = new DataOutputBuffer();
+    containerCreds.writeTokenStorageToStream(dob);
+    ByteBuffer containerTokens = ByteBuffer.wrap(dob.getData(), 0,
+        dob.getLength());
+    Map<ApplicationAccessType, String> acls = Collections.emptyMap();
+    File tmpDir = new File("target",
+        this.getClass().getSimpleName() + "-tmpDir");
+    File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile");
+    PrintWriter fileWriter = new PrintWriter(scriptFile);
+    if (Shell.WINDOWS) {
+      fileWriter.println("@ping -n 100 127.0.0.1 >nul");
+    } else {
+      fileWriter.write("\numask 0");
+      fileWriter.write("\nexec sleep 100");
+    }
+    fileWriter.close();
+    FileContext localFS = FileContext.getLocalFSFileContext();
+    URL resource_alpha =
+        URL.fromPath(localFS
+            .makeQualified(new Path(scriptFile.getAbsolutePath())));
+    LocalResource rsrc_alpha = RecordFactoryProvider
+        .getRecordFactory(null).newRecordInstance(LocalResource.class);
+    rsrc_alpha.setResource(resource_alpha);
+    rsrc_alpha.setSize(-1);
+    rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION);
+    rsrc_alpha.setType(LocalResourceType.FILE);
+    rsrc_alpha.setTimestamp(scriptFile.lastModified());
+    String destinationFile = "dest_file";
+    Map<String, LocalResource> localResources = new HashMap<>();
+    localResources.put(destinationFile, rsrc_alpha);
+    List<String> commands =
+        Arrays.asList(Shell.getRunScriptCommand(scriptFile));
+    ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
+        localResources, containerEnv, commands, serviceData,
+        containerTokens, acls);
+    StartContainersResponse startResponse = startContainer(
+        context, cm, cid, clc, null);
+    assertTrue(startResponse.getFailedRequests().isEmpty());
+    assertEquals(1, context.getApplications().size());
+    // make sure the container reaches RUNNING state
+    waitForNMContainerState(cm, cid,
+        org.apache.hadoop.yarn.server.nodemanager
+            .containermanager.container.ContainerState.RUNNING);
+  }
+
   private ContainerManagerImpl createContainerManager(Context context,
       DeletionService delSrvc) {
     return new ContainerManagerImpl(context, exec, delSrvc,
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
index 0e46234..5d424ad 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.nodemanager.recovery;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -42,6 +43,7 @@ import 
org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDelet
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
 
 
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -124,6 +126,7 @@ public class NMMemoryStateStoreService extends 
NMStateStoreService {
       rcsCopy.setRemainingRetryAttempts(rcs.getRemainingRetryAttempts());
       rcsCopy.setWorkDir(rcs.getWorkDir());
       rcsCopy.setLogDir(rcs.getLogDir());
+      rcsCopy.setResourceMappings(rcs.getResourceMappings());
       result.add(rcsCopy);
     }
     return result;
@@ -511,6 +514,17 @@ public class NMMemoryStateStoreService extends 
NMStateStoreService {
     amrmProxyState.getAppContexts().remove(attempt);
   }
 
+  @Override
+  public void storeAssignedResources(ContainerId containerId,
+      String resourceType, List<Serializable> assignedResources)
+      throws IOException {
+    ResourceMappings.AssignedResources ar =
+        new ResourceMappings.AssignedResources();
+    ar.updateAssignedResources(assignedResources);
+    containerStates.get(containerId).getResourceMappings()
+        .addAssignedResources(resourceType, ar);
+  }
+
   private static class TrackerState {
     Map<Path, LocalResourceProto> inProgressMap =
         new HashMap<Path, LocalResourceProto>();
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
index a507938..270b8af 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
@@ -32,6 +32,7 @@ import static org.mockito.Mockito.verify;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -1003,46 +1004,12 @@ public class TestNMLeveldbStateStoreService {
         .loadContainersState();
     assertTrue(recoveredContainers.isEmpty());
 
-    // create a container request
     ApplicationId appId = ApplicationId.newInstance(1234, 3);
     ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId,
         4);
     ContainerId containerId = ContainerId.newContainerId(appAttemptId, 5);
-    LocalResource lrsrc = LocalResource.newInstance(
-        URL.newInstance("hdfs", "somehost", 12345, "/some/path/to/rsrc"),
-        LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 123L,
-        1234567890L);
-    Map<String, LocalResource> localResources =
-        new HashMap<String, LocalResource>();
-    localResources.put("rsrc", lrsrc);
-    Map<String, String> env = new HashMap<String, String>();
-    env.put("somevar", "someval");
-    List<String> containerCmds = new ArrayList<String>();
-    containerCmds.add("somecmd");
-    containerCmds.add("somearg");
-    Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
-    serviceData.put("someservice",
-        ByteBuffer.wrap(new byte[] { 0x1, 0x2, 0x3 }));
-    ByteBuffer containerTokens = ByteBuffer
-        .wrap(new byte[] { 0x7, 0x8, 0x9, 0xa });
-    Map<ApplicationAccessType, String> acls =
-        new HashMap<ApplicationAccessType, String>();
-    acls.put(ApplicationAccessType.VIEW_APP, "viewuser");
-    acls.put(ApplicationAccessType.MODIFY_APP, "moduser");
-    ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
-        localResources, env, containerCmds,
-        serviceData, containerTokens, acls);
-    Resource containerRsrc = Resource.newInstance(1357, 3);
-    ContainerTokenIdentifier containerTokenId = new ContainerTokenIdentifier(
-        containerId, "host", "user", containerRsrc, 9876543210L, 42, 2468,
-        Priority.newInstance(7), 13579);
-    Token containerToken = Token.newInstance(containerTokenId.getBytes(),
-        ContainerTokenIdentifier.KIND.toString(), "password".getBytes(),
-        "tokenservice");
-    StartContainerRequest containerReq = StartContainerRequest.newInstance(clc,
-        containerToken);
-
-    stateStore.storeContainer(containerId, 0, 0, containerReq);
+    StartContainerRequest startContainerRequest = storeMockContainer(
+        containerId);
 
     // add a invalid key
     byte[] invalidKey = ("ContainerManager/containers/"
@@ -1055,7 +1022,7 @@ public class TestNMLeveldbStateStoreService {
     assertEquals(RecoveredContainerStatus.REQUESTED, rcs.getStatus());
     assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
     assertEquals(false, rcs.getKilled());
-    assertEquals(containerReq, rcs.getStartRequest());
+    assertEquals(startContainerRequest, rcs.getStartRequest());
     assertTrue(rcs.getDiagnostics().isEmpty());
     assertEquals(RecoveredContainerType.KILL, rcs.getRecoveryType());
     // assert unknown keys are cleaned up finally
@@ -1163,6 +1130,87 @@ public class TestNMLeveldbStateStoreService {
     }
   }
 
+  @Test
+  public void testStateStoreForResourceMapping() throws IOException {
+    // test empty when no state
+    List<RecoveredContainerState> recoveredContainers = stateStore
+        .loadContainersState();
+    assertTrue(recoveredContainers.isEmpty());
+
+    ApplicationId appId = ApplicationId.newInstance(1234, 3);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId,
+        4);
+    ContainerId containerId = ContainerId.newContainerId(appAttemptId, 5);
+    storeMockContainer(containerId);
+
+    // Store ResourceMapping
+    stateStore.storeAssignedResources(containerId, "gpu",
+        Arrays.<Serializable>asList("1", "2", "3"));
+    // This will overwrite above
+    List<Serializable> gpuRes1 = Arrays.<Serializable>asList("1", "2", "4");
+    stateStore.storeAssignedResources(containerId, "gpu", gpuRes1);
+    List<Serializable> fpgaRes =
+        Arrays.<Serializable>asList("3", "4", "5", "6");
+    stateStore.storeAssignedResources(containerId, "fpga", fpgaRes);
+    List<Serializable> numaRes = Arrays.<Serializable>asList("numa1");
+    stateStore.storeAssignedResources(containerId, "numa", numaRes);
+
+    // add a invalid key
+    restartStateStore();
+    recoveredContainers = stateStore.loadContainersState();
+    assertEquals(1, recoveredContainers.size());
+    RecoveredContainerState rcs = recoveredContainers.get(0);
+    List<Serializable> res = rcs.getResourceMappings()
+        .getAssignedResources("gpu");
+    Assert.assertTrue(res.equals(gpuRes1));
+
+    res = rcs.getResourceMappings().getAssignedResources("fpga");
+    Assert.assertTrue(res.equals(fpgaRes));
+
+    res = rcs.getResourceMappings().getAssignedResources("numa");
+    Assert.assertTrue(res.equals(numaRes));
+  }
+
+  private StartContainerRequest storeMockContainer(ContainerId containerId)
+      throws IOException {
+    // create a container request
+    LocalResource lrsrc = LocalResource.newInstance(
+        URL.newInstance("hdfs", "somehost", 12345, "/some/path/to/rsrc"),
+        LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 123L,
+        1234567890L);
+    Map<String, LocalResource> localResources =
+        new HashMap<String, LocalResource>();
+    localResources.put("rsrc", lrsrc);
+    Map<String, String> env = new HashMap<String, String>();
+    env.put("somevar", "someval");
+    List<String> containerCmds = new ArrayList<String>();
+    containerCmds.add("somecmd");
+    containerCmds.add("somearg");
+    Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
+    serviceData.put("someservice",
+        ByteBuffer.wrap(new byte[] { 0x1, 0x2, 0x3 }));
+    ByteBuffer containerTokens = ByteBuffer
+        .wrap(new byte[] { 0x7, 0x8, 0x9, 0xa });
+    Map<ApplicationAccessType, String> acls =
+        new HashMap<ApplicationAccessType, String>();
+    acls.put(ApplicationAccessType.VIEW_APP, "viewuser");
+    acls.put(ApplicationAccessType.MODIFY_APP, "moduser");
+    ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
+        localResources, env, containerCmds,
+        serviceData, containerTokens, acls);
+    Resource containerRsrc = Resource.newInstance(1357, 3);
+    ContainerTokenIdentifier containerTokenId = new ContainerTokenIdentifier(
+        containerId, "host", "user", containerRsrc, 9876543210L, 42, 2468,
+        Priority.newInstance(7), 13579);
+    Token containerToken = Token.newInstance(containerTokenId.getBytes(),
+        ContainerTokenIdentifier.KIND.toString(), "password".getBytes(),
+        "tokenservice");
+    StartContainerRequest containerReq = StartContainerRequest.newInstance(clc,
+        containerToken);
+    stateStore.storeContainer(containerId, 0, 0, containerReq);
+    return containerReq;
+  }
+
   private static class NMTokenSecretManagerForTest extends
       BaseNMTokenSecretManager {
     public MasterKey generateKey() {
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
index b9c6fff..29c2038 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
@@ -37,6 +37,7 @@ import 
org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceSet;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 
@@ -242,4 +243,9 @@ public class MockContainer implements Container {
   public long getContainerStartTime() {
     return 0;
   }
+
+  @Override
+  public ResourceMappings getResourceMappings() {
+    return null;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to