Repository: hadoop
Updated Branches:
  refs/heads/branch-2.8 2fce0fa8d -> d665d8568


YARN-5547. NMLeveldbStateStore should be more tolerant of unknown keys. 
Contributed by Ajith S.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d665d856
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d665d856
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d665d856

Branch: refs/heads/branch-2.8
Commit: d665d856825a6aaf48632a1826e1ca10608f9c10
Parents: 2fce0fa
Author: Junping Du <junping...@apache.org>
Authored: Thu Aug 31 18:10:33 2017 -0700
Committer: Junping Du <junping...@apache.org>
Committed: Thu Aug 31 18:10:33 2017 -0700

----------------------------------------------------------------------
 .../containermanager/ContainerManagerImpl.java  |  7 ++
 .../recovery/NMLeveldbStateStoreService.java    | 19 +++++-
 .../recovery/NMStateStoreService.java           | 19 +++++-
 .../TestNMLeveldbStateStoreService.java         | 69 ++++++++++++++++++++
 4 files changed, 112 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d665d856/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index f881586..942f7ac 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -144,6 +144,7 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerType;
 import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
@@ -350,6 +351,12 @@ public class ContainerManagerImpl extends CompositeService implements
           credentials, metrics, token, context, rcs);
       context.getContainers().put(containerId, container);
       app.handle(new ApplicationContainerInitEvent(container));
