Repository: spark
Updated Branches:
  refs/heads/branch-2.2 ee0d2af95 -> 75e5ea294


[SPARK-20529][CORE] Allow worker and master to work with a proxy server

## What changes were proposed in this pull request?

In the current code, when a worker connects to the master, the master sends 
its own address to the worker. The worker saves this address and uses it to 
reconnect in case of failure. However, this address is sometimes wrong: if 
there is a proxy between the master and the worker, the address the master 
sends is its own rather than the proxy's, so the worker cannot reconnect 
through it.

In this PR, the worker includes in its registration message the master 
address it used to connect, and the master simply replies with this same 
address; the worker then uses it to reconnect in case of failure. In other 
words, the worker prefers the master address configured on the worker side, 
when available, over the master address configured on the master side.
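
As a rough illustration, the revised handshake can be modeled as below. This 
is a minimal, self-contained sketch: the case classes mirror the shapes of 
`RegisterWorker` and `RegisteredWorker` in the diff, but the names 
`ProxyHandshakeSketch` and `masterHandle` and the plain method call are 
stand-ins for the real `RpcEndpointRef`-based messaging:

```scala
// Simplified model of the SPARK-20529 handshake (illustration only).
object ProxyHandshakeSketch {
  final case class RpcAddress(host: String, port: Int)

  // Worker -> Master: the worker reports the address it actually dialed,
  // which may be a proxy rather than the master's own bind address.
  final case class RegisterWorker(workerId: String, masterAddress: RpcAddress)

  // Master -> Worker: the master echoes that address back unchanged.
  final case class RegisteredWorker(masterAddress: RpcAddress)

  def masterHandle(msg: RegisterWorker): RegisteredWorker =
    RegisteredWorker(msg.masterAddress) // no rewriting on the master side

  def main(args: Array[String]): Unit = {
    val dialed = RpcAddress("proxy-host", 7077) // configured on the worker
    val reply = masterHandle(RegisterWorker("worker-1", dialed))
    // The worker saves the echoed address and reconnects to it on failure.
    assert(reply.masterAddress == dialed)
  }
}
```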

One potential issue remains: when a master is restarted or takes over 
leadership, the worker uses the address sent from the master to connect. If 
there is a proxy between the master and the worker, that address may still 
be wrong, and there is no way to determine the correct one from the worker 
alone.
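
The new behavior is opt-in via `spark.worker.preferConfiguredMasterAddress`, 
which defaults to `false` (see the Worker diff below). A hypothetical 
worker-side setup, assuming the worker reaches the master through a proxy; 
`proxy-host` and the properties file name are placeholders:

```
# Start the worker against the proxy's address rather than the master's:
#   ./sbin/start-slave.sh spark://proxy-host:7077 \
#       --properties-file conf/worker.properties
#
# conf/worker.properties -- opt in to keeping the configured address:
spark.worker.preferConfiguredMasterAddress  true
```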

## How was this patch tested?

The newly added unit test in MasterSuite: it registers a fake worker and 
verifies that the master replies with the exact address the worker sent.

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #17821 from zsxwing/SPARK-20529.

(cherry picked from commit 9150bca47e4b8782e20441386d3d225eb5f2f404)
Signed-off-by: Shixiong Zhu <shixi...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/75e5ea29
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/75e5ea29
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/75e5ea29

Branch: refs/heads/branch-2.2
Commit: 75e5ea294c15ecfb7366ae15dce196aa92c87ca4
Parents: ee0d2af
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Tue May 16 10:35:51 2017 -0700
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Tue May 16 10:38:12 2017 -0700

----------------------------------------------------------------------
 .../org/apache/spark/deploy/DeployMessage.scala | 27 ++++++++--
 .../org/apache/spark/deploy/master/Master.scala |  5 +-
 .../org/apache/spark/deploy/worker/Worker.scala | 53 +++++++++++++++++---
 .../spark/deploy/master/MasterSuite.scala       | 46 +++++++++++++++--
 4 files changed, 114 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/75e5ea29/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index ac09c6c..fa35e45 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -24,7 +24,7 @@ import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo}
 import org.apache.spark.deploy.master.DriverState.DriverState
 import org.apache.spark.deploy.master.RecoveryState.MasterState
 import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner}
-import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef}
 import org.apache.spark.util.Utils
 
 private[deploy] sealed trait DeployMessage extends Serializable
@@ -34,6 +34,16 @@ private[deploy] object DeployMessages {
 
   // Worker to Master
 
+  /**
+   * @param id the worker id
+   * @param host the worker host
+   * @param port the worker port
+   * @param worker the worker endpoint ref
+   * @param cores the number of cores of the worker
+   * @param memory the memory size of the worker
+   * @param workerWebUiUrl the worker Web UI address
+   * @param masterAddress the master address used by the worker to connect
+   */
   case class RegisterWorker(
       id: String,
       host: String,
@@ -41,7 +51,8 @@ private[deploy] object DeployMessages {
       worker: RpcEndpointRef,
       cores: Int,
       memory: Int,
-      workerWebUiUrl: String)
+      workerWebUiUrl: String,
+      masterAddress: RpcAddress)
     extends DeployMessage {
     Utils.checkHost(host, "Required hostname")
     assert (port > 0)
@@ -80,8 +91,16 @@ private[deploy] object DeployMessages {
 
   sealed trait RegisterWorkerResponse
 
-  case class RegisteredWorker(master: RpcEndpointRef, masterWebUiUrl: String) extends DeployMessage
-    with RegisterWorkerResponse
+  /**
+   * @param master the master ref
+   * @param masterWebUiUrl the master Web UI address
+   * @param masterAddress the master address used by the worker to connect. It should be
+   *                      [[RegisterWorker.masterAddress]].
+   */
+  case class RegisteredWorker(
+      master: RpcEndpointRef,
+      masterWebUiUrl: String,
+      masterAddress: RpcAddress) extends DeployMessage with RegisterWorkerResponse
 
   case class RegisterWorkerFailed(message: String) extends DeployMessage with RegisterWorkerResponse
 

http://git-wip-us.apache.org/repos/asf/spark/blob/75e5ea29/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 816bf37..96b53c6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -231,7 +231,8 @@ private[deploy] class Master(
       logError("Leadership has been revoked -- master shutting down.")
       System.exit(0)
 
-    case RegisterWorker(id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) =>
+    case RegisterWorker(
+      id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl, masterAddress) =>
       logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
         workerHost, workerPort, cores, Utils.megabytesToString(memory)))
       if (state == RecoveryState.STANDBY) {
@@ -243,7 +244,7 @@ private[deploy] class Master(
           workerRef, workerWebUiUrl)
         if (registerWorker(worker)) {
           persistenceEngine.addWorker(worker)
-          workerRef.send(RegisteredWorker(self, masterWebUiUrl))
+          workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress))
           schedule()
         } else {
           val workerAddress = worker.endpoint.address

http://git-wip-us.apache.org/repos/asf/spark/blob/75e5ea29/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 00b9d1a..ca9243e 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -99,6 +99,20 @@ private[deploy] class Worker(
 
   private val testing: Boolean = sys.props.contains("spark.testing")
   private var master: Option[RpcEndpointRef] = None
+
+  /**
+   * Whether to use the master address in `masterRpcAddresses` if possible. If it's disabled, Worker
+   * will just use the address received from Master.
+   */
+  private val preferConfiguredMasterAddress =
+    conf.getBoolean("spark.worker.preferConfiguredMasterAddress", false)
+  /**
+   * The master address to connect in case of failure. When the connection is broken, worker will
+   * use this address to connect. This is usually just one of `masterRpcAddresses`. However, when
+   * a master is restarted or takes over leadership, it will be an address sent from master, which
+   * may not be in `masterRpcAddresses`.
+   */
+  private var masterAddressToConnect: Option[RpcAddress] = None
   private var activeMasterUrl: String = ""
   private[worker] var activeMasterWebUiUrl : String = ""
   private var workerWebUiUrl: String = ""
@@ -196,10 +210,19 @@ private[deploy] class Worker(
     metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
   }
 
-  private def changeMaster(masterRef: RpcEndpointRef, uiUrl: String) {
+  /**
+   * Change to use the new master.
+   *
+   * @param masterRef the new master ref
+   * @param uiUrl the new master Web UI address
+   * @param masterAddress the new master address which the worker should use to connect in case of
+   *                      failure
+   */
+  private def changeMaster(masterRef: RpcEndpointRef, uiUrl: String, masterAddress: RpcAddress) {
     // activeMasterUrl it's a valid Spark url since we receive it from master.
     activeMasterUrl = masterRef.address.toSparkURL
     activeMasterWebUiUrl = uiUrl
+    masterAddressToConnect = Some(masterAddress)
     master = Some(masterRef)
     connected = true
     if (conf.getBoolean("spark.ui.reverseProxy", false)) {
@@ -266,7 +289,8 @@ private[deploy] class Worker(
             if (registerMasterFutures != null) {
               registerMasterFutures.foreach(_.cancel(true))
             }
-            val masterAddress = masterRef.address
+            val masterAddress =
+              if (preferConfiguredMasterAddress) masterAddressToConnect.get else masterRef.address
             registerMasterFutures = Array(registerMasterThreadPool.submit(new Runnable {
               override def run(): Unit = {
                 try {
@@ -342,15 +366,27 @@ private[deploy] class Worker(
   }
 
   private def sendRegisterMessageToMaster(masterEndpoint: RpcEndpointRef): Unit = {
-    masterEndpoint.send(RegisterWorker(workerId, host, port, self, cores, memory, workerWebUiUrl))
+    masterEndpoint.send(RegisterWorker(
+      workerId,
+      host,
+      port,
+      self,
+      cores,
+      memory,
+      workerWebUiUrl,
+      masterEndpoint.address))
   }
 
   private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized {
     msg match {
-      case RegisteredWorker(masterRef, masterWebUiUrl) =>
-        logInfo("Successfully registered with master " + 
masterRef.address.toSparkURL)
+      case RegisteredWorker(masterRef, masterWebUiUrl, masterAddress) =>
+        if (preferConfiguredMasterAddress) {
+          logInfo("Successfully registered with master " + 
masterAddress.toSparkURL)
+        } else {
+          logInfo("Successfully registered with master " + 
masterRef.address.toSparkURL)
+        }
         registered = true
-        changeMaster(masterRef, masterWebUiUrl)
+        changeMaster(masterRef, masterWebUiUrl, masterAddress)
         forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
           override def run(): Unit = Utils.tryLogNonFatalError {
             self.send(SendHeartbeat)
@@ -419,7 +455,7 @@ private[deploy] class Worker(
 
     case MasterChanged(masterRef, masterWebUiUrl) =>
       logInfo("Master has changed, new master is at " + 
masterRef.address.toSparkURL)
-      changeMaster(masterRef, masterWebUiUrl)
+      changeMaster(masterRef, masterWebUiUrl, masterRef.address)
 
       val execs = executors.values.
         map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state))
@@ -561,7 +597,8 @@ private[deploy] class Worker(
   }
 
   override def onDisconnected(remoteAddress: RpcAddress): Unit = {
-    if (master.exists(_.address == remoteAddress)) {
+    if (master.exists(_.address == remoteAddress) ||
+      masterAddressToConnect.exists(_ == remoteAddress)) {
       logInfo(s"$remoteAddress Disassociated !")
       masterDisconnected()
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/75e5ea29/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index 2127da4..5392646 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -34,7 +34,7 @@ import other.supplier.{CustomPersistenceEngine, CustomRecoveryModeFactory}
 import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
 import org.apache.spark.deploy._
 import org.apache.spark.deploy.DeployMessages._
-import org.apache.spark.rpc.{RpcEndpoint, RpcEnv}
+import org.apache.spark.rpc.{RpcAddress, RpcEndpoint, RpcEnv}
 
 class MasterSuite extends SparkFunSuite
   with Matchers with Eventually with PrivateMethodTester with BeforeAndAfter {
@@ -447,8 +447,15 @@ class MasterSuite extends SparkFunSuite
       }
     })
 
-    master.self.send(
-      RegisterWorker("1", "localhost", 9999, fakeWorker, 10, 1024, 
"http://localhost:8080";))
+    master.self.send(RegisterWorker(
+      "1",
+      "localhost",
+      9999,
+      fakeWorker,
+      10,
+      1024,
+      "http://localhost:8080";,
+      RpcAddress("localhost", 9999)))
     val executors = (0 until 3).map { i =>
       new ExecutorDescription(appId = i.toString, execId = i, 2, ExecutorState.RUNNING)
     }
@@ -459,4 +466,37 @@ class MasterSuite extends SparkFunSuite
       assert(killedDrivers.asScala.toList.sorted === List("0", "1", "2"))
     }
   }
+
+  test("SPARK-20529: Master should reply the address received from worker") {
+    val master = makeMaster()
+    master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
+    eventually(timeout(10.seconds)) {
+      val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
+      assert(masterState.status === RecoveryState.ALIVE, "Master is not alive")
+    }
+
+    @volatile var receivedMasterAddress: RpcAddress = null
+    val fakeWorker = master.rpcEnv.setupEndpoint("worker", new RpcEndpoint {
+      override val rpcEnv: RpcEnv = master.rpcEnv
+
+      override def receive: PartialFunction[Any, Unit] = {
+        case RegisteredWorker(_, _, masterAddress) =>
+          receivedMasterAddress = masterAddress
+      }
+    })
+
+    master.self.send(RegisterWorker(
+      "1",
+      "localhost",
+      9999,
+      fakeWorker,
+      10,
+      1024,
+      "http://localhost:8080";,
+      RpcAddress("localhost2", 10000)))
+
+    eventually(timeout(10.seconds)) {
+      assert(receivedMasterAddress === RpcAddress("localhost2", 10000))
+    }
+  }
 }

