Author: hitesh
Date: Wed Jan 23 00:57:37 2013
New Revision: 1437245

URL: http://svn.apache.org/viewvc?rev=1437245&view=rev
Log:
YARN-231. RM Restart - Add FS-based persistent store implementation for 
RMStateStore. Contributed by Bikas Saha

Added:
    
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
   (with props)
    
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/
    
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java
   (with props)
Modified:
    hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
    
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationAttemptId.java
    
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationId.java
    
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
    
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
    
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
    
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java

Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1437245&r1=1437244&r2=1437245&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Wed Jan 23 00:57:37 2013
@@ -37,6 +37,9 @@ Release 2.0.3-alpha - Unreleased 
 
     YARN-328. Use token request messages defined in hadoop common. (suresh)
 
+    YARN-231. RM Restart - Add FS-based persistent store implementation for
+    RMStateStore (Bikas Saha via hitesh)
+
   IMPROVEMENTS
 
     YARN-223. Update process tree instead of getting new process trees.

Modified: 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationAttemptId.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationAttemptId.java?rev=1437245&r1=1437244&r2=1437245&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationAttemptId.java
 (original)
+++ 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationAttemptId.java
 Wed Jan 23 00:57:37 2013
@@ -38,6 +38,8 @@ import org.apache.hadoop.classification.
 public abstract class ApplicationAttemptId implements
     Comparable<ApplicationAttemptId> {
   
+  public static final String appAttemptIdStrPrefix = "appattempt_";
+
   /**
    * Get the <code>ApplicationId</code> of the 
<code>ApplicationAttempId</code>. 
    * @return <code>ApplicationId</code> of the <code>ApplicationAttempId</code>
@@ -111,11 +113,11 @@ public abstract class ApplicationAttempt
 
   @Override
   public String toString() {
-    StringBuilder sb = new StringBuilder("appattempt_");
+    StringBuilder sb = new StringBuilder(appAttemptIdStrPrefix);
     sb.append(this.getApplicationId().getClusterTimestamp()).append("_");
     sb.append(ApplicationId.appIdFormat.get().format(
         this.getApplicationId().getId()));
     sb.append("_").append(attemptIdFormat.get().format(getAttemptId()));
     return sb.toString();
   }
-}
\ No newline at end of file
+}

Modified: 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationId.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationId.java?rev=1437245&r1=1437244&r2=1437245&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationId.java
 (original)
+++ 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationId.java
 Wed Jan 23 00:57:37 2013
@@ -38,6 +38,8 @@ import org.apache.hadoop.classification.
 @Stable
 public abstract class ApplicationId implements Comparable<ApplicationId> {
   
+  public static final String appIdStrPrefix = "application_";
+
   /**
    * Get the short integer identifier of the <code>ApplicationId</code>
    * which is unique for all applications started by a particular instance
@@ -88,7 +90,7 @@ public abstract class ApplicationId impl
 
   @Override
   public String toString() {
-    return "application_" + this.getClusterTimestamp() + "_"
+    return appIdStrPrefix + this.getClusterTimestamp() + "_"
         + appIdFormat.get().format(getId());
   }
 
@@ -119,4 +121,4 @@ public abstract class ApplicationId impl
       return false;
     return true;
   }
-}
\ No newline at end of file
+}

Modified: 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java?rev=1437245&r1=1437244&r2=1437245&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
 (original)
+++ 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
 Wed Jan 23 00:57:37 2013
@@ -236,6 +236,10 @@ public class YarnConfiguration extends C
   /** The class to use as the persistent store.*/
   public static final String RM_STORE = RM_PREFIX + "store.class";
   
+  /** URI for FileSystemRMStateStore */
+  public static final String FS_RM_STATE_STORE_URI =
+                                           RM_PREFIX + "fs.rm-state-store.uri";
+
   /** The maximum number of completed applications RM keeps. */ 
   public static final String RM_MAX_COMPLETED_APPLICATIONS =
     RM_PREFIX + "max-completed-applications";

Modified: 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml?rev=1437245&r1=1437244&r2=1437245&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
 (original)
+++ 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
 Wed Jan 23 00:57:37 2013
@@ -230,6 +230,17 @@
   <property>
     <description>The class to use as the persistent store.</description>
     <name>yarn.resourcemanager.store.class</name>
+    <value>org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore</value>
+  </property>
+
+  <property>
+    <description>URI pointing to the location of the FileSystem path where
+    RM state will be stored. This must be supplied when using
+    org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore
+    as the value for yarn.resourcemanager.store.class</description>
+    <name>yarn.resourcemanager.fs.rm-state-store.uri</name>
+    <value>${hadoop.tmp.dir}/yarn/system/rmstore</value>
+    <!--value>hdfs://localhost:9000/rmstore</value-->
   </property>
 
   <property>

Modified: 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml?rev=1437245&r1=1437244&r2=1437245&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
 (original)
+++ 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
 Wed Jan 23 00:57:37 2013
@@ -41,6 +41,12 @@
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-yarn-server-web-proxy</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>

Added: 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java?rev=1437245&view=auto
==============================================================================
--- 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
 (added)
+++ 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
 Wed Jan 23 00:57:37 2013
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import 
org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptStateDataPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationStateDataPBImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import 
org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptStateDataProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationStateDataProto;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+
+@Private
+@Unstable
+/**
+ * A simple class for storing RM state in any storage that implements a basic
+ * FileSystem interface. Does not use directories so that simple key-value
+ * stores can be used. The retry policy for the real filesystem client must be
+ * configured separately to enable retry of filesystem operations when needed.
+ */
+public class FileSystemRMStateStore extends RMStateStore {
+
+  public static final Log LOG = 
LogFactory.getLog(FileSystemRMStateStore.class);
+
+  private static final String ROOT_DIR_NAME = "FSRMStateRoot";
+
+
+  private FileSystem fs;
+
+  private Path fsRootDirPath;
+
+  @VisibleForTesting
+  Path fsWorkingPath;
+
+  public synchronized void initInternal(Configuration conf)
+      throws Exception{
+
+    fsWorkingPath = new 
Path(conf.get(YarnConfiguration.FS_RM_STATE_STORE_URI));
+    fsRootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME);
+
+    // create filesystem
+    fs = fsWorkingPath.getFileSystem(conf);
+    fs.mkdirs(fsRootDirPath);
+  }
+
+  @Override
+  protected synchronized void closeInternal() throws Exception {
+    fs.close();
+  }
+
+  @Override
+  public synchronized RMState loadState() throws Exception {
+    try {
+      RMState state = new RMState();
+      FileStatus[] childNodes = fs.listStatus(fsRootDirPath);
+      List<ApplicationAttemptState> attempts =
+                                      new ArrayList<ApplicationAttemptState>();
+      for(FileStatus childNodeStatus : childNodes) {
+        assert childNodeStatus.isFile();
+        String childNodeName = childNodeStatus.getPath().getName();
+        Path childNodePath = getNodePath(childNodeName);
+        byte[] childData = readFile(childNodePath, childNodeStatus.getLen());
+        if(childNodeName.startsWith(ApplicationId.appIdStrPrefix)){
+          // application
+          LOG.info("Loading application from node: " + childNodeName);
+          ApplicationId appId = ConverterUtils.toApplicationId(childNodeName);
+          ApplicationStateDataPBImpl appStateData =
+              new ApplicationStateDataPBImpl(
+                                
ApplicationStateDataProto.parseFrom(childData));
+          ApplicationState appState = new ApplicationState(
+                               appStateData.getSubmitTime(),
+                               appStateData.getApplicationSubmissionContext());
+          // assert child node name is same as actual applicationId
+          assert appId.equals(appState.context.getApplicationId());
+          state.appState.put(appId, appState);
+        } else if(childNodeName.startsWith(
+                                ApplicationAttemptId.appAttemptIdStrPrefix)) {
+          // attempt
+          LOG.info("Loading application attempt from node: " + childNodeName);
+          ApplicationAttemptId attemptId =
+                          ConverterUtils.toApplicationAttemptId(childNodeName);
+          ApplicationAttemptStateDataPBImpl attemptStateData =
+              new ApplicationAttemptStateDataPBImpl(
+                  ApplicationAttemptStateDataProto.parseFrom(childData));
+          ApplicationAttemptState attemptState = new ApplicationAttemptState(
+                          attemptId, attemptStateData.getMasterContainer());
+          // assert child node name is same as application attempt id
+          assert attemptId.equals(attemptState.getAttemptId());
+          attempts.add(attemptState);
+        } else {
+          LOG.info("Unknown child node with name: " + childNodeName);
+        }
+      }
+
+      // go through all attempts and add them to their apps
+      for(ApplicationAttemptState attemptState : attempts) {
+        ApplicationId appId = attemptState.getAttemptId().getApplicationId();
+        ApplicationState appState = state.appState.get(appId);
+        if(appState != null) {
+          appState.attempts.put(attemptState.getAttemptId(), attemptState);
+        } else {
+          // the application node may have been removed when the application
+          // completed but the RM might have stopped before it could remove the
+          // application attempt nodes
+          LOG.info("Application node not found for attempt: "
+                    + attemptState.getAttemptId());
+          deleteFile(getNodePath(attemptState.getAttemptId().toString()));
+        }
+      }
+
+      return state;
+    } catch (Exception e) {
+      LOG.error("Failed to load state.", e);
+      throw e;
+    }
+  }
+
+  @Override
+  public synchronized void storeApplicationState(String appId,
+                                     ApplicationStateDataPBImpl appStateDataPB)
+                                     throws Exception {
+    Path nodeCreatePath = getNodePath(appId);
+
+    LOG.info("Storing info for app: " + appId + " at: " + nodeCreatePath);
+    byte[] appStateData = appStateDataPB.getProto().toByteArray();
+    try {
+      // currently throw all exceptions. May need to respond differently for HA
+      // based on whether we have lost the right to write to FS
+      writeFile(nodeCreatePath, appStateData);
+    } catch (Exception e) {
+      LOG.info("Error storing info for app: " + appId, e);
+      throw e;
+    }
+  }
+
+  @Override
+  public synchronized void storeApplicationAttemptState(String attemptId,
+                          ApplicationAttemptStateDataPBImpl attemptStateDataPB)
+                          throws Exception {
+    Path nodeCreatePath = getNodePath(attemptId);
+    LOG.info("Storing info for attempt: " + attemptId
+             + " at: " + nodeCreatePath);
+    byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
+    try {
+      // currently throw all exceptions. May need to respond differently for HA
+      // based on whether we have lost the right to write to FS
+      writeFile(nodeCreatePath, attemptStateData);
+    } catch (Exception e) {
+      LOG.info("Error storing info for attempt: " + attemptId, e);
+      throw e;
+    }
+  }
+
+  @Override
+  public synchronized void removeApplicationState(ApplicationState appState)
+                                                            throws Exception {
+    String appId = appState.getAppId().toString();
+    Path nodeRemovePath = getNodePath(appId);
+    LOG.info("Removing info for app: " + appId + " at: " + nodeRemovePath);
+    deleteFile(nodeRemovePath);
+    for(ApplicationAttemptId attemptId : appState.attempts.keySet()) {
+      removeApplicationAttemptState(attemptId.toString());
+    }
+  }
+
+  public synchronized void removeApplicationAttemptState(String attemptId)
+                                                            throws Exception {
+    Path nodeRemovePath = getNodePath(attemptId);
+    LOG.info("Removing info for attempt: " + attemptId
+             + " at: " + nodeRemovePath);
+    deleteFile(nodeRemovePath);
+  }
+
+  // FileSystem related code
+
+  private void deleteFile(Path deletePath) throws Exception {
+    if(!fs.delete(deletePath, true)) {
+      throw new Exception("Failed to delete " + deletePath);
+    }
+  }
+
+  private byte[] readFile(Path inputPath, long len) throws Exception {
+    FSDataInputStream fsIn = fs.open(inputPath);
+    // state data will not be that "long"
+    byte[] data = new byte[(int)len];
+    fsIn.readFully(data);
+    return data;
+  }
+
+  private void writeFile(Path outputPath, byte[] data) throws Exception {
+    FSDataOutputStream fsOut = fs.create(outputPath, false);
+    fsOut.write(data);
+    fsOut.flush();
+    fsOut.close();
+  }
+
+  @VisibleForTesting
+  Path getNodePath(String nodeName) {
+    return new Path(fsRootDirPath, nodeName);
+  }
+}