+      if (rcs.getRecoveryType() == RecoveredContainerType.KILL) {
+      dispatcher.getEventHandler().handle(
+            new ContainerKillEvent(containerId, ContainerExitStatus.ABORTED,
+                "Due to invalid StateStore info container was killed"
+                    + " during recovery"));
+      }
     } else {
       if (rcs.getStatus() != RecoveredContainerStatus.COMPLETED) {
         LOG.warn(containerId + " has no corresponding application!");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d665d856/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
index f5a36f2..0408de2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
@@ -70,6 +70,8 @@ import org.iq80.leveldb.Options;
 import org.iq80.leveldb.WriteBatch;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
 
 public class NMLeveldbStateStoreService extends NMStateStoreService {
 
@@ -134,6 +136,12 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
   private boolean isNewlyCreated;
   private Timer compactionTimer;
 
+  /**
+   * Map from container ID to the list of unknown key suffixes seen for it.
+   */
+  private ListMultimap<ContainerId, String> containerUnknownKeySuffixes =
+      ArrayListMultimap.create();
+
   public NMLeveldbStateStoreService() {
     super(NMLeveldbStateStoreService.class.getName());
   }
@@ -251,7 +259,11 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
         rcs.capability = new ResourcePBImpl(
             ResourceProto.parseFrom(entry.getValue()));
       } else {
-        throw new IOException("Unexpected container state key: " + key);
+        LOG.warn("the container " + containerId
+            + " will be killed because of the unknown key " + key
+            + " during recovery.");
+        containerUnknownKeySuffixes.put(containerId, suffix);
+        rcs.setRecoveryType(RecoveredContainerType.KILL);
       }
     }
     return rcs;
@@ -380,6 +392,11 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
         batch.delete(bytes(keyPrefix + CONTAINER_LAUNCHED_KEY_SUFFIX));
         batch.delete(bytes(keyPrefix + CONTAINER_KILLED_KEY_SUFFIX));
         batch.delete(bytes(keyPrefix + CONTAINER_EXIT_CODE_KEY_SUFFIX));
+        List<String> unknownKeysForContainer =
+            containerUnknownKeySuffixes.removeAll(containerId);
+        for (String unknownKeySuffix : unknownKeysForContainer) {
+          batch.delete(bytes(keyPrefix + unknownKeySuffix));
+        }
         db.write(batch);
       } finally {
         batch.close();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d665d856/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
index 9fca5be..7eb8763 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
@@ -59,6 +59,13 @@ public abstract class NMStateStoreService extends AbstractService {
 
   }
 
+  /**
+   * Type of post-recovery action to take for a recovered container.
+   */
+  public enum RecoveredContainerType {
+    KILL, RECOVER
+  }
+
   public enum RecoveredContainerStatus {
     REQUESTED,
     LAUNCHED,
@@ -73,7 +80,9 @@ public abstract class NMStateStoreService extends AbstractService {
     StartContainerRequest startRequest;
     Resource capability;
     int version;
-
+    private RecoveredContainerType recoveryType =
+        RecoveredContainerType.RECOVER;
+    
     public RecoveredContainerStatus getStatus() {
       return status;
     }
@@ -113,6 +122,14 @@ public abstract class NMStateStoreService extends AbstractService {
           .append(", StartRequest: ").append(getStartRequest())
           .toString();
     }
+    
+    public RecoveredContainerType getRecoveryType() {
+      return recoveryType;
+    }
+
+    public void setRecoveryType(RecoveredContainerType recoveryType) {
+      this.recoveryType = recoveryType;
+    }
   }
 
   public static class LocalResourceTrackerState {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d665d856/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
index c5f05f1..27e8b78 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
@@ -70,6 +70,7 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.Re
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerTokensState;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerType;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredDeletionServiceState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLogDeleterState;
@@ -923,6 +924,74 @@ public class TestNMLeveldbStateStoreService {
     store.close();
   }
 
+  @Test
+  public void testUnexpectedKeyDoesntThrowException() throws IOException {
+    // test empty when no state
+    List<RecoveredContainerState> recoveredContainers = stateStore
+        .loadContainersState();
+    assertTrue(recoveredContainers.isEmpty());
+
+    // create a container request
+    ApplicationId appId = ApplicationId.newInstance(1234, 3);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId,
+        4);
+    ContainerId containerId = ContainerId.newContainerId(appAttemptId, 5);
+    LocalResource lrsrc = LocalResource.newInstance(
+        URL.newInstance("hdfs", "somehost", 12345, "/some/path/to/rsrc"),
+        LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 123L,
+        1234567890L);
+    Map<String, LocalResource> localResources =
+        new HashMap<String, LocalResource>();
+    localResources.put("rsrc", lrsrc);
+    Map<String, String> env = new HashMap<String, String>();
+    env.put("somevar", "someval");
+    List<String> containerCmds = new ArrayList<String>();
+    containerCmds.add("somecmd");
+    containerCmds.add("somearg");
+    Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
+    serviceData.put("someservice",
+        ByteBuffer.wrap(new byte[] { 0x1, 0x2, 0x3 }));
+    ByteBuffer containerTokens = ByteBuffer
+        .wrap(new byte[] { 0x7, 0x8, 0x9, 0xa });
+    Map<ApplicationAccessType, String> acls =
+        new HashMap<ApplicationAccessType, String>();
+    acls.put(ApplicationAccessType.VIEW_APP, "viewuser");
+    acls.put(ApplicationAccessType.MODIFY_APP, "moduser");
+    ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
+        localResources, env, containerCmds,
+        serviceData, containerTokens, acls);
+    Resource containerRsrc = Resource.newInstance(1357, 3);
+    ContainerTokenIdentifier containerTokenId = new ContainerTokenIdentifier(
+        containerId, "host", "user", containerRsrc, 9876543210L, 42, 2468,
+        Priority.newInstance(7), 13579);
+    Token containerToken = Token.newInstance(containerTokenId.getBytes(),
+        ContainerTokenIdentifier.KIND.toString(), "password".getBytes(),
+        "tokenservice");
+    StartContainerRequest containerReq = StartContainerRequest.newInstance(clc,
+        containerToken);
+
+    stateStore.storeContainer(containerId, 0, containerReq);
+
+    // add an invalid key
+    byte[] invalidKey = ("ContainerManager/containers/"
+    + containerId.toString() + "/invalidKey1234").getBytes();
+    stateStore.getDB().put(invalidKey, new byte[1]);
+    restartStateStore();
+    recoveredContainers = stateStore.loadContainersState();
+    assertEquals(1, recoveredContainers.size());
+    RecoveredContainerState rcs = recoveredContainers.get(0);
+    assertEquals(RecoveredContainerStatus.REQUESTED, rcs.getStatus());
+    assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
+    assertEquals(false, rcs.getKilled());
+    assertEquals(containerReq, rcs.getStartRequest());
+    assertTrue(rcs.getDiagnostics().isEmpty());
+    assertEquals(RecoveredContainerType.KILL, rcs.getRecoveryType());
+    // assert the unknown key is cleaned up once the container is removed
+    assertNotNull(stateStore.getDB().get(invalidKey));
+    stateStore.removeContainer(containerId);
+    assertNull(stateStore.getDB().get(invalidKey));
+  }
+
   private static class NMTokenSecretManagerForTest extends
       BaseNMTokenSecretManager {
     public MasterKey generateKey() {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to