Repository: spark
Updated Branches:
  refs/heads/master 21c562fa0 -> a3989058c


[SPARK-10827][CORE] AppClient should not use `askWithReply` in `receiveAndReply`

Changed AppClient to be non-blocking in `receiveAndReply` by using a separate 
thread to wait for response and reply to the context.  The threads are managed 
by a thread pool.  Also added unit tests for the AppClient interface.

Author: Bryan Cutler <bjcut...@us.ibm.com>

Closes #9317 from BryanCutler/appClient-receiveAndReply-SPARK-10827.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a3989058
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a3989058
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a3989058

Branch: refs/heads/master
Commit: a3989058c0938c8c59c278e7d1a766701cfa255b
Parents: 21c562f
Author: Bryan Cutler <bjcut...@us.ibm.com>
Authored: Tue Nov 10 16:32:32 2015 -0800
Committer: Reynold Xin <r...@databricks.com>
Committed: Tue Nov 10 16:32:32 2015 -0800

----------------------------------------------------------------------
 .../apache/spark/deploy/client/AppClient.scala  |  33 ++-
 .../spark/deploy/client/AppClientSuite.scala    | 209 +++++++++++++++++++
 2 files changed, 238 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a3989058/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala 
b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index 25ea692..3f29da6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -49,8 +49,8 @@ private[spark] class AppClient(
   private val REGISTRATION_TIMEOUT_SECONDS = 20
   private val REGISTRATION_RETRIES = 3
 
-  private var endpoint: RpcEndpointRef = null
-  private var appId: String = null
+  @volatile private var endpoint: RpcEndpointRef = null
+  @volatile private var appId: String = null
   @volatile private var registered = false
 
   private class ClientEndpoint(override val rpcEnv: RpcEnv) extends 
ThreadSafeRpcEndpoint
@@ -77,6 +77,11 @@ private[spark] class AppClient(
     private val registrationRetryThread =
       
ThreadUtils.newDaemonSingleThreadScheduledExecutor("appclient-registration-retry-thread")
 
+    // A thread pool to perform receive then reply actions in a thread so as 
not to block the
+    // event loop.
+    private val askAndReplyThreadPool =
+      
ThreadUtils.newDaemonCachedThreadPool("appclient-receive-and-reply-threadpool")
+
     override def onStart(): Unit = {
       try {
         registerWithMaster(1)
@@ -200,7 +205,7 @@ private[spark] class AppClient(
 
       case r: RequestExecutors =>
         master match {
-          case Some(m) => context.reply(m.askWithRetry[Boolean](r))
+          case Some(m) => askAndReplyAsync(m, context, r)
           case None =>
             logWarning("Attempted to request executors before registering with 
Master.")
             context.reply(false)
@@ -208,13 +213,32 @@ private[spark] class AppClient(
 
       case k: KillExecutors =>
         master match {
-          case Some(m) => context.reply(m.askWithRetry[Boolean](k))
+          case Some(m) => askAndReplyAsync(m, context, k)
           case None =>
             logWarning("Attempted to kill executors before registering with 
Master.")
             context.reply(false)
         }
     }
 
+    private def askAndReplyAsync[T](
+        endpointRef: RpcEndpointRef,
+        context: RpcCallContext,
+        msg: T): Unit = {
+      // Create a thread to ask a message and reply with the result.  Allow 
thread to be
+      // interrupted during shutdown, otherwise context must be notified of 
NonFatal errors.
+      askAndReplyThreadPool.execute(new Runnable {
+        override def run(): Unit = {
+          try {
+            context.reply(endpointRef.askWithRetry[Boolean](msg))
+          } catch {
+            case ie: InterruptedException => // Cancelled
+            case NonFatal(t) =>
+              context.sendFailure(t)
+          }
+        }
+      })
+    }
+
     override def onDisconnected(address: RpcAddress): Unit = {
       if (master.exists(_.address == address)) {
         logWarning(s"Connection to $address failed; waiting for master to 
reconnect...")
@@ -252,6 +276,7 @@ private[spark] class AppClient(
       registrationRetryThread.shutdownNow()
       registerMasterFutures.foreach(_.cancel(true))
       registerMasterThreadPool.shutdownNow()
+      askAndReplyThreadPool.shutdownNow()
     }
 
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/a3989058/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
new file mode 100644
index 0000000..1e5c05a
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.client
+
+import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
+import scala.concurrent.duration._
+
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.concurrent.Eventually._
+
+import org.apache.spark._
+import org.apache.spark.deploy.{ApplicationDescription, Command}
+import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, 
RequestMasterState}
+import org.apache.spark.deploy.master.{ApplicationInfo, Master}
+import org.apache.spark.deploy.worker.Worker
+import org.apache.spark.rpc.RpcEnv
+import org.apache.spark.util.Utils
+
+/**
+ * End-to-end tests for application client in standalone mode.
+ */
+class AppClientSuite extends SparkFunSuite with LocalSparkContext with 
BeforeAndAfterAll {
+  private val numWorkers = 2
+  private val conf = new SparkConf()
+  private val securityManager = new SecurityManager(conf)
+
+  private var masterRpcEnv: RpcEnv = null
+  private var workerRpcEnvs: Seq[RpcEnv] = null
+  private var master: Master = null
+  private var workers: Seq[Worker] = null
+
+  /**
+   * Start the local cluster.
+   * Note: local-cluster mode is insufficient because we want a reference to 
the Master.
+   */
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    masterRpcEnv = RpcEnv.create(Master.SYSTEM_NAME, "localhost", 0, conf, 
securityManager)
+    workerRpcEnvs = (0 until numWorkers).map { i =>
+      RpcEnv.create(Worker.SYSTEM_NAME + i, "localhost", 0, conf, 
securityManager)
+    }
+    master = makeMaster()
+    workers = makeWorkers(10, 2048)
+    // Wait until all workers register with master successfully
+    eventually(timeout(60.seconds), interval(10.millis)) {
+      assert(getMasterState.workers.size === numWorkers)
+    }
+  }
+
+  override def afterAll(): Unit = {
+    workerRpcEnvs.foreach(_.shutdown())
+    masterRpcEnv.shutdown()
+    workers.foreach(_.stop())
+    master.stop()
+    workerRpcEnvs = null
+    masterRpcEnv = null
+    workers = null
+    master = null
+    super.afterAll()
+  }
+
+  test("interface methods of AppClient using local Master") {
+    val ci = new AppClientInst(masterRpcEnv.address.toSparkURL)
+
+    ci.client.start()
+
+    // Client should connect with one Master which registers the application
+    eventually(timeout(10.seconds), interval(10.millis)) {
+      val apps = getApplications()
+      assert(ci.listener.connectedIdList.size === 1, "client listener should 
have one connection")
+      assert(apps.size === 1, "master should have 1 registered app")
+    }
+
+    // Send message to Master to request Executors, verify request by change 
in executor limit
+    val numExecutorsRequested = 1
+    assert(ci.client.requestTotalExecutors(numExecutorsRequested))
+
+    eventually(timeout(10.seconds), interval(10.millis)) {
+      val apps = getApplications()
+      assert(apps.head.getExecutorLimit === numExecutorsRequested, s"executor 
request failed")
+    }
+
+    // Send request to kill executor, verify request was made
+    assert {
+      val apps = getApplications()
+      val executorId: String = apps.head.executors.head._2.fullId
+      ci.client.killExecutors(Seq(executorId))
+    }
+
+    // Issue stop command for Client to disconnect from Master
+    ci.client.stop()
+
+    // Verify Client is marked dead and unregistered from Master
+    eventually(timeout(10.seconds), interval(10.millis)) {
+      val apps = getApplications()
+      assert(ci.listener.deadReasonList.size === 1, "client should have been 
marked dead")
+      assert(apps.isEmpty, "master should have 0 registered apps")
+    }
+  }
+
+  test("request from AppClient before initialized with master") {
+    val ci = new AppClientInst(masterRpcEnv.address.toSparkURL)
+
+    // requests to master should fail immediately
+    assert(ci.client.requestTotalExecutors(3) === false)
+  }
+
+  // ===============================
+  // | Utility methods for testing |
+  // ===============================
+
+  /** Return a SparkConf for applications that want to talk to our Master. */
+  private def appConf: SparkConf = {
+    new SparkConf()
+      .setMaster(masterRpcEnv.address.toSparkURL)
+      .setAppName("test")
+      .set("spark.executor.memory", "256m")
+  }
+
+  /** Make a master to which our application will send executor requests. */
+  private def makeMaster(): Master = {
+    val master = new Master(masterRpcEnv, masterRpcEnv.address, 0, 
securityManager, conf)
+    masterRpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
+    master
+  }
+
+  /** Make a few workers that talk to our master. */
+  private def makeWorkers(cores: Int, memory: Int): Seq[Worker] = {
+    (0 until numWorkers).map { i =>
+      val rpcEnv = workerRpcEnvs(i)
+      val worker = new Worker(rpcEnv, 0, cores, memory, 
Array(masterRpcEnv.address),
+        Worker.SYSTEM_NAME + i, Worker.ENDPOINT_NAME, null, conf, 
securityManager)
+      rpcEnv.setupEndpoint(Worker.ENDPOINT_NAME, worker)
+      worker
+    }
+  }
+
+  /** Get the Master state */
+  private def getMasterState: MasterStateResponse = {
+    master.self.askWithRetry[MasterStateResponse](RequestMasterState)
+  }
+
+  /** Get the applictions that are active from Master */
+  private def getApplications(): Seq[ApplicationInfo] = {
+    getMasterState.activeApps
+  }
+
+  /** Application Listener to collect events */
+  private class AppClientCollector extends AppClientListener with Logging {
+    val connectedIdList = new ArrayBuffer[String] with 
SynchronizedBuffer[String]
+    @volatile var disconnectedCount: Int = 0
+    val deadReasonList = new ArrayBuffer[String] with 
SynchronizedBuffer[String]
+    val execAddedList = new ArrayBuffer[String] with SynchronizedBuffer[String]
+    val execRemovedList = new ArrayBuffer[String] with 
SynchronizedBuffer[String]
+
+    def connected(id: String): Unit = {
+      connectedIdList += id
+    }
+
+    def disconnected(): Unit = {
+      synchronized {
+        disconnectedCount += 1
+      }
+    }
+
+    def dead(reason: String): Unit = {
+      deadReasonList += reason
+    }
+
+    def executorAdded(
+        id: String,
+        workerId: String,
+        hostPort: String,
+        cores: Int,
+        memory: Int): Unit = {
+      execAddedList += id
+    }
+
+    def executorRemoved(id: String, message: String, exitStatus: Option[Int]): 
Unit = {
+      execRemovedList += id
+    }
+  }
+
+  /** Create AppClient and supporting objects */
+  private class AppClientInst(masterUrl: String) {
+    val rpcEnv = RpcEnv.create("spark", Utils.localHostName(), 0, conf, 
securityManager)
+    private val cmd = new 
Command(TestExecutor.getClass.getCanonicalName.stripSuffix("$"),
+      List(), Map(), Seq(), Seq(), Seq())
+    private val desc = new ApplicationDescription("AppClientSuite", Some(1), 
512, cmd, "ignored")
+    val listener = new AppClientCollector
+    val client = new AppClient(rpcEnv, Array(masterUrl), desc, listener, new 
SparkConf)
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to