Repository: spark
Updated Branches:
  refs/heads/master c4bc2ed84 -> b7b5e1787


[SPARK-16505][YARN] Optionally propagate error during shuffle service startup.

When enabled, this prevents the NM from starting if the shuffle service fails
to initialize. Otherwise the failure only surfaces later, as errors that are
confusing and harder to debug.

Added a unit test to verify that startup fails when initialization hits an
error (a read-only local dir) and stopOnFailure is enabled.

Author: Marcelo Vanzin <van...@cloudera.com>

Closes #14162 from vanzin/SPARK-16505.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b7b5e178
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b7b5e178
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b7b5e178

Branch: refs/heads/master
Commit: b7b5e17876f65c6644505c356f1a0db24ce1d142
Parents: c4bc2ed
Author: Marcelo Vanzin <van...@cloudera.com>
Authored: Thu Jul 14 09:42:32 2016 -0500
Committer: Tom Graves <tgra...@yahoo-inc.com>
Committed: Thu Jul 14 09:42:32 2016 -0500

----------------------------------------------------------------------
 .../spark/network/yarn/YarnShuffleService.java  | 75 +++++++++++---------
 docs/job-scheduling.md                          | 13 +---
 docs/running-on-yarn.md                         | 31 ++++++++
 .../network/yarn/YarnShuffleServiceSuite.scala  | 34 ++++++++-
 4 files changed, 106 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b7b5e178/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
----------------------------------------------------------------------
diff --git 
a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
 
