Repository: spark
Updated Branches:
  refs/heads/master a019e6efb -> aab99d31a


[SPARK-14963][YARN] Using recoveryPath if NM recovery is enabled

## What changes were proposed in this pull request?

From Hadoop 2.5+, the YARN NM supports NM recovery, which provides a recovery path for
auxiliary services such as spark_shuffle and mapreduce_shuffle. So this change uses that
path instead of the NM local dir when NM recovery is enabled.
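
For context, here is a minimal sketch of the idea, not the patch itself: prefer the YARN-provided recovery path when NM recovery is enabled, and fall back to the NM local dirs otherwise. It assumes a Hadoop 2.5+ `AuxiliaryService` on the classpath and at least one configured local dir; the class name is illustrative, and the real change in `YarnShuffleService` below additionally migrates an existing state file out of the NM local dirs.

```java
// Illustrative only; the real logic lives in YarnShuffleService below.
import java.io.File;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.server.api.AuxiliaryService;

public abstract class RecoveryAwareShuffleService extends AuxiliaryService {
  private static final String RECOVERY_FILE_NAME = "registeredExecutor.ldb";

  protected RecoveryAwareShuffleService() {
    super("spark_shuffle");
  }

  @Override
  protected void serviceInit(Configuration conf) throws Exception {
    // On Hadoop 2.5+ with yarn.nodemanager.recovery.enabled=true, YARN calls
    // setRecoveryPath() before init(), so getRecoveryPath() returns a
    // directory that survives NM restarts.
    Path recoveryPath = getRecoveryPath();
    if (recoveryPath == null) {
      // No NM recovery: fall back to the first NM local dir, as before
      // (assumes yarn.nodemanager.local-dirs is non-empty).
      String[] localDirs = conf.getTrimmedStrings("yarn.nodemanager.local-dirs");
      recoveryPath = new Path(localDirs[0]);
    }
    File registeredExecutorFile =
        new File(recoveryPath.toUri().getPath(), RECOVERY_FILE_NAME);
    // ... open the LevelDB-backed executor state at registeredExecutorFile ...
  }
}
```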

## How was this patch tested?

Unit test + local test.
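
As a rough, hand-run illustration of the behavior the tests cover (the authoritative coverage is `YarnShuffleServiceSuite` in the diff below): the class name here is hypothetical, and it has to live in the `org.apache.spark.network.yarn` package to read the package-private `registeredExecutorFile` field.

```java
package org.apache.spark.network.yarn;

import java.io.File;
import java.nio.file.Files;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class RecoveryPathSmokeCheck {
  public static void main(String[] args) throws Exception {
    YarnConfiguration conf = new YarnConfiguration();
    conf.set("yarn.nodemanager.local-dirs",
        Files.createTempDirectory("nm-local").toString());
    conf.setInt("spark.shuffle.service.port", 0);

    // Simulate NM recovery being enabled: YARN would call setRecoveryPath()
    // before initializing the auxiliary service.
    Path recoveryPath = new Path(Files.createTempDirectory("nm-recovery").toUri());
    YarnShuffleService service = new YarnShuffleService();
    service.setRecoveryPath(recoveryPath);
    service.init(conf);

    // The executor state file should now live under the recovery path.
    File stateFile = service.registeredExecutorFile;
    System.out.println(stateFile.getParent().equals(recoveryPath.toUri().getPath()));
    service.stop();
  }
}
```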

Author: jerryshao <ss...@hortonworks.com>

Closes #12994 from jerryshao/SPARK-14963.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aab99d31
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aab99d31
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aab99d31

Branch: refs/heads/master
Commit: aab99d31a927adfa9216dd14e76493a187b6d6e7
Parents: a019e6e
Author: jerryshao <ss...@hortonworks.com>
Authored: Tue May 10 10:28:36 2016 -0500
Committer: Tom Graves <tgra...@yahoo-inc.com>
Committed: Tue May 10 10:28:36 2016 -0500

----------------------------------------------------------------------
 .../spark/network/yarn/YarnShuffleService.java  |  64 ++++++++++--
 .../network/yarn/YarnShuffleServiceSuite.scala  | 101 +++++++++++++++++--
 2 files changed, 143 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/aab99d31/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
----------------------------------------------------------------------
diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
index 4bc3c1a..9807383 100644
--- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
+++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
@@ -68,6 +68,8 @@ public class YarnShuffleService extends AuxiliaryService {
   private static final String SPARK_AUTHENTICATE_KEY = "spark.authenticate";
   private static final boolean DEFAULT_SPARK_AUTHENTICATE = false;
 
+  private static final String RECOVERY_FILE_NAME = "registeredExecutor.ldb";
+
   // An entity that manages the shuffle secret per application
   // This is used only if authentication is enabled
   private ShuffleSecretManager secretManager;
@@ -75,6 +77,12 @@ public class YarnShuffleService extends AuxiliaryService {
   // The actual server that serves shuffle files
   private TransportServer shuffleServer = null;
 
+  private Configuration _conf = null;
+
+  // The recovery path used for shuffle service recovery
+  @VisibleForTesting
+  Path _recoveryPath = null;
+
   // Handles registering executors and opening shuffle blocks
   @VisibleForTesting
   ExternalShuffleBlockHandler blockHandler;
@@ -112,6 +120,7 @@ public class YarnShuffleService extends AuxiliaryService {
    */
   @Override
   protected void serviceInit(Configuration conf) {
+    _conf = conf;
 
     // In case this NM was killed while there were running spark applications, we need to restore
     // lost state for the existing executors.  We look for an existing file in the NM's local dirs.
@@ -119,7 +128,7 @@ public class YarnShuffleService extends AuxiliaryService {
     // an application was stopped while the NM was down, we expect yarn to call stopApplication()
     // when it comes back
     registeredExecutorFile =
-      findRegisteredExecutorFile(conf.getTrimmedStrings("yarn.nodemanager.local-dirs"));
+      new File(getRecoveryPath().toUri().getPath(), RECOVERY_FILE_NAME);

     TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf));
     // If authentication is enabled, set up the shuffle server to use a
@@ -190,16 +199,6 @@ public class YarnShuffleService extends AuxiliaryService {
     logger.info("Stopping container {}", containerId);
   }
 
-  private File findRegisteredExecutorFile(String[] localDirs) {
-    for (String dir: localDirs) {
-      File f = new File(new Path(dir).toUri().getPath(), "registeredExecutors.ldb");
-      if (f.exists()) {
-        return f;
-      }
-    }
-    return new File(new Path(localDirs[0]).toUri().getPath(), "registeredExecutors.ldb");
-  }
-
   /**
    * Close the shuffle server to clean up any associated state.
    */
@@ -222,4 +221,47 @@ public class YarnShuffleService extends AuxiliaryService {
   public ByteBuffer getMetaData() {
     return ByteBuffer.allocate(0);
   }
+
+  /**
+   * Set the recovery path for shuffle service recovery when the NM is restarted. On Hadoop 2.5+
+   * this overrides the AuxiliaryService method and is called by YARN when NM recovery is
+   * enabled; otherwise we have to call it manually to set our own recovery path.
+   */
+  public void setRecoveryPath(Path recoveryPath) {
+    _recoveryPath = recoveryPath;
+  }
+
+  /**
+   * Get the recovery path; this overrides the default implementation so that we return our own
+   * maintained recovery path.
+   */
+  protected Path getRecoveryPath() {
+    String[] localDirs = _conf.getTrimmedStrings("yarn.nodemanager.local-dirs");
+    for (String dir : localDirs) {
+      File f = new File(new Path(dir).toUri().getPath(), RECOVERY_FILE_NAME);
+      if (f.exists()) {
+        if (_recoveryPath == null) {
+          // If NM recovery is not enabled, use the NM local dirs as the recovery path, which is
+          // compatible with the old code.
+          _recoveryPath = new Path(dir);
+        } else {
+          // If NM recovery is enabled and the recovery file exists in the old NM local dirs, an
+          // older version of Spark already generated it, so move the old file to the new
+          // recovery path for compatibility.
+          if (!f.renameTo(new File(_recoveryPath.toUri().getPath(), RECOVERY_FILE_NAME))) {
+            // Failed to move the recovery file to the new path.
+            logger.error("Failed to move recovery file {} to the path {}",
+              RECOVERY_FILE_NAME, _recoveryPath.toString());
+          }
+        }
+        break;
+      }
+    }
+
+    if (_recoveryPath == null) {
+      _recoveryPath = new Path(localDirs[0]);
+    }
+
+    return _recoveryPath;
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/aab99d31/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
index 0e433f6..749e656 100644
--- a/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
@@ -19,16 +19,20 @@ package org.apache.spark.network.yarn
 import java.io.{DataOutputStream, File, FileOutputStream}
 
 import scala.annotation.tailrec
+import scala.concurrent.duration._
 
-import org.apache.commons.io.FileUtils
+import org.apache.hadoop.fs.Path
 import org.apache.hadoop.yarn.api.records.ApplicationId
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.server.api.{ApplicationInitializationContext, ApplicationTerminationContext}
 import org.scalatest.{BeforeAndAfterEach, Matchers}
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.concurrent.Timeouts
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.network.shuffle.ShuffleTestAccessor
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
+import org.apache.spark.util.Utils
 
 class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach {
   private[yarn] var yarnConfig: YarnConfiguration = new YarnConfiguration
@@ -40,15 +44,8 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
     yarnConfig.set(YarnConfiguration.NM_AUX_SERVICE_FMT.format("spark_shuffle"),
       classOf[YarnShuffleService].getCanonicalName)
     yarnConfig.setInt("spark.shuffle.service.port", 0)
-
-    yarnConfig.get("yarn.nodemanager.local-dirs").split(",").foreach { dir =>
-      val d = new File(dir)
-      if (d.exists()) {
-        FileUtils.deleteDirectory(d)
-      }
-      FileUtils.forceMkdir(d)
-      logInfo(s"creating yarn.nodemanager.local-dirs: $d")
-    }
+    val localDir = Utils.createTempDir()
+    yarnConfig.set("yarn.nodemanager.local-dirs", localDir.getAbsolutePath)
   }
 
   var s1: YarnShuffleService = null
@@ -234,7 +231,89 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
     s3.initializeApplication(app2Data)
     ShuffleTestAccessor.getExecutorInfo(app2Id, "exec-2", resolver3) should be (Some(shuffleInfo2))
     s3.stop()
+  }
+
+  test("get correct recovery path") {
+    // Test that the recovery path is set outside the shuffle service; this simulates the
+    // NM-recovery-enabled scenario, where the recovery path is set by yarn.
+    s1 = new YarnShuffleService
+    val recoveryPath = new Path(Utils.createTempDir().toURI)
+    s1.setRecoveryPath(recoveryPath)
+
+    s1.init(yarnConfig)
+    s1._recoveryPath should be (recoveryPath)
+    s1.stop()
 
+    // Test that the recovery path is set inside the shuffle service; this happens when NM
+    // recovery is not enabled or NM recovery is not supported (Hadoop < 2.5).
+    s2 = new YarnShuffleService
+    s2.init(yarnConfig)
+    s2._recoveryPath should be
+      (new Path(yarnConfig.getTrimmedStrings("yarn.nodemanager.local-dirs")(0)))
+    s2.stop()
   }
 
-}
+  test("moving recovery file form NM local dir to recovery path") {
+    // This is to test when Hadoop is upgrade to 2.5+ and NM recovery is 
enabled, we should move
+    // old recovery file to the new path to keep compatibility
+
+    // Simulate s1 running on an old version of Hadoop where the recovery file is in the NM
+    // local dir.
+    s1 = new YarnShuffleService
+    s1.init(yarnConfig)
+    val app1Id = ApplicationId.newInstance(0, 1)
+    val app1Data: ApplicationInitializationContext =
+      new ApplicationInitializationContext("user", app1Id, null)
+    s1.initializeApplication(app1Data)
+    val app2Id = ApplicationId.newInstance(0, 2)
+    val app2Data: ApplicationInitializationContext =
+      new ApplicationInitializationContext("user", app2Id, null)
+    s1.initializeApplication(app2Data)
+
+    val execStateFile = s1.registeredExecutorFile
+    execStateFile should not be (null)
+    val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, SORT_MANAGER)
+    val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, SORT_MANAGER)
+
+    val blockHandler = s1.blockHandler
+    val blockResolver = ShuffleTestAccessor.getBlockResolver(blockHandler)
+    ShuffleTestAccessor.registeredExecutorFile(blockResolver) should be (execStateFile)
+
+    blockResolver.registerExecutor(app1Id.toString, "exec-1", shuffleInfo1)
+    blockResolver.registerExecutor(app2Id.toString, "exec-2", shuffleInfo2)
+    ShuffleTestAccessor.getExecutorInfo(app1Id, "exec-1", blockResolver) should
+      be (Some(shuffleInfo1))
+    ShuffleTestAccessor.getExecutorInfo(app2Id, "exec-2", blockResolver) should
+      be (Some(shuffleInfo2))
+
+    assert(execStateFile.exists(), s"$execStateFile did not exist")
+
+    s1.stop()
+
+    // Simulate s2 running on Hadoop 2.5+ with NM recovery enabled.
+    assert(execStateFile.exists())
+    val recoveryPath = new Path(Utils.createTempDir().toURI)
+    s2 = new YarnShuffleService
+    s2.setRecoveryPath(recoveryPath)
+    s2.init(yarnConfig)
+
+    val execStateFile2 = s2.registeredExecutorFile
+    recoveryPath.toString should be (new Path(execStateFile2.getParentFile.toURI).toString)
+    eventually(timeout(10 seconds), interval(5 millis)) {
+      assert(!execStateFile.exists())
+    }
+
+    val handler2 = s2.blockHandler
+    val resolver2 = ShuffleTestAccessor.getBlockResolver(handler2)
+
+    // now we reinitialize only one of the apps, and expect yarn to tell us that app2 was stopped
+    // during the restart
+    // Since the recovery file was moved from the old path, the previous state should be preserved.
+    s2.initializeApplication(app1Data)
+    s2.stopApplication(new ApplicationTerminationContext(app2Id))
+    ShuffleTestAccessor.getExecutorInfo(app1Id, "exec-1", resolver2) should be (Some(shuffleInfo1))
+    ShuffleTestAccessor.getExecutorInfo(app2Id, "exec-2", resolver2) should be (None)
+
+    s2.stop()
+  }
+}

