Author: bikas
Date: Wed Jul 17 20:19:49 2013
New Revision: 1504261

URL: http://svn.apache.org/r1504261
Log:
YARN-922. Change FileSystemRMStateStore to use directories (Jian He via bikas)

Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1504261&r1=1504260&r2=1504261&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original) +++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Wed Jul 17 20:19:49 2013 @@ -500,6 +500,9 @@ Release 2.1.0-beta - 2013-07-02 YARN-927. Change ContainerRequest to not have more than 1 container count and remove StoreContainerRequest (bikas) + YARN-922. Change FileSystemRMStateStore to use directories (Jian He via + bikas) + OPTIMIZATIONS YARN-512. 
Log aggregation root directory check is more expensive than it Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java?rev=1504261&r1=1504260&r2=1504261&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java Wed Jul 17 20:19:49 2013 @@ -111,70 +111,66 @@ public class FileSystemRMStateStore exte private void loadRMAppState(RMState rmState) throws Exception { try { - FileStatus[] childNodes = fs.listStatus(rmAppRoot); List<ApplicationAttemptState> attempts = - new ArrayList<ApplicationAttemptState>(); - for(FileStatus childNodeStatus : childNodes) { - assert childNodeStatus.isFile(); - String childNodeName = childNodeStatus.getPath().getName(); - Path childNodePath = getNodePath(rmAppRoot, childNodeName); - byte[] childData = readFile(childNodePath, childNodeStatus.getLen()); - if(childNodeName.startsWith(ApplicationId.appIdStrPrefix)){ - // application - LOG.info("Loading application from node: " + childNodeName); - ApplicationId appId = ConverterUtils.toApplicationId(childNodeName); - ApplicationStateDataPBImpl appStateData = - new ApplicationStateDataPBImpl( - ApplicationStateDataProto.parseFrom(childData)); - ApplicationState 
appState = new ApplicationState( - appStateData.getSubmitTime(), - appStateData.getApplicationSubmissionContext(), - appStateData.getUser()); - // assert child node name is same as actual applicationId - assert appId.equals(appState.context.getApplicationId()); - rmState.appState.put(appId, appState); - } else if(childNodeName.startsWith( - ApplicationAttemptId.appAttemptIdStrPrefix)) { - // attempt - LOG.info("Loading application attempt from node: " + childNodeName); - ApplicationAttemptId attemptId = - ConverterUtils.toApplicationAttemptId(childNodeName); - ApplicationAttemptStateDataPBImpl attemptStateData = - new ApplicationAttemptStateDataPBImpl( + new ArrayList<ApplicationAttemptState>(); + + for (FileStatus appDir : fs.listStatus(rmAppRoot)) { + for (FileStatus childNodeStatus : fs.listStatus(appDir.getPath())) { + assert childNodeStatus.isFile(); + String childNodeName = childNodeStatus.getPath().getName(); + byte[] childData = + readFile(childNodeStatus.getPath(), childNodeStatus.getLen()); + if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) { + // application + LOG.info("Loading application from node: " + childNodeName); + ApplicationId appId = ConverterUtils.toApplicationId(childNodeName); + ApplicationStateDataPBImpl appStateData = + new ApplicationStateDataPBImpl( + ApplicationStateDataProto.parseFrom(childData)); + ApplicationState appState = + new ApplicationState(appStateData.getSubmitTime(), + appStateData.getApplicationSubmissionContext(), + appStateData.getUser()); + // assert child node name is same as actual applicationId + assert appId.equals(appState.context.getApplicationId()); + rmState.appState.put(appId, appState); + } else if (childNodeName + .startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) { + // attempt + LOG.info("Loading application attempt from node: " + childNodeName); + ApplicationAttemptId attemptId = + ConverterUtils.toApplicationAttemptId(childNodeName); + ApplicationAttemptStateDataPBImpl attemptStateData = 
+ new ApplicationAttemptStateDataPBImpl( ApplicationAttemptStateDataProto.parseFrom(childData)); - Credentials credentials = null; - if(attemptStateData.getAppAttemptTokens() != null){ - credentials = new Credentials(); - DataInputByteBuffer dibb = new DataInputByteBuffer(); - dibb.reset(attemptStateData.getAppAttemptTokens()); - credentials.readTokenStorageStream(dibb); + Credentials credentials = null; + if (attemptStateData.getAppAttemptTokens() != null) { + credentials = new Credentials(); + DataInputByteBuffer dibb = new DataInputByteBuffer(); + dibb.reset(attemptStateData.getAppAttemptTokens()); + credentials.readTokenStorageStream(dibb); + } + ApplicationAttemptState attemptState = + new ApplicationAttemptState(attemptId, + attemptStateData.getMasterContainer(), credentials); + + // assert child node name is same as application attempt id + assert attemptId.equals(attemptState.getAttemptId()); + attempts.add(attemptState); + } else { + LOG.info("Unknown child node with name: " + childNodeName); } - ApplicationAttemptState attemptState = - new ApplicationAttemptState(attemptId, - attemptStateData.getMasterContainer(), credentials); - - // assert child node name is same as application attempt id - assert attemptId.equals(attemptState.getAttemptId()); - attempts.add(attemptState); - } else { - LOG.info("Unknown child node with name: " + childNodeName); } } - // go through all attempts and add them to their apps - for(ApplicationAttemptState attemptState : attempts) { + // go through all attempts and add them to their apps, Ideally, each + // attempt node must have a corresponding app node, because remove + // directory operation remove both at the same time + for (ApplicationAttemptState attemptState : attempts) { ApplicationId appId = attemptState.getAttemptId().getApplicationId(); ApplicationState appState = rmState.appState.get(appId); - if(appState != null) { - appState.attempts.put(attemptState.getAttemptId(), attemptState); - } else { - // the application 
node may have been removed when the application - // completed but the RM might have stopped before it could remove the - // application attempt nodes - LOG.info("Application node not found for attempt: " - + attemptState.getAttemptId()); - deleteFile(getNodePath(rmAppRoot, attemptState.getAttemptId().toString())); - } + assert appState != null; + appState.attempts.put(attemptState.getAttemptId(), attemptState); } } catch (Exception e) { LOG.error("Failed to load state.", e); @@ -188,6 +184,12 @@ public class FileSystemRMStateStore exte for(FileStatus childNodeStatus : childNodes) { assert childNodeStatus.isFile(); String childNodeName = childNodeStatus.getPath().getName(); + if(childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) { + rmState.rmSecretManagerState.dtSequenceNumber = + Integer.parseInt(childNodeName.split("_")[1]); + continue; + } + Path childNodePath = getNodePath(rmDTSecretManagerRoot, childNodeName); byte[] childData = readFile(childNodePath, childNodeStatus.getLen()); ByteArrayInputStream is = new ByteArrayInputStream(childData); @@ -202,10 +204,7 @@ public class FileSystemRMStateStore exte long renewDate = fsIn.readLong(); rmState.rmSecretManagerState.delegationTokenState.put(identifier, renewDate); - } else if(childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) { - rmState.rmSecretManagerState.dtSequenceNumber = - Integer.parseInt(childNodeName.split("_")[1]); - }else { + } else { LOG.warn("Unknown file for recovering RMDelegationTokenSecretManager"); } fsIn.close(); @@ -215,7 +214,9 @@ public class FileSystemRMStateStore exte @Override public synchronized void storeApplicationState(String appId, ApplicationStateDataPBImpl appStateDataPB) throws Exception { - Path nodeCreatePath = getNodePath(rmAppRoot, appId); + Path appDirPath = getAppDir(rmAppRoot, appId); + fs.mkdirs(appDirPath); + Path nodeCreatePath = getNodePath(appDirPath, appId); LOG.info("Storing info for app: " + appId + " at: " + nodeCreatePath); byte[] 
appStateData = appStateDataPB.getProto().toByteArray(); @@ -232,7 +233,11 @@ public class FileSystemRMStateStore exte @Override public synchronized void storeApplicationAttemptState(String attemptId, ApplicationAttemptStateDataPBImpl attemptStateDataPB) throws Exception { - Path nodeCreatePath = getNodePath(rmAppRoot, attemptId); + ApplicationAttemptId appAttemptId = + ConverterUtils.toApplicationAttemptId(attemptId); + Path appDirPath = + getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString()); + Path nodeCreatePath = getNodePath(appDirPath, attemptId); LOG.info("Storing info for attempt: " + attemptId + " at: " + nodeCreatePath); byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray(); @@ -250,20 +255,9 @@ public class FileSystemRMStateStore exte public synchronized void removeApplicationState(ApplicationState appState) throws Exception { String appId = appState.getAppId().toString(); - Path nodeRemovePath = getNodePath(rmAppRoot, appId); + Path nodeRemovePath = getAppDir(rmAppRoot, appId); LOG.info("Removing info for app: " + appId + " at: " + nodeRemovePath); deleteFile(nodeRemovePath); - for(ApplicationAttemptId attemptId : appState.attempts.keySet()) { - removeApplicationAttemptState(attemptId.toString()); - } - } - - public synchronized void removeApplicationAttemptState(String attemptId) - throws Exception { - Path nodeRemovePath = getNodePath(rmAppRoot, attemptId); - LOG.info("Removing info for attempt: " + attemptId - + " at: " + nodeRemovePath); - deleteFile(nodeRemovePath); } @Override @@ -329,6 +323,10 @@ public class FileSystemRMStateStore exte deleteFile(nodeCreatePath); } + private Path getAppDir(Path root, String appId) { + return getNodePath(root, appId); + } + // FileSystem related code private void deleteFile(Path deletePath) throws Exception { Modified: 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java?rev=1504261&r1=1504260&r2=1504261&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java Wed Jul 17 20:19:49 2013 @@ -105,8 +105,6 @@ public class TestRMStateStore { interface RMStateStoreHelper { RMStateStore getRMStateStore() throws Exception; - void addOrphanAttemptIfNeeded(RMStateStore testStore, - TestDispatcher dispatcher) throws Exception; boolean isFinalStateValid() throws Exception; } @@ -154,15 +152,6 @@ public class TestRMStateStore { } @Override - public void addOrphanAttemptIfNeeded(RMStateStore testStore, - TestDispatcher dispatcher) throws Exception { - ApplicationAttemptId attemptId = ConverterUtils.toApplicationAttemptId( - "appattempt_1352994193343_0003_000001"); - storeAttempt(testStore, attemptId, - "container_1352994193343_0003_01_000001", null, null, dispatcher); - } - - @Override public boolean isFinalStateValid() throws Exception { FileSystem fs = cluster.getFileSystem(); FileStatus[] files = fs.listStatus(workingDirPathURI); @@ -289,9 +278,6 @@ public class TestRMStateStore { attempts.put(attemptIdRemoved, mockRemovedAttempt); store.removeApplication(mockRemovedApp); - // add orphan 
attempt file to simulate incomplete removal of app state - stateStoreHelper.addOrphanAttemptIfNeeded(store, dispatcher); - // let things settle down Thread.sleep(1000); store.close(); @@ -301,9 +287,6 @@ public class TestRMStateStore { RMState state = store.loadState(); Map<ApplicationId, ApplicationState> rmAppState = state.getApplicationState(); - // removed app or orphan attempt is not loaded - assertEquals(1, rmAppState.size()); - ApplicationState appState = rmAppState.get(appId1); // app is loaded assertNotNull(appState);