Repository: spark
Updated Branches:
  refs/heads/master 1ebd41b14 -> 95dccc633


[SPARK-8873] [MESOS] Clean up shuffle files if external shuffle service is used

This patch builds directly on #7820, which is largely written by tnachen. The 
only addition is one commit for cleaning up the code. There should be no 
functional differences between this and #7820.

Author: Timothy Chen <tnac...@gmail.com>
Author: Andrew Or <and...@databricks.com>

Closes #7881 from andrewor14/tim-cleanup-mesos-shuffle and squashes the 
following commits:

8894f7d [Andrew Or] Clean up code
2a5fa10 [Andrew Or] Merge branch 'mesos_shuffle_clean' of 
github.com:tnachen/spark into tim-cleanup-mesos-shuffle
fadff89 [Timothy Chen] Address comments.
e4d0f1d [Timothy Chen] Clean up external shuffle data on driver exit with Mesos.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/95dccc63
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/95dccc63
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/95dccc63

Branch: refs/heads/master
Commit: 95dccc63350c45045f038bab9f8a5080b4e1f8cc
Parents: 1ebd41b
Author: Timothy Chen <tnac...@gmail.com>
Authored: Mon Aug 3 01:55:58 2015 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Mon Aug 3 01:55:58 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |   2 +-
 .../spark/deploy/ExternalShuffleService.scala   |  17 ++-
 .../mesos/MesosExternalShuffleService.scala     | 107 +++++++++++++++++++
 .../org/apache/spark/rpc/RpcEndpoint.scala      |   6 +-
 .../mesos/CoarseMesosSchedulerBackend.scala     |  52 ++++++++-
 .../CoarseMesosSchedulerBackendSuite.scala      |   5 +-
 .../launcher/SparkClassCommandBuilder.java      |   3 +-
 .../spark/network/client/TransportClient.java   |   5 +
 .../shuffle/ExternalShuffleBlockHandler.java    |   6 ++
 .../network/shuffle/ExternalShuffleClient.java  |  12 ++-
 .../mesos/MesosExternalShuffleClient.java       |  72 +++++++++++++
 .../shuffle/protocol/BlockTransferMessage.java  |   4 +-
 .../shuffle/protocol/mesos/RegisterDriver.java  |  60 +++++++++++
 sbin/start-mesos-shuffle-service.sh             |  35 ++++++
 sbin/stop-mesos-shuffle-service.sh              |  25 +++++
 15 files changed, 394 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/95dccc63/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index a1c66ef..6f336a7 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2658,7 +2658,7 @@ object SparkContext extends Logging {
         val coarseGrained = sc.conf.getBoolean("spark.mesos.coarse", false)
         val url = mesosUrl.stripPrefix("mesos://") // strip scheme from raw 
Mesos URLs
         val backend = if (coarseGrained) {
-          new CoarseMesosSchedulerBackend(scheduler, sc, url)
+          new CoarseMesosSchedulerBackend(scheduler, sc, url, 
sc.env.securityManager)
         } else {
           new MesosSchedulerBackend(scheduler, sc, url)
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/95dccc63/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala 
b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
index 4089c3e..20a9faa 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
@@ -27,6 +27,7 @@ import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.sasl.SaslServerBootstrap
 import org.apache.spark.network.server.TransportServer
 import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
+import org.apache.spark.network.util.TransportConf
 import org.apache.spark.util.Utils
 
 /**
@@ -45,11 +46,16 @@ class ExternalShuffleService(sparkConf: SparkConf, 
securityManager: SecurityMana
   private val useSasl: Boolean = securityManager.isAuthenticationEnabled()
 
   private val transportConf = SparkTransportConf.fromSparkConf(sparkConf, 
numUsableCores = 0)
-  private val blockHandler = new ExternalShuffleBlockHandler(transportConf)
+  private val blockHandler = newShuffleBlockHandler(transportConf)
   private val transportContext: TransportContext = new 
TransportContext(transportConf, blockHandler)
 
   private var server: TransportServer = _
 
+  /** Create a new shuffle block handler. Factored out for subclasses to 
override. */
+  protected def newShuffleBlockHandler(conf: TransportConf): 
ExternalShuffleBlockHandler = {
+    new ExternalShuffleBlockHandler(conf)
+  }
+
   /** Starts the external shuffle service if the user has configured us to. */
   def startIfEnabled() {
     if (enabled) {
@@ -93,6 +99,13 @@ object ExternalShuffleService extends Logging {
   private val barrier = new CountDownLatch(1)
 
   def main(args: Array[String]): Unit = {
+    main(args, (conf: SparkConf, sm: SecurityManager) => new 
ExternalShuffleService(conf, sm))
+  }
+
+  /** A helper main method that allows the caller to call this with a custom 
shuffle service. */
+  private[spark] def main(
+      args: Array[String],
+      newShuffleService: (SparkConf, SecurityManager) => 
ExternalShuffleService): Unit = {
     val sparkConf = new SparkConf
     Utils.loadDefaultSparkProperties(sparkConf)
     val securityManager = new SecurityManager(sparkConf)
@@ -100,7 +113,7 @@ object ExternalShuffleService extends Logging {
     // we override this value since this service is started from the command 
line
     // and we assume the user really wants it to be running
     sparkConf.set("spark.shuffle.service.enabled", "true")
-    server = new ExternalShuffleService(sparkConf, securityManager)
+    server = newShuffleService(sparkConf, securityManager)
     server.start()
 
     installShutdownHook()

http://git-wip-us.apache.org/repos/asf/spark/blob/95dccc63/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala
 
b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala
new file mode 100644
index 0000000..0618574
--- /dev/null
+++ 
b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.mesos
+
+import java.net.SocketAddress
+
+import scala.collection.mutable
+
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.deploy.ExternalShuffleService
+import org.apache.spark.network.client.{RpcResponseCallback, TransportClient}
+import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
+import org.apache.spark.network.shuffle.protocol.BlockTransferMessage
+import org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver
+import org.apache.spark.network.util.TransportConf
+
+/**
+ * An RPC endpoint that receives registration requests from Spark drivers 
running on Mesos.
+ * It detects driver termination and calls the cleanup callback to 
[[ExternalShuffleService]].
+ */
+private[mesos] class MesosExternalShuffleBlockHandler(transportConf: 
TransportConf)
+  extends ExternalShuffleBlockHandler(transportConf) with Logging {
+
+  // Stores a map of driver socket addresses to app ids
+  private val connectedApps = new mutable.HashMap[SocketAddress, String]
+
+  protected override def handleMessage(
+      message: BlockTransferMessage,
+      client: TransportClient,
+      callback: RpcResponseCallback): Unit = {
+    message match {
+      case RegisterDriverParam(appId) =>
+        val address = client.getSocketAddress
+        logDebug(s"Received registration request from app $appId (remote 
address $address).")
+        if (connectedApps.contains(address)) {
+          val existingAppId = connectedApps(address)
+          if (!existingAppId.equals(appId)) {
+            logError(s"A new app '$appId' has connected to existing address 
$address, " +
+              s"removing previously registered app '$existingAppId'.")
+            applicationRemoved(existingAppId, true)
+          }
+        }
+        connectedApps(address) = appId
+        callback.onSuccess(new Array[Byte](0))
+      case _ => super.handleMessage(message, client, callback)
+    }
+  }
+
+  /**
+   * On connection termination, clean up shuffle files written by the 
associated application.
+   */
+  override def connectionTerminated(client: TransportClient): Unit = {
+    val address = client.getSocketAddress
+    if (connectedApps.contains(address)) {
+      val appId = connectedApps(address)
+      logInfo(s"Application $appId disconnected (address was $address).")
+      applicationRemoved(appId, true /* cleanupLocalDirs */)
+      connectedApps.remove(address)
+    } else {
+      logWarning(s"Unknown $address disconnected.")
+    }
+  }
+
+  /** An extractor object for matching [[RegisterDriver]] message. */
+  private object RegisterDriverParam {
+    def unapply(r: RegisterDriver): Option[String] = Some(r.getAppId)
+  }
+}
+
+/**
+ * A wrapper of [[ExternalShuffleService]] that provides an additional 
endpoint for drivers
+ * to associate with. This allows the shuffle service to detect when a driver 
is terminated
+ * and can clean up the associated shuffle files.
+ */
+private[mesos] class MesosExternalShuffleService(conf: SparkConf, 
securityManager: SecurityManager)
+  extends ExternalShuffleService(conf, securityManager) {
+
+  protected override def newShuffleBlockHandler(
+      conf: TransportConf): ExternalShuffleBlockHandler = {
+    new MesosExternalShuffleBlockHandler(conf)
+  }
+}
+
+private[spark] object MesosExternalShuffleService extends Logging {
+
+  def main(args: Array[String]): Unit = {
+    ExternalShuffleService.main(args,
+      (conf: SparkConf, sm: SecurityManager) => new 
MesosExternalShuffleService(conf, sm))
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/spark/blob/95dccc63/core/src/main/scala/org/apache/spark/rpc/RpcEndpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEndpoint.scala 
b/core/src/main/scala/org/apache/spark/rpc/RpcEndpoint.scala
index d2b2bae..dfcbc51 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEndpoint.scala
@@ -47,11 +47,11 @@ private[spark] trait ThreadSafeRpcEndpoint extends 
RpcEndpoint
  *
  * It is guaranteed that `onStart`, `receive` and `onStop` will be called in 
sequence.
  *
- * The lift-cycle will be:
+ * The life-cycle of an endpoint is:
  *
- * constructor onStart receive* onStop
+ * constructor -> onStart -> receive* -> onStop
  *
- * Note: `receive` can be called concurrently. If you want `receive` is 
thread-safe, please use
+ * Note: `receive` can be called concurrently. If you want `receive` to be 
thread-safe, please use
  * [[ThreadSafeRpcEndpoint]]
  *
  * If any error is thrown from one of [[RpcEndpoint]] methods except 
`onError`, `onError` will be

http://git-wip-us.apache.org/repos/asf/spark/blob/95dccc63/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index b7fde0d..15a0915 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -26,12 +26,15 @@ import scala.collection.mutable.{HashMap, HashSet}
 
 import com.google.common.collect.HashBiMap
 import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
-import org.apache.mesos.{Scheduler => MScheduler, _}
+import org.apache.mesos.{Scheduler => MScheduler, SchedulerDriver}
+
+import org.apache.spark.{SecurityManager, SparkContext, SparkEnv, 
SparkException, TaskState}
+import org.apache.spark.network.netty.SparkTransportConf
+import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
 import org.apache.spark.rpc.RpcAddress
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.Utils
-import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskState}
 
 /**
  * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" 
tasks, where it holds
@@ -46,7 +49,8 @@ import org.apache.spark.{SparkContext, SparkEnv, 
SparkException, TaskState}
 private[spark] class CoarseMesosSchedulerBackend(
     scheduler: TaskSchedulerImpl,
     sc: SparkContext,
-    master: String)
+    master: String,
+    securityManager: SecurityManager)
   extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
   with MScheduler
   with MesosSchedulerUtils {
@@ -56,12 +60,19 @@ private[spark] class CoarseMesosSchedulerBackend(
   // Maximum number of cores to acquire (TODO: we'll need more flexible 
controls here)
   val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt
 
+  // If shuffle service is enabled, the Spark driver will register with the 
shuffle service.
+  // This is for cleaning up shuffle files reliably.
+  private val shuffleServiceEnabled = 
conf.getBoolean("spark.shuffle.service.enabled", false)
+
   // Cores we have acquired with each Mesos task ID
   val coresByTaskId = new HashMap[Int, Int]
   var totalCoresAcquired = 0
 
   val slaveIdsWithExecutors = new HashSet[String]
 
+  // Mapping from slave ID to hostname
+  private val slaveIdToHost = new HashMap[String, String]
+
   val taskIdToSlaveId: HashBiMap[Int, String] = HashBiMap.create[Int, String]
   // How many times tasks on each slave failed
   val failuresBySlaveId: HashMap[String, Int] = new HashMap[String, Int]
@@ -90,6 +101,19 @@ private[spark] class CoarseMesosSchedulerBackend(
   private val slaveOfferConstraints =
     parseConstraintString(sc.conf.get("spark.mesos.constraints", ""))
 
+  // A client for talking to the external shuffle service, if it is enabled
+  private val mesosExternalShuffleClient: Option[MesosExternalShuffleClient] = 
{
+    if (shuffleServiceEnabled) {
+      Some(new MesosExternalShuffleClient(
+        SparkTransportConf.fromSparkConf(conf),
+        securityManager,
+        securityManager.isAuthenticationEnabled(),
+        securityManager.isSaslEncryptionEnabled()))
+    } else {
+      None
+    }
+  }
+
   var nextMesosTaskId = 0
 
   @volatile var appId: String = _
@@ -188,6 +212,7 @@ private[spark] class CoarseMesosSchedulerBackend(
 
   override def registered(d: SchedulerDriver, frameworkId: FrameworkID, 
masterInfo: MasterInfo) {
     appId = frameworkId.getValue
+    mesosExternalShuffleClient.foreach(_.init(appId))
     logInfo("Registered as framework ID " + appId)
     markRegistered()
   }
@@ -244,6 +269,7 @@ private[spark] class CoarseMesosSchedulerBackend(
 
           // accept the offer and launch the task
           logDebug(s"Accepting offer: $id with attributes: $offerAttributes 
mem: $mem cpu: $cpus")
+          slaveIdToHost(offer.getSlaveId.getValue) = offer.getHostname
           d.launchTasks(
             Collections.singleton(offer.getId),
             Collections.singleton(taskBuilder.build()), filters)
@@ -261,7 +287,27 @@ private[spark] class CoarseMesosSchedulerBackend(
     val taskId = status.getTaskId.getValue.toInt
     val state = status.getState
     logInfo(s"Mesos task $taskId is now $state")
+    val slaveId: String = status.getSlaveId.getValue
     stateLock.synchronized {
+      // If the shuffle service is enabled, have the driver register with each 
one of the
+      // shuffle services. This allows the shuffle services to clean up state 
associated with
+      // this application when the driver exits. There is currently not a 
great way to detect
+      // this through Mesos, since the shuffle services are set up 
independently.
+      if (TaskState.fromMesos(state).equals(TaskState.RUNNING) &&
+          slaveIdToHost.contains(slaveId) &&
+          shuffleServiceEnabled) {
+        assume(mesosExternalShuffleClient.isDefined,
+          "External shuffle client was not instantiated even though shuffle 
service is enabled.")
+        // TODO: Remove this and allow the MesosExternalShuffleService to 
detect
+        // framework termination when new Mesos Framework HTTP API is 
available.
+        val externalShufflePort = conf.getInt("spark.shuffle.service.port", 
7337)
+        val hostname = slaveIdToHost.remove(slaveId).get
+        logDebug(s"Connecting to shuffle service on slave $slaveId, " +
+            s"host $hostname, port $externalShufflePort for app 
${conf.getAppId}")
+        mesosExternalShuffleClient.get
+          .registerDriverWithShuffleService(hostname, externalShufflePort)
+      }
+
       if (TaskState.isFinished(TaskState.fromMesos(state))) {
         val slaveId = taskIdToSlaveId(taskId)
         slaveIdsWithExecutors -= slaveId

http://git-wip-us.apache.org/repos/asf/spark/blob/95dccc63/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
index 4b504df..525ee0d 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
@@ -30,7 +30,7 @@ import org.scalatest.mock.MockitoSugar
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.scheduler.TaskSchedulerImpl
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, 
SparkFunSuite}
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, 
SecurityManager, SparkFunSuite}
 
 class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
     with LocalSparkContext
@@ -59,7 +59,8 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
   private def createSchedulerBackend(
       taskScheduler: TaskSchedulerImpl,
       driver: SchedulerDriver): CoarseMesosSchedulerBackend = {
-    val backend = new CoarseMesosSchedulerBackend(taskScheduler, sc, "master") 
{
+    val securityManager = mock[SecurityManager]
+    val backend = new CoarseMesosSchedulerBackend(taskScheduler, sc, "master", 
securityManager) {
       override protected def createSchedulerDriver(
         masterUrl: String,
         scheduler: Scheduler,

http://git-wip-us.apache.org/repos/asf/spark/blob/95dccc63/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
----------------------------------------------------------------------
diff --git 
a/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
 
b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
index de85720..5f95e2c 100644
--- 
a/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
+++ 
b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
@@ -69,7 +69,8 @@ class SparkClassCommandBuilder extends AbstractCommandBuilder 
{
     } else if 
(className.equals("org.apache.spark.executor.MesosExecutorBackend")) {
       javaOptsKeys.add("SPARK_EXECUTOR_OPTS");
       memKey = "SPARK_EXECUTOR_MEMORY";
-    } else if 
(className.equals("org.apache.spark.deploy.ExternalShuffleService")) {
+    } else if 
(className.equals("org.apache.spark.deploy.ExternalShuffleService") ||
+        
className.equals("org.apache.spark.deploy.mesos.MesosExternalShuffleService")) {
       javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
       javaOptsKeys.add("SPARK_SHUFFLE_OPTS");
       memKey = "SPARK_DAEMON_MEMORY";

http://git-wip-us.apache.org/repos/asf/spark/blob/95dccc63/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java
----------------------------------------------------------------------
diff --git 
a/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java
 
b/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java
index 37f2e34..e8e7f06 100644
--- 
a/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java
+++ 
b/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java
@@ -19,6 +19,7 @@ package org.apache.spark.network.client;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.net.SocketAddress;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -79,6 +80,10 @@ public class TransportClient implements Closeable {
     return channel.isOpen() || channel.isActive();
   }
 
+  public SocketAddress getSocketAddress() {
+    return channel.remoteAddress();
+  }
+
   /**
    * Requests a single chunk from the remote side, from the pre-negotiated 
streamId.
    *

http://git-wip-us.apache.org/repos/asf/spark/blob/95dccc63/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
----------------------------------------------------------------------
diff --git 
a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
 
b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
index e4faaf8..db9dc4f 100644
--- 
a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
+++ 
b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
@@ -65,7 +65,13 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
   @Override
   public void receive(TransportClient client, byte[] message, 
RpcResponseCallback callback) {
     BlockTransferMessage msgObj = 
BlockTransferMessage.Decoder.fromByteArray(message);
+    handleMessage(msgObj, client, callback);
+  }
 
+  protected void handleMessage(
+      BlockTransferMessage msgObj,
+      TransportClient client,
+      RpcResponseCallback callback) {
     if (msgObj instanceof OpenBlocks) {
       OpenBlocks msg = (OpenBlocks) msgObj;
       List<ManagedBuffer> blocks = Lists.newArrayList();

http://git-wip-us.apache.org/repos/asf/spark/blob/95dccc63/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
----------------------------------------------------------------------
diff --git 
a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
 
b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
index 612bce5..ea6d248 100644
--- 
a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
+++ 
b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
@@ -50,8 +50,8 @@ public class ExternalShuffleClient extends ShuffleClient {
   private final boolean saslEncryptionEnabled;
   private final SecretKeyHolder secretKeyHolder;
 
-  private TransportClientFactory clientFactory;
-  private String appId;
+  protected TransportClientFactory clientFactory;
+  protected String appId;
 
   /**
    * Creates an external shuffle client, with SASL optionally enabled. If SASL 
is not enabled,
@@ -71,6 +71,10 @@ public class ExternalShuffleClient extends ShuffleClient {
     this.saslEncryptionEnabled = saslEncryptionEnabled;
   }
 
+  protected void checkInit() {
+    assert appId != null : "Called before init()";
+  }
+
   @Override
   public void init(String appId) {
     this.appId = appId;
@@ -89,7 +93,7 @@ public class ExternalShuffleClient extends ShuffleClient {
       final String execId,
       String[] blockIds,
       BlockFetchingListener listener) {
-    assert appId != null : "Called before init()";
+    checkInit();
     logger.debug("External shuffle fetch from {}:{} (executor id {})", host, 
port, execId);
     try {
       RetryingBlockFetcher.BlockFetchStarter blockFetchStarter =
@@ -132,7 +136,7 @@ public class ExternalShuffleClient extends ShuffleClient {
       int port,
       String execId,
       ExecutorShuffleInfo executorInfo) throws IOException {
-    assert appId != null : "Called before init()";
+    checkInit();
     TransportClient client = clientFactory.createClient(host, port);
     byte[] registerMessage = new RegisterExecutor(appId, execId, 
executorInfo).toByteArray();
     client.sendRpcSync(registerMessage, 5000 /* timeoutMs */);

http://git-wip-us.apache.org/repos/asf/spark/blob/95dccc63/network/shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java
----------------------------------------------------------------------
diff --git 
a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java
 
b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java
new file mode 100644
index 0000000..7543b6b
--- /dev/null
+++ 
b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.mesos;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.client.RpcResponseCallback;
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.sasl.SecretKeyHolder;
+import org.apache.spark.network.shuffle.ExternalShuffleClient;
+import org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver;
+import org.apache.spark.network.util.TransportConf;
+
+/**
+ * A client for talking to the external shuffle service in Mesos 
coarse-grained mode.
+ *
+ * This is used by the Spark driver to register with each external shuffle 
service on the cluster.
+ * The reason why the driver has to talk to the service is for cleaning up 
shuffle files reliably
+ * after the application exits. Mesos does not provide a great alternative to 
do this, so Spark
+ * has to detect this itself.
+ */
+public class MesosExternalShuffleClient extends ExternalShuffleClient {
+  private final Logger logger = 
LoggerFactory.getLogger(MesosExternalShuffleClient.class);
+
+  /**
+   * Creates a Mesos external shuffle client that wraps the {@link 
ExternalShuffleClient}.
+   * Please refer to docs on {@link ExternalShuffleClient} for more 
information.
+   */
+  public MesosExternalShuffleClient(
+      TransportConf conf,
+      SecretKeyHolder secretKeyHolder,
+      boolean saslEnabled,
+      boolean saslEncryptionEnabled) {
+    super(conf, secretKeyHolder, saslEnabled, saslEncryptionEnabled);
+  }
+
+  public void registerDriverWithShuffleService(String host, int port) throws 
IOException {
+    checkInit();
+    byte[] registerDriver = new RegisterDriver(appId).toByteArray();
+    TransportClient client = clientFactory.createClient(host, port);
+    client.sendRpc(registerDriver, new RpcResponseCallback() {
+      @Override
+      public void onSuccess(byte[] response) {
+        logger.info("Successfully registered app " + appId + " with external 
shuffle service.");
+      }
+
+      @Override
+      public void onFailure(Throwable e) {
+        logger.warn("Unable to register app " + appId + " with external 
shuffle service. " +
+          "Please manually remove shuffle data after driver exit. Error: " + 
e);
+      }
+    });
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/95dccc63/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
----------------------------------------------------------------------
diff --git 
a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
 
b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
index 6c1210b..fcb5236 100644
--- 
a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
+++ 
b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
@@ -21,6 +21,7 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 
 import org.apache.spark.network.protocol.Encodable;
+import org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver;
 
 /**
  * Messages handled by the {@link 
org.apache.spark.network.shuffle.ExternalShuffleBlockHandler}, or
@@ -37,7 +38,7 @@ public abstract class BlockTransferMessage implements 
Encodable {
 
   /** Preceding every serialized message is its type, which allows us to 
deserialize it. */
   public static enum Type {
-    OPEN_BLOCKS(0), UPLOAD_BLOCK(1), REGISTER_EXECUTOR(2), STREAM_HANDLE(3);
+    OPEN_BLOCKS(0), UPLOAD_BLOCK(1), REGISTER_EXECUTOR(2), STREAM_HANDLE(3), 
REGISTER_DRIVER(4);
 
     private final byte id;
 
@@ -60,6 +61,7 @@ public abstract class BlockTransferMessage implements 
Encodable {
         case 1: return UploadBlock.decode(buf);
         case 2: return RegisterExecutor.decode(buf);
         case 3: return StreamHandle.decode(buf);
+        case 4: return RegisterDriver.decode(buf);
         default: throw new IllegalArgumentException("Unknown message type: " + 
type);
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/95dccc63/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/RegisterDriver.java
----------------------------------------------------------------------
diff --git 
a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/RegisterDriver.java
 
b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/RegisterDriver.java
new file mode 100644
index 0000000..1c28fc1
--- /dev/null
+++ 
b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/RegisterDriver.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol.mesos;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+
+import org.apache.spark.network.protocol.Encoders;
+import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
+
+/**
+ * A message sent from the driver to register with the 
MesosExternalShuffleService.
+ */
+public class RegisterDriver extends BlockTransferMessage {
+  private final String appId;
+
+  public RegisterDriver(String appId) {
+    this.appId = appId;
+  }
+
+  public String getAppId() { return appId; }
+
+  @Override
+  protected Type type() { return Type.REGISTER_DRIVER; }
+
+  @Override
+  public int encodedLength() {
+    return Encoders.Strings.encodedLength(appId);
+  }
+
+  @Override
+  public void encode(ByteBuf buf) {
+    Encoders.Strings.encode(buf, appId);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(appId);
+  }
+
+  public static RegisterDriver decode(ByteBuf buf) {
+    String appId = Encoders.Strings.decode(buf);
+    return new RegisterDriver(appId);
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/95dccc63/sbin/start-mesos-shuffle-service.sh
----------------------------------------------------------------------
diff --git a/sbin/start-mesos-shuffle-service.sh 
b/sbin/start-mesos-shuffle-service.sh
new file mode 100755
index 0000000..6458076
--- /dev/null
+++ b/sbin/start-mesos-shuffle-service.sh
@@ -0,0 +1,35 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Starts the Mesos external shuffle service on the machine this script is 
executed on.
+# The Mesos external shuffle service detects when an application exits and 
automatically
+# cleans up its shuffle files.
+#
+# Usage: start-mesos-shuffle-service.sh
+#
+# Use the SPARK_SHUFFLE_OPTS environment variable to set shuffle service 
configuration.
+#
+
+sbin="`dirname "$0"`"
+sbin="`cd "$sbin"; pwd`"
+
+. "$sbin/spark-config.sh"
+. "$SPARK_PREFIX/bin/load-spark-env.sh"
+
+exec "$sbin"/spark-daemon.sh start 
org.apache.spark.deploy.mesos.MesosExternalShuffleService 1

http://git-wip-us.apache.org/repos/asf/spark/blob/95dccc63/sbin/stop-mesos-shuffle-service.sh
----------------------------------------------------------------------
diff --git a/sbin/stop-mesos-shuffle-service.sh 
b/sbin/stop-mesos-shuffle-service.sh
new file mode 100755
index 0000000..0e965d5
--- /dev/null
+++ b/sbin/stop-mesos-shuffle-service.sh
@@ -0,0 +1,25 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Stops the Mesos external shuffle service on the machine this script is 
executed on.
+
+sbin="`dirname "$0"`"
+sbin="`cd "$sbin"; pwd`"
+
+"$sbin"/spark-daemon.sh stop 
org.apache.spark.deploy.mesos.MesosExternalShuffleService 1


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to