Propchange: 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java?rev=1437245&r1=1437244&r2=1437245&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
 (original)
+++ 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
 Wed Jan 23 00:57:37 2013
@@ -18,10 +18,12 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.recovery;
 
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import 
org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptStateDataPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationStateDataPBImpl;
 
+@Unstable
 public class NullRMStateStore extends RMStateStore {
 
   @Override
@@ -36,7 +38,7 @@ public class NullRMStateStore extends RM
 
   @Override
   public RMState loadState() throws Exception {
-    return null;
+    throw new UnsupportedOperationException("Cannot load state from null 
store");
   }
 
   @Override

Added: 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java?rev=1437245&view=auto
==============================================================================
--- 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java
 (added)
+++ 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java
 Wed Jan 23 00:57:37 2013
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import 
org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import 
org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
+import 
org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
+import 
org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+public class TestRMStateStore {
+
+  public static final Log LOG = LogFactory.getLog(TestRMStateStore.class);
+
+  class TestDispatcher implements Dispatcher, 
EventHandler<RMAppAttemptStoredEvent> {
+
+    ApplicationAttemptId attemptId;
+    Exception storedException;
+
+    boolean notified = false;
+
+    @SuppressWarnings("rawtypes")
+    @Override
+    public void register(Class<? extends Enum> eventType, EventHandler 
handler) {
+    }
+
+    @Override
+    public void handle(RMAppAttemptStoredEvent event) {
+      assertEquals(attemptId, event.getApplicationAttemptId());
+      assertEquals(storedException, event.getStoredException());
+      notified = true;
+      synchronized (this) {
+        notifyAll();
+      }
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Override
+    public EventHandler getEventHandler() {
+      return this;
+    }
+
+  }
+
+  interface RMStateStoreHelper {
+    RMStateStore getRMStateStore() throws Exception;
+    void addOrphanAttemptIfNeeded(RMStateStore testStore,
+                                  TestDispatcher dispatcher) throws Exception;
+    boolean isFinalStateValid() throws Exception;
+  }
+
+  @Test
+  public void testFSRMStateStore() throws Exception {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = new 
MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    try {
+      TestFSRMStateStoreTester fsTester = new 
TestFSRMStateStoreTester(cluster);
+      testRMStateStore(fsTester);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  class TestFSRMStateStoreTester implements RMStateStoreHelper {
+    Path workingDirPathURI;
+    FileSystemRMStateStore store;
+    MiniDFSCluster cluster;
+
+    class TestFileSystemRMStore extends FileSystemRMStateStore {
+      TestFileSystemRMStore(Configuration conf) throws Exception {
+        init(conf);
+        assertTrue(workingDirPathURI.equals(fsWorkingPath));
+      }
+    }
+
+    public TestFSRMStateStoreTester(MiniDFSCluster cluster) throws Exception {
+      Path workingDirPath = new Path("/Test");
+      this.cluster = cluster;
+      FileSystem fs = cluster.getFileSystem();
+      fs.mkdirs(workingDirPath);
+      Path clusterURI = new Path(cluster.getURI());
+      workingDirPathURI = new Path(clusterURI, workingDirPath);
+      fs.close();
+    }
+
+    @Override
+    public RMStateStore getRMStateStore() throws Exception {
+      YarnConfiguration conf = new YarnConfiguration();
+      conf.set(YarnConfiguration.FS_RM_STATE_STORE_URI, 
workingDirPathURI.toString());
+      this.store = new TestFileSystemRMStore(conf);
+      return store;
+    }
+
+    @Override
+    public void addOrphanAttemptIfNeeded(RMStateStore testStore,
+                                 TestDispatcher dispatcher) throws Exception {
+      ApplicationAttemptId attemptId = ConverterUtils.toApplicationAttemptId(
+                                      "appattempt_1352994193343_0003_000001");
+      storeAttempt(testStore, attemptId,
+          "container_1352994193343_0003_01_000001", dispatcher);
+    }
+
+    @Override
+    public boolean isFinalStateValid() throws Exception {
+      FileSystem fs = cluster.getFileSystem();
+      FileStatus[] files = fs.listStatus(workingDirPathURI);
+      if(files.length == 1) {
+        // only store root directory should exist
+        return true;
+      }
+      return false;
+    }
+  }
+
+  void waitNotify(TestDispatcher dispatcher) {
+    long startTime = System.currentTimeMillis();
+    while(!dispatcher.notified) {
+      synchronized (dispatcher) {
+        try {
+          dispatcher.wait(1000);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      }
+      if(System.currentTimeMillis() - startTime > 1000*60) {
+        fail("Timed out attempt store notification");
+      }
+    }
+    dispatcher.notified = false;
+  }
+
+  void storeApp(RMStateStore store, ApplicationId appId, long time)
+                                                              throws Exception 
{
+    ApplicationSubmissionContext context = new 
ApplicationSubmissionContextPBImpl();
+    context.setApplicationId(appId);
+
+    RMApp mockApp = mock(RMApp.class);
+    when(mockApp.getApplicationId()).thenReturn(appId);
+    when(mockApp.getSubmitTime()).thenReturn(time);
+    when(mockApp.getApplicationSubmissionContext()).thenReturn(context);
+    store.storeApplication(mockApp);
+  }
+
+  ContainerId storeAttempt(RMStateStore store, ApplicationAttemptId attemptId,
+                           String containerIdStr, TestDispatcher dispatcher)
+                                                             throws Exception {
+
+    Container container = new ContainerPBImpl();
+    container.setId(ConverterUtils.toContainerId(containerIdStr));
+    RMAppAttempt mockAttempt = mock(RMAppAttempt.class);
+    when(mockAttempt.getAppAttemptId()).thenReturn(attemptId);
+    when(mockAttempt.getMasterContainer()).thenReturn(container);
+    dispatcher.attemptId = attemptId;
+    dispatcher.storedException = null;
+    store.storeApplicationAttempt(mockAttempt);
+    waitNotify(dispatcher);
+    return container.getId();
+  }
+
+  void testRMStateStore(RMStateStoreHelper stateStoreHelper) throws Exception {
+    long submitTime = System.currentTimeMillis();
+    RMStateStore store = stateStoreHelper.getRMStateStore();
+    TestDispatcher dispatcher = new TestDispatcher();
+    store.setDispatcher(dispatcher);
+
+    ApplicationAttemptId attemptId1 = ConverterUtils
+        .toApplicationAttemptId("appattempt_1352994193343_0001_000001");
+    ApplicationId appId1 = attemptId1.getApplicationId();
+    storeApp(store, appId1, submitTime);
+    ContainerId containerId1 = storeAttempt(store, attemptId1,
+                 "container_1352994193343_0001_01_000001", dispatcher);
+    String appAttemptIdStr2 = "appattempt_1352994193343_0001_000002";
+    ApplicationAttemptId attemptId2 =
+                ConverterUtils.toApplicationAttemptId(appAttemptIdStr2);
+    ContainerId containerId2 = storeAttempt(store, attemptId2,
+                 "container_1352994193343_0001_02_000001", dispatcher);
+
+    ApplicationAttemptId attemptIdRemoved = ConverterUtils
+        .toApplicationAttemptId("appattempt_1352994193343_0002_000001");
+    ApplicationId appIdRemoved = attemptIdRemoved.getApplicationId();
+    storeApp(store, appIdRemoved, submitTime);
+    storeAttempt(store, attemptIdRemoved,
+        "container_1352994193343_0002_01_000001", dispatcher);
+
+    RMApp mockRemovedApp = mock(RMApp.class);
+    HashMap<ApplicationAttemptId, RMAppAttempt> attempts =
+                              new HashMap<ApplicationAttemptId, 
RMAppAttempt>();
+    ApplicationSubmissionContext context = new 
ApplicationSubmissionContextPBImpl();
+    context.setApplicationId(appIdRemoved);
+    when(mockRemovedApp.getSubmitTime()).thenReturn(submitTime);
+    when(mockRemovedApp.getApplicationSubmissionContext()).thenReturn(context);
+    when(mockRemovedApp.getAppAttempts()).thenReturn(attempts);
+    RMAppAttempt mockRemovedAttempt = mock(RMAppAttempt.class);
+    when(mockRemovedAttempt.getAppAttemptId()).thenReturn(attemptIdRemoved);
+    attempts.put(attemptIdRemoved, mockRemovedAttempt);
+    store.removeApplication(mockRemovedApp);
+
+    // add orphan attempt file to simulate incomplete removal of app state
+    stateStoreHelper.addOrphanAttemptIfNeeded(store, dispatcher);
+
+    // let things settle down
+    Thread.sleep(1000);
+    store.close();
+
+    // load state
+    store = stateStoreHelper.getRMStateStore();
+    RMState state = store.loadState();
+    Map<ApplicationId, ApplicationState> rmAppState = 
state.getApplicationState();
+
+    // removed app or orphan attempt is not loaded
+    assertEquals(1, rmAppState.size());
+
+    ApplicationState appState = rmAppState.get(appId1);
+    // app is loaded
+    assertNotNull(appState);
+    // app is loaded correctly
+    assertEquals(submitTime, appState.getSubmitTime());
+    // submission context is loaded correctly
+    assertEquals(appId1,
+                 
appState.getApplicationSubmissionContext().getApplicationId());
+    ApplicationAttemptState attemptState = appState.getAttempt(attemptId1);
+    // attempt1 is loaded correctly
+    assertNotNull(attemptState);
+    assertEquals(attemptId1, attemptState.getAttemptId());
+    // attempt1 container is loaded correctly
+    assertEquals(containerId1, attemptState.getMasterContainer().getId());
+    attemptState = appState.getAttempt(attemptId2);
+    // attempt2 is loaded correctly
+    assertNotNull(attemptState);
+    assertEquals(attemptId2, attemptState.getAttemptId());
+    // attempt2 container is loaded correctly
+    assertEquals(containerId2, attemptState.getMasterContainer().getId());
+
+    // assert store is in expected state after everything is cleaned
+    assertTrue(stateStoreHelper.isFinalStateValid());
+
+    store.close();
+  }
+
+}

Propchange: 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java
------------------------------------------------------------------------------
    svn:eol-style = native


Reply via email to