Repository: hadoop
Updated Branches:
  refs/heads/branch-2.7 b4d24d7b0 -> a99688931


YARN-4924. NM recovery race can lead to container not cleaned up. Contributed by sandflee
(cherry picked from commit 9b5c5bd42f0cb240d0fe7754967765a99dd5be46)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a9968893
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a9968893
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a9968893

Branch: refs/heads/branch-2.7
Commit: a9968893130cac40f0e7666a409f95194544d53e
Parents: b4d24d7
Author: Jason Lowe <jl...@apache.org>
Authored: Thu Apr 14 19:44:31 2016 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Thu Apr 14 19:44:31 2016 +0000

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  3 +
 .../containermanager/ContainerManagerImpl.java  | 11 ---
 .../recovery/NMLeveldbStateStoreService.java    | 76 +++++++++++++-------
 .../recovery/NMNullStateStoreService.java       |  4 --
 .../recovery/NMStateStoreService.java           | 12 ----
 .../TestContainerManagerRecovery.java           |  4 ++
 .../recovery/NMMemoryStateStoreService.java     | 10 ---
 .../TestNMLeveldbStateStoreService.java         | 10 +--
 8 files changed, 57 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9968893/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 36d049d..6e51b26 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -127,6 +127,9 @@ Release 2.7.3 - UNRELEASED
     YARN-4938. MiniYarnCluster should not request transitionToActive to RM
     on non-HA environment. (Eric Badger via aajisaka)
 
+    YARN-4924. NM recovery race can lead to container not cleaned up.
+    (sandflee via jlowe)
+
 Release 2.7.2 - 2016-01-25
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9968893/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 5465e54..0c8ed05 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -262,12 +262,6 @@ public class ContainerManagerImpl extends CompositeService implements
       for (RecoveredContainerState rcs : stateStore.loadContainersState()) {
         recoverContainer(rcs);
       }
-
-      String diagnostic = "Application marked finished during recovery";
-      for (ApplicationId appId : appsState.getFinishedApplications()) {
-        dispatcher.getEventHandler().handle(
-            new ApplicationFinishEvent(appId, diagnostic));
-      }
     }
   }
 
@@ -1109,11 +1103,6 @@ public class ContainerManagerImpl extends CompositeService implements
         } else if (appsFinishedEvent.getReason() == CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER) {
           diagnostic = "Application killed by ResourceManager";
         }
-        try {
-          this.context.getNMStateStore().storeFinishedApplication(appID);
-        } catch (IOException e) {
-          LOG.error("Unable to update application state in store", e);
-        }
         this.dispatcher.getEventHandler().handle(
             new ApplicationFinishEvent(appID,
                 diagnostic));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9968893/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
index df58182..0c9901c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
@@ -81,6 +81,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
 
   private static final String APPLICATIONS_KEY_PREFIX =
       "ContainerManager/applications/";
+  @Deprecated
   private static final String FINISHED_APPS_KEY_PREFIX =
       "ContainerManager/finishedApps/";
 
@@ -339,20 +340,6 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
         state.applications.add(
             ContainerManagerApplicationProto.parseFrom(entry.getValue()));
       }
-
-      state.finishedApplications = new ArrayList<ApplicationId>();
-      keyPrefix = FINISHED_APPS_KEY_PREFIX;
-      iter.seek(bytes(keyPrefix));
-      while (iter.hasNext()) {
-        Entry<byte[], byte[]> entry = iter.next();
-        String key = asString(entry.getKey());
-        if (!key.startsWith(keyPrefix)) {
-          break;
-        }
-        ApplicationId appId =
-            ConverterUtils.toApplicationId(key.substring(keyPrefix.length()));
-        state.finishedApplications.add(appId);
-      }
     } catch (DBException e) {
       throw new IOException(e);
     } finally {
@@ -361,6 +348,8 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
       }
     }
 
+    cleanupDeprecatedFinishedApps();
+
     return state;
   }
 
@@ -376,17 +365,6 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
   }
 
   @Override
