Author: bikas
Date: Wed Jul 17 20:19:49 2013
New Revision: 1504261

URL: http://svn.apache.org/r1504261
Log:
YARN-922. Change FileSystemRMStateStore to use directories (Jian He via bikas)

Modified:
    hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java

Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1504261&r1=1504260&r2=1504261&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Wed Jul 17 20:19:49 2013
@@ -500,6 +500,9 @@ Release 2.1.0-beta - 2013-07-02
     YARN-927. Change ContainerRequest to not have more than 1 container count
     and remove StoreContainerRequest (bikas)
 
+    YARN-922. Change FileSystemRMStateStore to use directories (Jian He via
+    bikas)
+
   OPTIMIZATIONS
 
     YARN-512. Log aggregation root directory check is more expensive than it

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java?rev=1504261&r1=1504260&r2=1504261&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java Wed Jul 17 20:19:49 2013
@@ -111,70 +111,66 @@ public class FileSystemRMStateStore exte
 
   private void loadRMAppState(RMState rmState) throws Exception {
     try {
-      FileStatus[] childNodes = fs.listStatus(rmAppRoot);
       List<ApplicationAttemptState> attempts =
-                                      new ArrayList<ApplicationAttemptState>();
-      for(FileStatus childNodeStatus : childNodes) {
-        assert childNodeStatus.isFile();
-        String childNodeName = childNodeStatus.getPath().getName();
-        Path childNodePath = getNodePath(rmAppRoot, childNodeName);
-        byte[] childData = readFile(childNodePath, childNodeStatus.getLen());
-        if(childNodeName.startsWith(ApplicationId.appIdStrPrefix)){
-          // application
-          LOG.info("Loading application from node: " + childNodeName);
-          ApplicationId appId = ConverterUtils.toApplicationId(childNodeName);
-          ApplicationStateDataPBImpl appStateData =
-              new ApplicationStateDataPBImpl(
-                                
ApplicationStateDataProto.parseFrom(childData));
-          ApplicationState appState = new ApplicationState(
-                               appStateData.getSubmitTime(),
-                               appStateData.getApplicationSubmissionContext(),
-                               appStateData.getUser());
-          // assert child node name is same as actual applicationId
-          assert appId.equals(appState.context.getApplicationId());
-          rmState.appState.put(appId, appState);
-        } else if(childNodeName.startsWith(
-                                ApplicationAttemptId.appAttemptIdStrPrefix)) {
-          // attempt
-          LOG.info("Loading application attempt from node: " + childNodeName);
-          ApplicationAttemptId attemptId =
-                          ConverterUtils.toApplicationAttemptId(childNodeName);
-          ApplicationAttemptStateDataPBImpl attemptStateData =
-              new ApplicationAttemptStateDataPBImpl(
+          new ArrayList<ApplicationAttemptState>();
+
+      for (FileStatus appDir : fs.listStatus(rmAppRoot)) {
+        for (FileStatus childNodeStatus : fs.listStatus(appDir.getPath())) {
+          assert childNodeStatus.isFile();
+          String childNodeName = childNodeStatus.getPath().getName();
+          byte[] childData =
+              readFile(childNodeStatus.getPath(), childNodeStatus.getLen());
+          if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
+            // application
+            LOG.info("Loading application from node: " + childNodeName);
+            ApplicationId appId = 
ConverterUtils.toApplicationId(childNodeName);
+            ApplicationStateDataPBImpl appStateData =
+                new ApplicationStateDataPBImpl(
+                  ApplicationStateDataProto.parseFrom(childData));
+            ApplicationState appState =
+                new ApplicationState(appStateData.getSubmitTime(),
+                  appStateData.getApplicationSubmissionContext(),
+                  appStateData.getUser());
+            // assert child node name is same as actual applicationId
+            assert appId.equals(appState.context.getApplicationId());
+            rmState.appState.put(appId, appState);
+          } else if (childNodeName
+            .startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
+            // attempt
+            LOG.info("Loading application attempt from node: " + 
childNodeName);
+            ApplicationAttemptId attemptId =
+                ConverterUtils.toApplicationAttemptId(childNodeName);
+            ApplicationAttemptStateDataPBImpl attemptStateData =
+                new ApplicationAttemptStateDataPBImpl(
                   ApplicationAttemptStateDataProto.parseFrom(childData));
-          Credentials credentials = null;
-          if(attemptStateData.getAppAttemptTokens() != null){
-            credentials = new Credentials();
-            DataInputByteBuffer dibb = new DataInputByteBuffer();
-            dibb.reset(attemptStateData.getAppAttemptTokens());
-            credentials.readTokenStorageStream(dibb);
+            Credentials credentials = null;
+            if (attemptStateData.getAppAttemptTokens() != null) {
+              credentials = new Credentials();
+              DataInputByteBuffer dibb = new DataInputByteBuffer();
+              dibb.reset(attemptStateData.getAppAttemptTokens());
+              credentials.readTokenStorageStream(dibb);
+            }
+            ApplicationAttemptState attemptState =
+                new ApplicationAttemptState(attemptId,
+                  attemptStateData.getMasterContainer(), credentials);
+
+            // assert child node name is same as application attempt id
+            assert attemptId.equals(attemptState.getAttemptId());
+            attempts.add(attemptState);
+          } else {
+            LOG.info("Unknown child node with name: " + childNodeName);
           }
-          ApplicationAttemptState attemptState =
-              new ApplicationAttemptState(attemptId,
-                attemptStateData.getMasterContainer(), credentials);
-
-          // assert child node name is same as application attempt id
-          assert attemptId.equals(attemptState.getAttemptId());
-          attempts.add(attemptState);
-        } else {
-          LOG.info("Unknown child node with name: " + childNodeName);
         }
       }
 
-      // go through all attempts and add them to their apps
-      for(ApplicationAttemptState attemptState : attempts) {
+      // Go through all attempts and add them to their apps. Each attempt
+      // node should have a corresponding app node, because removing an app
+      // deletes its whole directory, i.e. the app and attempt nodes together.
+      for (ApplicationAttemptState attemptState : attempts) {
         ApplicationId appId = attemptState.getAttemptId().getApplicationId();
         ApplicationState appState = rmState.appState.get(appId);
-        if(appState != null) {
-          appState.attempts.put(attemptState.getAttemptId(), attemptState);
-        } else {
-          // the application node may have been removed when the application
-          // completed but the RM might have stopped before it could remove the
-          // application attempt nodes
-          LOG.info("Application node not found for attempt: "
-                    + attemptState.getAttemptId());
-          deleteFile(getNodePath(rmAppRoot, 
attemptState.getAttemptId().toString()));
-        }
+        assert appState != null;
+        appState.attempts.put(attemptState.getAttemptId(), attemptState);
       }
     } catch (Exception e) {
       LOG.error("Failed to load state.", e);
@@ -188,6 +184,12 @@ public class FileSystemRMStateStore exte
     for(FileStatus childNodeStatus : childNodes) {
       assert childNodeStatus.isFile();
       String childNodeName = childNodeStatus.getPath().getName();
+      if(childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) {
+        rmState.rmSecretManagerState.dtSequenceNumber =
+            Integer.parseInt(childNodeName.split("_")[1]);
+        continue;
+      }
+
       Path childNodePath = getNodePath(rmDTSecretManagerRoot, childNodeName);
       byte[] childData = readFile(childNodePath, childNodeStatus.getLen());
       ByteArrayInputStream is = new ByteArrayInputStream(childData);
@@ -202,10 +204,7 @@ public class FileSystemRMStateStore exte
         long renewDate = fsIn.readLong();
         rmState.rmSecretManagerState.delegationTokenState.put(identifier,
           renewDate);
-      } else 
if(childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) {
-        rmState.rmSecretManagerState.dtSequenceNumber =
-            Integer.parseInt(childNodeName.split("_")[1]);
-      }else {
+      } else {
         LOG.warn("Unknown file for recovering RMDelegationTokenSecretManager");
       }
       fsIn.close();
@@ -215,7 +214,9 @@ public class FileSystemRMStateStore exte
   @Override
   public synchronized void storeApplicationState(String appId,
       ApplicationStateDataPBImpl appStateDataPB) throws Exception {
-    Path nodeCreatePath = getNodePath(rmAppRoot, appId);
+    Path appDirPath = getAppDir(rmAppRoot, appId);
+    fs.mkdirs(appDirPath);
+    Path nodeCreatePath = getNodePath(appDirPath, appId);
 
     LOG.info("Storing info for app: " + appId + " at: " + nodeCreatePath);
     byte[] appStateData = appStateDataPB.getProto().toByteArray();
@@ -232,7 +233,11 @@ public class FileSystemRMStateStore exte
   @Override
   public synchronized void storeApplicationAttemptState(String attemptId,
       ApplicationAttemptStateDataPBImpl attemptStateDataPB) throws Exception {
-    Path nodeCreatePath = getNodePath(rmAppRoot, attemptId);
+    ApplicationAttemptId appAttemptId =
+        ConverterUtils.toApplicationAttemptId(attemptId);
+    Path appDirPath =
+        getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString());
+    Path nodeCreatePath = getNodePath(appDirPath, attemptId);
     LOG.info("Storing info for attempt: " + attemptId
              + " at: " + nodeCreatePath);
     byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
@@ -250,20 +255,9 @@ public class FileSystemRMStateStore exte
   public synchronized void removeApplicationState(ApplicationState appState)
       throws Exception {
     String appId = appState.getAppId().toString();
-    Path nodeRemovePath = getNodePath(rmAppRoot, appId);
+    Path nodeRemovePath = getAppDir(rmAppRoot, appId);
     LOG.info("Removing info for app: " + appId + " at: " + nodeRemovePath);
     deleteFile(nodeRemovePath);
-    for(ApplicationAttemptId attemptId : appState.attempts.keySet()) {
-      removeApplicationAttemptState(attemptId.toString());
-    }
-  }
-
-  public synchronized void removeApplicationAttemptState(String attemptId)
-      throws Exception {
-    Path nodeRemovePath = getNodePath(rmAppRoot, attemptId);
-    LOG.info("Removing info for attempt: " + attemptId
-             + " at: " + nodeRemovePath);
-    deleteFile(nodeRemovePath);
   }
 
   @Override
@@ -329,6 +323,10 @@ public class FileSystemRMStateStore exte
     deleteFile(nodeCreatePath);
   }
 
+  private Path getAppDir(Path root, String appId) {
+    return getNodePath(root, appId);
+  }
+
   // FileSystem related code
 
   private void deleteFile(Path deletePath) throws Exception {

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java?rev=1504261&r1=1504260&r2=1504261&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java Wed Jul 17 20:19:49 2013
@@ -105,8 +105,6 @@ public class TestRMStateStore {
 
   interface RMStateStoreHelper {
     RMStateStore getRMStateStore() throws Exception;
-    void addOrphanAttemptIfNeeded(RMStateStore testStore,
-                                  TestDispatcher dispatcher) throws Exception;
     boolean isFinalStateValid() throws Exception;
   }
 
@@ -154,15 +152,6 @@ public class TestRMStateStore {
     }
 
     @Override
-    public void addOrphanAttemptIfNeeded(RMStateStore testStore,
-                                 TestDispatcher dispatcher) throws Exception {
-      ApplicationAttemptId attemptId = ConverterUtils.toApplicationAttemptId(
-                                      "appattempt_1352994193343_0003_000001");
-      storeAttempt(testStore, attemptId,
-          "container_1352994193343_0003_01_000001", null, null, dispatcher);
-    }
-
-    @Override
     public boolean isFinalStateValid() throws Exception {
       FileSystem fs = cluster.getFileSystem();
       FileStatus[] files = fs.listStatus(workingDirPathURI);
@@ -289,9 +278,6 @@ public class TestRMStateStore {
     attempts.put(attemptIdRemoved, mockRemovedAttempt);
     store.removeApplication(mockRemovedApp);
 
-    // add orphan attempt file to simulate incomplete removal of app state
-    stateStoreHelper.addOrphanAttemptIfNeeded(store, dispatcher);
-
     // let things settle down
     Thread.sleep(1000);
     store.close();
@@ -301,9 +287,6 @@ public class TestRMStateStore {
     RMState state = store.loadState();
     Map<ApplicationId, ApplicationState> rmAppState = 
state.getApplicationState();
 
-    // removed app or orphan attempt is not loaded
-    assertEquals(1, rmAppState.size());
-
     ApplicationState appState = rmAppState.get(appId1);
     // app is loaded
     assertNotNull(appState);


Reply via email to