Repository: spark
Updated Branches:
  refs/heads/master 7098a1294 -> a3981c28c


[SPARK-17433] YarnShuffleService doesn't handle moving credentials levelDb

The secrets leveldb is not moved if you run the Spark shuffle service without 
YARN NM recovery enabled and then turn recovery on.  This fixes that.  I 
unfortunately missed this case when I ported the patch from our internal 
branch-2 to the master branch, due to the changes in the recovery path 
handling.  Note this only applies to master, since it is the only branch that 
uses the YARN NM recovery dir.
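
For readers skimming the diff below, here is a condensed, hypothetical sketch 
of the lookup order the new initRecoveryDb follows. This is not the actual 
Spark code: names like findOrMigrateDb, recoveryDir, and localDirs are 
illustrative stand-ins for _recoveryPath and the NM local dirs.

import java.io.File;
import java.nio.file.Files;

public class RecoveryDbLookup {
  static File findOrMigrateDb(File recoveryDir, File[] localDirs, String dbName) {
    if (recoveryDir != null) {
      File db = new File(recoveryDir, dbName);
      if (db.exists()) {
        return db;                        // already in the recovery path, done
      }
    }
    // DB not in the recovery path, so check the NM local dirs for an old copy
    for (File dir : localDirs) {
      File old = new File(dir, dbName);
      if (!old.exists()) {
        continue;
      }
      if (recoveryDir == null) {
        return old;                       // no NM recovery: keep using the local dir
      }
      File moved = new File(recoveryDir, dbName);
      try {
        Files.move(old.toPath(), moved.toPath());  // migrate old DB into recovery path
      } catch (Exception e) {
        // the real service logs the failure and continues with the new location
      }
      return moved;
    }
    // nothing found anywhere: create fresh under the recovery dir (or first local dir)
    File base = (recoveryDir != null) ? recoveryDir : localDirs[0];
    return new File(base, dbName);
  }
}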

Ran unit tests and tested on an 8-node cluster: fresh startup with NM 
recovery, fresh startup without NM recovery, and switching between no NM 
recovery and recovery.  Also ran applications to make sure they were not 
affected by a rolling upgrade.

Author: Thomas Graves <tgra...@prevailsail.corp.gq1.yahoo.com>
Author: Tom Graves <tgra...@apache.org>

Closes #14999 from tgravescs/SPARK-17433.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a3981c28
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a3981c28
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a3981c28

Branch: refs/heads/master
Commit: a3981c28c956a82ccf5b1c61d45b6bd252d4abed
Parents: 7098a12
Author: Thomas Graves <tgra...@prevailsail.corp.gq1.yahoo.com>
Authored: Fri Sep 9 13:43:32 2016 -0500
Committer: Tom Graves <tgra...@yahoo-inc.com>
Committed: Fri Sep 9 13:43:32 2016 -0500

----------------------------------------------------------------------
 .../spark/network/yarn/YarnShuffleService.java  | 56 ++++++++++++++------
 .../network/yarn/YarnShuffleServiceSuite.scala  | 12 ++++-
 2 files changed, 50 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a3981c28/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
----------------------------------------------------------------------
diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
index df082e4..43c8df7 100644
--- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
+++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.nio.ByteBuffer;
+import java.nio.file.Files;
 import java.util.List;
 import java.util.Map;
 
@@ -159,8 +160,7 @@ public class YarnShuffleService extends AuxiliaryService {
       // If we don't find one, then we choose a file to use to save the state next time.  Even if
       // an application was stopped while the NM was down, we expect yarn to call stopApplication()
       // when it comes back
-      registeredExecutorFile =
-        new File(getRecoveryPath().toUri().getPath(), RECOVERY_FILE_NAME);
+      registeredExecutorFile = initRecoveryDb(RECOVERY_FILE_NAME);
 
       TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf));
       blockHandler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile);
@@ -196,7 +196,7 @@ public class YarnShuffleService extends AuxiliaryService {
 
   private void createSecretManager() throws IOException {
     secretManager = new ShuffleSecretManager();
-    secretsFile = new File(getRecoveryPath().toUri().getPath(), SECRETS_RECOVERY_FILE_NAME);
+    secretsFile = initRecoveryDb(SECRETS_RECOVERY_FILE_NAME);
  
     // Make sure this is protected in case its not in the NM recovery dir
     FileSystem fs = FileSystem.getLocal(_conf);
@@ -328,37 +328,59 @@ public class YarnShuffleService extends AuxiliaryService {
   }
 
   /**
-   * Get the recovery path, this will override the default one to get our own maintained
-   * recovery path.
+   * Get the path specific to this auxiliary service to use for recovery.
+   */ 
+  protected Path getRecoveryPath(String fileName) {
+    return _recoveryPath;
+  }
+
+  /**
+   * Figure out the recovery path and handle moving the DB if YARN NM recovery gets enabled
+   * when it previously was not. If YARN NM recovery is enabled it uses that path, otherwise
+   * it will use a YARN local dir.
    */
-  protected Path getRecoveryPath() {
+  protected File initRecoveryDb(String dbFileName) {
+    if (_recoveryPath != null) {
+        File recoveryFile = new File(_recoveryPath.toUri().getPath(), dbFileName);
+        if (recoveryFile.exists()) {
+          return recoveryFile;
+        }
+    } 
+    // db doesn't exist in recovery path go check local dirs for it
     String[] localDirs = _conf.getTrimmedStrings("yarn.nodemanager.local-dirs");
     for (String dir : localDirs) {
-      File f = new File(new Path(dir).toUri().getPath(), RECOVERY_FILE_NAME);
+      File f = new File(new Path(dir).toUri().getPath(), dbFileName);
       if (f.exists()) {
         if (_recoveryPath == null) {
           // If NM recovery is not enabled, we should specify the recovery path using NM local
           // dirs, which is compatible with the old code.
           _recoveryPath = new Path(dir);
+          return f;
         } else {
-          // If NM recovery is enabled and the recovery file exists in old NM local dirs, which
-          // means old version of Spark already generated the recovery file, we should copy the
-          // old file in to a new recovery path for the compatibility.
-          if (!f.renameTo(new File(_recoveryPath.toUri().getPath(), RECOVERY_FILE_NAME))) {
-            // Fail to move recovery file to new path
-            logger.error("Failed to move recovery file {} to the path {}",
-              RECOVERY_FILE_NAME, _recoveryPath.toString());
+          // If the recovery path is set then either NM recovery is enabled or another recovery
+          // DB has been initialized. If NM recovery is enabled and had set the recovery path
+          // make sure to move all DBs to the recovery path from the old NM local dirs.
+          // If another DB was initialized first just make sure all the DBs are in the same
+          // location.
+          File newLoc = new File(_recoveryPath.toUri().getPath(), dbFileName);
+          if (!newLoc.equals(f)) {
+            try {
+              Files.move(f.toPath(), newLoc.toPath());
+            } catch (Exception e) {
+              // Fail to move recovery file to new path, just continue on with new DB location
+              logger.error("Failed to move recovery file {} to the path {}",
+                dbFileName, _recoveryPath.toString(), e);
+            }
           }
+          return newLoc;
         }
-        break;
       }
     }
-
     if (_recoveryPath == null) {
       _recoveryPath = new Path(localDirs[0]);
     }
 
-    return _recoveryPath;
+    return new File(_recoveryPath.toUri().getPath(), dbFileName);
   }
 
   /**
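
A side note on the migration mechanics: the patch swaps File.renameTo for 
java.nio.file.Files.move.  renameTo only reports failure as a bare boolean, 
while Files.move throws an exception carrying the actual cause, which the new 
code passes to the logger.  A minimal standalone sketch of the difference; the 
temp-file names here are illustrative, not the service's real DB paths:

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

public class MoveDemo {
  public static void main(String[] args) throws IOException {
    File src = File.createTempFile("registeredExecutors", ".ldb");
    File dst = new File(src.getParentFile(), "moved-" + src.getName());

    // Old approach: a bare boolean, no reason given on failure.
    // boolean ok = src.renameTo(dst);

    // New approach: throws an IOException (e.g. NoSuchFileException) with
    // details, so the service can log the cause and continue with the new
    // DB location.
    Files.move(src.toPath(), dst.toPath());
    System.out.println("moved to " + dst);
  }
}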

http://git-wip-us.apache.org/repos/asf/spark/blob/a3981c28/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
index 9a07186..c86bf7f 100644
--- a/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
@@ -267,13 +267,15 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
     s2.stop()
   }
 
-  test("moving recovery file form NM local dir to recovery path") {
+  test("moving recovery file from NM local dir to recovery path") {
     // This is to test when Hadoop is upgrade to 2.5+ and NM recovery is enabled, we should move
     // old recovery file to the new path to keep compatibility
 
     // Simulate s1 is running on old version of Hadoop in which recovery file is in the NM local
     // dir.
     s1 = new YarnShuffleService
+    // set auth to true to test the secrets recovery
+    yarnConfig.setBoolean(SecurityManager.SPARK_AUTH_CONF, true)
     s1.init(yarnConfig)
     val app1Id = ApplicationId.newInstance(0, 1)
     val app1Data: ApplicationInitializationContext =
@@ -286,6 +288,8 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
 
     val execStateFile = s1.registeredExecutorFile
     execStateFile should not be (null)
+    val secretsFile = s1.secretsFile
+    secretsFile should not be (null)
     val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, SORT_MANAGER)
     val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, SORT_MANAGER)
 
@@ -312,10 +316,16 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
     s2.init(yarnConfig)
 
     val execStateFile2 = s2.registeredExecutorFile
+    val secretsFile2 = s2.secretsFile
+
     recoveryPath.toString should be (new Path(execStateFile2.getParentFile.toURI).toString)
+    recoveryPath.toString should be (new Path(secretsFile2.getParentFile.toURI).toString)
     eventually(timeout(10 seconds), interval(5 millis)) {
       assert(!execStateFile.exists())
     }
+    eventually(timeout(10 seconds), interval(5 millis)) {
+      assert(!secretsFile.exists())
+    }
 
     val handler2 = s2.blockHandler
     val resolver2 = ShuffleTestAccessor.getBlockResolver(handler2)
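
For intuition, the property the updated test asserts can be checked in 
isolation: after restart with NM recovery enabled, both DBs end up under the 
recovery path and the old copies in the NM local dir are gone.  A hypothetical 
standalone sketch, assuming the DB file names and directories shown here 
(they are illustrative; leveldb state is also simulated with plain files):

import java.nio.file.Files;
import java.nio.file.Path;

public class MigrationCheck {
  public static void main(String[] args) throws Exception {
    Path localDir = Files.createTempDirectory("nm-local");
    Path recoveryDir = Files.createTempDirectory("nm-recovery");

    // Simulate DBs left behind by an old service run without NM recovery.
    Path oldExecDb = Files.createFile(localDir.resolve("registeredExecutors.ldb"));
    Path oldSecretsDb = Files.createFile(localDir.resolve("sparkShuffleRecovery.ldb"));

    // The migration step the patch performs once per DB.
    Files.move(oldExecDb, recoveryDir.resolve(oldExecDb.getFileName()));
    Files.move(oldSecretsDb, recoveryDir.resolve(oldSecretsDb.getFileName()));

    // Mirrors the test's eventually-blocks: old copies gone, new copies present.
    assert !Files.exists(oldExecDb) && !Files.exists(oldSecretsDb);
    assert Files.exists(recoveryDir.resolve("registeredExecutors.ldb"));
    assert Files.exists(recoveryDir.resolve("sparkShuffleRecovery.ldb"));
    System.out.println("both DBs migrated to " + recoveryDir);
  }
}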