-  public void storeFinishedApplication(ApplicationId appId)
-      throws IOException {
-    String key = FINISHED_APPS_KEY_PREFIX + appId;
-    try {
-      db.put(bytes(key), new byte[0]);
-    } catch (DBException e) {
-      throw new IOException(e);
-    }
-  }
-
-  @Override
   public void removeApplication(ApplicationId appId)
       throws IOException {
     try {
@@ -394,8 +372,6 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
       try {
         String key = APPLICATIONS_KEY_PREFIX + appId;
         batch.delete(bytes(key));
-        key = FINISHED_APPS_KEY_PREFIX + appId;
-        batch.delete(bytes(key));
         db.write(batch);
       } finally {
         batch.close();
@@ -913,6 +889,52 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     }
   }
 
+  @SuppressWarnings("deprecation")
+  private void cleanupDeprecatedFinishedApps() {
+    try {
+      cleanupKeysWithPrefix(FINISHED_APPS_KEY_PREFIX);
+    } catch (Exception e) {
+      LOG.warn("cleanup keys with prefix " + FINISHED_APPS_KEY_PREFIX +
+              " from leveldb failed", e);
+    }
+  }
+
+  private void cleanupKeysWithPrefix(String prefix) throws IOException {
+    WriteBatch batch = null;
+    LeveldbIterator iter = null;
+    try {
+      iter = new LeveldbIterator(db);
+      try {
+        batch = db.createWriteBatch();
+        iter.seek(bytes(prefix));
+        while (iter.hasNext()) {
+          byte[] key = iter.next().getKey();
+          String keyStr = asString(key);
+          if (!keyStr.startsWith(prefix)) {
+            break;
+          }
+          batch.delete(key);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("cleanup " + keyStr + " from leveldb");
+          }
+        }
+        db.write(batch);
+      } catch (DBException e) {
+        throw new IOException(e);
+      } finally {
+        if (batch != null) {
+          batch.close();
+        }
+      }
+    } catch (DBException e) {
+      throw new IOException(e);
+    } finally {
+      if (iter != null) {
+        iter.close();
+      }
+    }
+  }
+
   private String getLogDeleterKey(ApplicationId appId) {
     return LOG_DELETER_KEY_PREFIX + appId;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9968893/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
index ab49543..ada1944 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
@@ -58,10 +58,6 @@ public class NMNullStateStoreService extends NMStateStoreService {
   }
 
   @Override
-  public void storeFinishedApplication(ApplicationId appId) {
-  }
-
-  @Override
   public void removeApplication(ApplicationId appId) throws IOException {
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9968893/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
index fa66349..c24684a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
@@ -51,15 +51,11 @@ public abstract class NMStateStoreService extends AbstractService {
 
   public static class RecoveredApplicationsState {
     List<ContainerManagerApplicationProto> applications;
-    List<ApplicationId> finishedApplications;
 
     public List<ContainerManagerApplicationProto> getApplications() {
       return applications;
     }
 
-    public List<ApplicationId> getFinishedApplications() {
-      return finishedApplications;
-    }
   }
 
   public enum RecoveredContainerStatus {
@@ -242,14 +238,6 @@ public abstract class NMStateStoreService extends AbstractService {
       ContainerManagerApplicationProto p) throws IOException;
 
   /**
-   * Record that an application has finished
-   * @param appId the application ID
-   * @throws IOException
-   */
-  public abstract void storeFinishedApplication(ApplicationId appId)
-      throws IOException;
-
-  /**
    * Remove records corresponding to an application
    * @param appId the application ID
    * @throws IOException

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9968893/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
index c45ffbb..93a434c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
@@ -210,6 +210,10 @@ public class TestContainerManagerRecovery {
     assertEquals(1, context.getApplications().size());
     app = context.getApplications().get(appId);
     assertNotNull(app);
+    // no longer saving FINISH_APP event in NM stateStore,
+    // simulate by resending FINISH_APP event
+    cm.handle(new CMgrCompletedAppsEvent(finishedApps,
+        CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
     waitForAppState(app, ApplicationState.APPLICATION_RESOURCES_CLEANINGUP);
     assertTrue(context.getApplicationACLsManager().checkAccess(
         UserGroupInformation.createRemoteUser(modUser),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9968893/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
index e0487e7..9be53df 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
@@ -43,7 +43,6 @@ import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
 
 public class NMMemoryStateStoreService extends NMStateStoreService {
   private Map<ApplicationId, ContainerManagerApplicationProto> apps;
-  private Set<ApplicationId> finishedApps;
   private Map<ContainerId, RecoveredContainerState> containerStates;
   private Map<TrackerKey, TrackerState> trackerStates;
   private Map<Integer, DeletionServiceDeleteTaskProto> deleteTasks;
@@ -58,7 +57,6 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
   @Override
   protected void initStorage(Configuration conf) {
     apps = new HashMap<ApplicationId, ContainerManagerApplicationProto>();
-    finishedApps = new HashSet<ApplicationId>();
     containerStates = new HashMap<ContainerId, RecoveredContainerState>();
     nmTokenState = new RecoveredNMTokensState();
     nmTokenState.applicationMasterKeys =
@@ -85,7 +83,6 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
     RecoveredApplicationsState state = new RecoveredApplicationsState();
     state.applications = new ArrayList<ContainerManagerApplicationProto>(
         apps.values());
-    state.finishedApplications = new ArrayList<ApplicationId>(finishedApps);
     return state;
   }
 
@@ -98,15 +95,9 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
   }
 
   @Override
-  public synchronized void storeFinishedApplication(ApplicationId appId) {
-    finishedApps.add(appId);
-  }
-
-  @Override
   public synchronized void removeApplication(ApplicationId appId)
       throws IOException {
     apps.remove(appId);
-    finishedApps.remove(appId);
   }
 
   @Override
@@ -384,7 +375,6 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
     logDeleterState.remove(appId);
   }
 
-
   private static class TrackerState {
     Map<Path, LocalResourceProto> inProgressMap =
         new HashMap<Path, LocalResourceProto>();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9968893/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
index 1804424..41ec2d5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
@@ -174,7 +174,6 @@ public class TestNMLeveldbStateStoreService {
     // test empty when no state
     RecoveredApplicationsState state = stateStore.loadApplicationsState();
     assertTrue(state.getApplications().isEmpty());
-    assertTrue(state.getFinishedApplications().isEmpty());
 
     // store an application and verify recovered
     final ApplicationId appId1 = ApplicationId.newInstance(1234, 1);
@@ -188,10 +187,8 @@ public class TestNMLeveldbStateStoreService {
     state = stateStore.loadApplicationsState();
     assertEquals(1, state.getApplications().size());
     assertEquals(appProto1, state.getApplications().get(0));
-    assertTrue(state.getFinishedApplications().isEmpty());
 
-    // finish an application and add a new one
-    stateStore.storeFinishedApplication(appId1);
+    // add a new app
     final ApplicationId appId2 = ApplicationId.newInstance(1234, 2);
     builder = ContainerManagerApplicationProto.newBuilder();
     builder.setId(((ApplicationIdPBImpl) appId2).getProto());
@@ -203,18 +200,13 @@ public class TestNMLeveldbStateStoreService {
     assertEquals(2, state.getApplications().size());
     assertTrue(state.getApplications().contains(appProto1));
     assertTrue(state.getApplications().contains(appProto2));
-    assertEquals(1, state.getFinishedApplications().size());
-    assertEquals(appId1, state.getFinishedApplications().get(0));
 
     // test removing an application
-    stateStore.storeFinishedApplication(appId2);
     stateStore.removeApplication(appId2);
     restartStateStore();
     state = stateStore.loadApplicationsState();
     assertEquals(1, state.getApplications().size());
     assertEquals(appProto1, state.getApplications().get(0));
-    assertEquals(1, state.getFinishedApplications().size());
-    assertEquals(appId1, state.getFinishedApplications().get(0));
   }
 
   @Test

Reply via email to