b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
index 8a05628..df17dac 100644
--- 
a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
+++ 
b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
@@ -70,6 +70,11 @@ public class YarnShuffleService extends AuxiliaryService {
 
   private static final String RECOVERY_FILE_NAME = "registeredExecutors.ldb";
 
+  // Whether failure during service initialization should stop the NM.
+  @VisibleForTesting
+  static final String STOP_ON_FAILURE_KEY = "spark.yarn.shuffle.stopOnFailure";
+  private static final boolean DEFAULT_STOP_ON_FAILURE = false;
+
   // An entity that manages the shuffle secret per application
   // This is used only if authentication is enabled
   private ShuffleSecretManager secretManager;
@@ -119,44 +124,50 @@ public class YarnShuffleService extends AuxiliaryService {
    * Start the shuffle server with the given configuration.
    */
   @Override
-  protected void serviceInit(Configuration conf) {
+  protected void serviceInit(Configuration conf) throws Exception {
     _conf = conf;
 
-    // In case this NM was killed while there were running spark applications, 
we need to restore
-    // lost state for the existing executors.  We look for an existing file in 
the NM's local dirs.
-    // If we don't find one, then we choose a file to use to save the state 
next time.  Even if
-    // an application was stopped while the NM was down, we expect yarn to 
call stopApplication()
-    // when it comes back
-    registeredExecutorFile =
-      new File(getRecoveryPath().toUri().getPath(), RECOVERY_FILE_NAME);
-
-    TransportConf transportConf = new TransportConf("shuffle", new 
HadoopConfigProvider(conf));
-    // If authentication is enabled, set up the shuffle server to use a
-    // special RPC handler that filters out unauthenticated fetch requests
-    boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, 
DEFAULT_SPARK_AUTHENTICATE);
+    boolean stopOnFailure = conf.getBoolean(STOP_ON_FAILURE_KEY, 
DEFAULT_STOP_ON_FAILURE);
+
     try {
+      // In case this NM was killed while there were running spark 
applications, we need to restore
+      // lost state for the existing executors.  We look for an existing file 
in the NM's local dirs.
+      // If we don't find one, then we choose a file to use to save the state 
next time.  Even if
+      // an application was stopped while the NM was down, we expect yarn to 
call stopApplication()
+      // when it comes back
+      registeredExecutorFile =
+        new File(getRecoveryPath().toUri().getPath(), RECOVERY_FILE_NAME);
+
+      TransportConf transportConf = new TransportConf("shuffle", new 
HadoopConfigProvider(conf));
       blockHandler = new ExternalShuffleBlockHandler(transportConf, 
registeredExecutorFile);
-    } catch (Exception e) {
-      logger.error("Failed to initialize external shuffle service", e);
-    }
 
-    List<TransportServerBootstrap> bootstraps = Lists.newArrayList();
-    if (authEnabled) {
-      secretManager = new ShuffleSecretManager();
-      bootstraps.add(new SaslServerBootstrap(transportConf, secretManager));
-    }
+      // If authentication is enabled, set up the shuffle server to use a
+      // special RPC handler that filters out unauthenticated fetch requests
+      boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, 
DEFAULT_SPARK_AUTHENTICATE);
+      List<TransportServerBootstrap> bootstraps = Lists.newArrayList();
+      if (authEnabled) {
+        secretManager = new ShuffleSecretManager();
+        bootstraps.add(new SaslServerBootstrap(transportConf, secretManager));
+      }
 
-    int port = conf.getInt(
-      SPARK_SHUFFLE_SERVICE_PORT_KEY, DEFAULT_SPARK_SHUFFLE_SERVICE_PORT);
-    TransportContext transportContext = new TransportContext(transportConf, 
blockHandler);
-    shuffleServer = transportContext.createServer(port, bootstraps);
-    // the port should normally be fixed, but for tests its useful to find an 
open port
-    port = shuffleServer.getPort();
-    boundPort = port;
-    String authEnabledString = authEnabled ? "enabled" : "not enabled";
-    logger.info("Started YARN shuffle service for Spark on port {}. " +
-      "Authentication is {}.  Registered executor file is {}", port, 
authEnabledString,
-      registeredExecutorFile);
+      int port = conf.getInt(
+        SPARK_SHUFFLE_SERVICE_PORT_KEY, DEFAULT_SPARK_SHUFFLE_SERVICE_PORT);
+      TransportContext transportContext = new TransportContext(transportConf, 
blockHandler);
+      shuffleServer = transportContext.createServer(port, bootstraps);
+      // the port should normally be fixed, but for tests its useful to find 
an open port
+      port = shuffleServer.getPort();
+      boundPort = port;
+      String authEnabledString = authEnabled ? "enabled" : "not enabled";
+      logger.info("Started YARN shuffle service for Spark on port {}. " +
+        "Authentication is {}.  Registered executor file is {}", port, 
authEnabledString,
+        registeredExecutorFile);
+    } catch (Exception e) {
+      if (stopOnFailure) {
+        throw e;
+      } else {
+        noteFailure(e);
+      }
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/spark/blob/b7b5e178/docs/job-scheduling.md
----------------------------------------------------------------------
diff --git a/docs/job-scheduling.md b/docs/job-scheduling.md
index 40b6cd9..807944f 100644
--- a/docs/job-scheduling.md
+++ b/docs/job-scheduling.md
@@ -83,18 +83,7 @@ In Mesos coarse-grained mode, run 
`$SPARK_HOME/sbin/start-mesos-shuffle-service.
 slave nodes with `spark.shuffle.service.enabled` set to `true`. For instance, 
you may do so
 through Marathon.
 
-In YARN mode, start the shuffle service on each `NodeManager` as follows:
-
-1. Build Spark with the [YARN profile](building-spark.html). Skip this step if 
you are using a
-pre-packaged distribution.
-2. Locate the `spark-<version>-yarn-shuffle.jar`. This should be under
-`$SPARK_HOME/common/network-yarn/target/scala-<version>` if you are building 
Spark yourself, and under
-`lib` if you are using a distribution.
-2. Add this jar to the classpath of all `NodeManager`s in your cluster.
-3. In the `yarn-site.xml` on each node, add `spark_shuffle` to 
`yarn.nodemanager.aux-services`,
-then set `yarn.nodemanager.aux-services.spark_shuffle.class` to
-`org.apache.spark.network.yarn.YarnShuffleService`.
-4. Restart all `NodeManager`s in your cluster.
+In YARN mode, follow the instructions 
[here](running-on-yarn.html#configuring-the-external-shuffle-service).
 
 All other relevant configurations are optional and under the 
`spark.dynamicAllocation.*` and
 `spark.shuffle.service.*` namespaces. For more detail, see the

http://git-wip-us.apache.org/repos/asf/spark/blob/b7b5e178/docs/running-on-yarn.md
----------------------------------------------------------------------
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 4e92042..befd3ea 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -539,6 +539,37 @@ launch time. This is done by listing them in the 
`spark.yarn.access.namenodes` p
 spark.yarn.access.namenodes 
hdfs://ireland.example.org:8020/,hdfs://frankfurt.example.org:8020/
 ```
 
+## Configuring the External Shuffle Service
+
+To start the Spark Shuffle Service on each `NodeManager` in your YARN cluster, 
follow these
+instructions:
+
+1. Build Spark with the [YARN profile](building-spark.html). Skip this step if 
you are using a
+pre-packaged distribution.
+1. Locate the `spark-<version>-yarn-shuffle.jar`. This should be under
+`$SPARK_HOME/common/network-yarn/target/scala-<version>` if you are building 
Spark yourself, and under
+`lib` if you are using a distribution.
+1. Add this jar to the classpath of all `NodeManager`s in your cluster.
+1. In the `yarn-site.xml` on each node, add `spark_shuffle` to 
`yarn.nodemanager.aux-services`,
+then set `yarn.nodemanager.aux-services.spark_shuffle.class` to
+`org.apache.spark.network.yarn.YarnShuffleService`.
+1. Restart all `NodeManager`s in your cluster.
+
+The following extra configuration options are available when the shuffle 
service is running on YARN:
+
+<table class="table">
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
+<tr>
+  <td><code>spark.yarn.shuffle.stopOnFailure</code></td>
+  <td><code>false</code></td>
+  <td>
+    Whether to stop the NodeManager when there's a failure in the Spark 
Shuffle Service's
+    initialization. This prevents application failures caused by running 
containers on
+    NodeManagers where the Spark Shuffle Service is not running.
+  </td>
+</tr>
+</table>
+
 ## Launching your application with Apache Oozie
 
 Apache Oozie can launch Spark applications as part of a workflow.

http://git-wip-us.apache.org/repos/asf/spark/blob/b7b5e178/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
----------------------------------------------------------------------
diff --git 
a/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
 
b/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
index 5458fb9..e123e78 100644
--- 
a/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
+++ 
b/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
@@ -16,13 +16,17 @@
  */
 package org.apache.spark.network.yarn
 
-import java.io.{DataOutputStream, File, FileOutputStream}
+import java.io.{DataOutputStream, File, FileOutputStream, IOException}
+import java.nio.file.Files
+import java.nio.file.attribute.PosixFilePermission._
+import java.util.EnumSet
 
 import scala.annotation.tailrec
 import scala.concurrent.duration._
 import scala.language.postfixOps
 
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.service.ServiceStateException
 import org.apache.hadoop.yarn.api.records.ApplicationId
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.server.api.{ApplicationInitializationContext, 
ApplicationTerminationContext}
@@ -45,7 +49,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with 
Matchers with BeforeAnd
       classOf[YarnShuffleService].getCanonicalName)
     yarnConfig.setInt("spark.shuffle.service.port", 0)
     val localDir = Utils.createTempDir()
-    yarnConfig.set("yarn.nodemanager.local-dirs", localDir.getAbsolutePath)
+    yarnConfig.set(YarnConfiguration.NM_LOCAL_DIRS, localDir.getAbsolutePath)
   }
 
   var s1: YarnShuffleService = null
@@ -316,4 +320,28 @@ class YarnShuffleServiceSuite extends SparkFunSuite with 
Matchers with BeforeAnd
 
     s2.stop()
   }
- }
+
+  test("service throws error if cannot start") {
+    // Create a different config with a read-only local dir.
+    val roConfig = new YarnConfiguration(yarnConfig)
+    val roDir = Utils.createTempDir()
+    Files.setPosixFilePermissions(roDir.toPath(), EnumSet.of(OWNER_READ, 
OWNER_EXECUTE))
+    roConfig.set(YarnConfiguration.NM_LOCAL_DIRS, roDir.getAbsolutePath())
+    roConfig.setBoolean(YarnShuffleService.STOP_ON_FAILURE_KEY, true)
+
+    // Try to start the shuffle service, it should fail.
+    val service = new YarnShuffleService()
+
+    try {
+      val error = intercept[ServiceStateException] {
+        service.init(roConfig)
+      }
+      assert(error.getCause().isInstanceOf[IOException])
+    } finally {
+      service.stop()
+      Files.setPosixFilePermissions(roDir.toPath(),
+        EnumSet.of(OWNER_READ, OWNER_WRITE, OWNER_EXECUTE))
+    }
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to