Repository: spark
Updated Branches:
  refs/heads/master 722afbb2b -> 92ce8d484


[SPARK-15487][WEB UI] Spark Master UI to reverse proxy Application and Workers UI

## What changes were proposed in this pull request?

This pull request adds the functionality to access worker and application UIs 
through the master UI itself. This helps in reaching the Spark UI when running 
a Spark cluster on a closed network, e.g. Kubernetes. The cluster admin needs 
to expose only the Spark master UI; the rest of the UIs can stay on the private 
network, and the master UI will reverse proxy connection requests to the 
corresponding resource. It adds the following paths for worker/application UIs:

WorkerUI: <http/https>://master-publicIP:<port>/target/workerID/
ApplicationUI: <http/https>://master-publicIP:<port>/target/appID/

This makes it easy for users to protect access to the Spark master cluster by 
putting a reverse proxy in front of it, e.g. https://github.com/bitly/oauth2_proxy.
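
For illustration only (not part of this patch), a minimal driver-side sketch of 
opting in could look as follows; the property names match the diff below, while 
the master URL and app name are made-up placeholders, and the proxied path wired 
up in the code below is `/proxy/<id>`:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical values: replace the master URL and app name with your own.
val conf = new SparkConf()
  .setMaster("spark://master-host:7077")
  .setAppName("reverse-proxy-demo")
  // Ask the standalone master to reverse proxy this application's UI.
  // Per the docs change below, the same setting must also be enabled on the
  // master and workers.
  .set("spark.ui.reverseProxy", "true")
  // Optional: an external proxy sitting in front of the master, if any.
  .set("spark.ui.reverseProxyUrl", "https://spark.example.com")

val sc = new SparkContext(conf)
// The application UI is then reachable through the master UI at
// <master web UI>/proxy/<application id>/ rather than on the driver's host.
sc.stop()
```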

## How was this patch tested?

The functionality has been tested manually, and a unit test covers access to 
the worker UI through the reverse proxy address.

pwendell bomeng BryanCutler can you please review it, thanks.

Author: Gurvinder Singh <gurvinder.si...@uninett.no>

Closes #13950 from gurvindersingh/rproxy.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/92ce8d48
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/92ce8d48
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/92ce8d48

Branch: refs/heads/master
Commit: 92ce8d4849a0341c4636e70821b7be57ad3055b1
Parents: 722afbb
Author: Gurvinder Singh <gurvinder.si...@uninett.no>
Authored: Thu Sep 8 17:20:20 2016 -0700
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Thu Sep 8 17:20:20 2016 -0700

----------------------------------------------------------------------
 core/pom.xml                                    | 12 ++-
 .../scala/org/apache/spark/SparkContext.scala   |  3 +
 .../org/apache/spark/deploy/master/Master.scala | 18 +++++
 .../deploy/master/ui/ApplicationPage.scala      | 13 ++-
 .../spark/deploy/master/ui/MasterPage.scala     | 12 ++-
 .../spark/deploy/master/ui/MasterWebUI.scala    | 16 ++++
 .../spark/deploy/worker/ExecutorRunner.scala    |  6 +-
 .../org/apache/spark/deploy/worker/Worker.scala |  3 +
 .../scala/org/apache/spark/ui/JettyUtils.scala  | 85 ++++++++++++++++++++
 .../scala/org/apache/spark/ui/UIUtils.scala     | 12 +++
 .../spark/deploy/master/MasterSuite.scala       | 27 +++++++
 .../scala/org/apache/spark/ui/UISuite.scala     | 37 +++++++++
 docs/configuration.md                           | 14 ++++
 pom.xml                                         | 14 ++++
 .../org/apache/spark/repl/SparkILoopInit.scala  | 13 ++-
 .../org/apache/spark/repl/SparkILoop.scala      | 13 ++-
 16 files changed, 287 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/92ce8d48/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index 69a0b0f..3c8138f 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -126,6 +126,16 @@
     </dependency>
     <dependency>
       <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-proxy</artifactId>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-client</artifactId>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
       <artifactId>jetty-servlets</artifactId>
       <scope>compile</scope>
     </dependency>
@@ -388,7 +398,7 @@
               <overWriteIfNewer>true</overWriteIfNewer>
               <useSubDirectoryPerType>true</useSubDirectoryPerType>
               <includeArtifactIds>
-                guava,jetty-io,jetty-servlet,jetty-servlets,jetty-continuation,jetty-http,jetty-plus,jetty-util,jetty-server,jetty-security
+                guava,jetty-io,jetty-servlet,jetty-servlets,jetty-continuation,jetty-http,jetty-plus,jetty-util,jetty-server,jetty-security,jetty-proxy,jetty-client
               </includeArtifactIds>
               <silent>true</silent>
             </configuration>

http://git-wip-us.apache.org/repos/asf/spark/blob/92ce8d48/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 4aa795a..e32e4aa 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -505,6 +505,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     _applicationId = _taskScheduler.applicationId()
     _applicationAttemptId = taskScheduler.applicationAttemptId()
     _conf.set("spark.app.id", _applicationId)
+    if (_conf.getBoolean("spark.ui.reverseProxy", false)) {
+      System.setProperty("spark.ui.proxyBase", "/proxy/" + _applicationId)
+    }
     _ui.foreach(_.setAppId(_applicationId))
     _env.blockManager.initialize(_applicationId)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/92ce8d48/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index dcf4163..8c91aa1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -114,6 +114,7 @@ private[deploy] class Master(
 
   // Default maxCores for applications that don't specify it (i.e. pass Int.MaxValue)
   private val defaultCores = conf.getInt("spark.deploy.defaultCores", Int.MaxValue)
+  val reverseProxy = conf.getBoolean("spark.ui.reverseProxy", false)
   if (defaultCores < 1) {
     throw new SparkException("spark.deploy.defaultCores must be positive")
   }
@@ -129,6 +130,11 @@ private[deploy] class Master(
     webUi = new MasterWebUI(this, webUiPort)
     webUi.bind()
     masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort
+    if (reverseProxy) {
+      masterWebUiUrl = conf.get("spark.ui.reverseProxyUrl", masterWebUiUrl)
+      logInfo(s"Spark Master is acting as a reverse proxy. Master, Workers and 
" +
+       s"Applications UIs are available at $masterWebUiUrl")
+    }
     checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
       override def run(): Unit = Utils.tryLogNonFatalError {
         self.send(CheckForWorkerTimeOut)
@@ -755,6 +761,9 @@ private[deploy] class Master(
     workers += worker
     idToWorker(worker.id) = worker
     addressToWorker(workerAddress) = worker
+    if (reverseProxy) {
+       webUi.addProxyTargets(worker.id, worker.webUiAddress)
+    }
     true
   }
 
@@ -763,6 +772,9 @@ private[deploy] class Master(
     worker.setState(WorkerState.DEAD)
     idToWorker -= worker.id
     addressToWorker -= worker.endpoint.address
+    if (reverseProxy) {
+      webUi.removeProxyTargets(worker.id)
+    }
     for (exec <- worker.executors.values) {
       logInfo("Telling app of lost executor: " + exec.id)
       exec.application.driver.send(ExecutorUpdated(
@@ -810,6 +822,9 @@ private[deploy] class Master(
     endpointToApp(app.driver) = app
     addressToApp(appAddress) = app
     waitingApps += app
+    if (reverseProxy) {
+      webUi.addProxyTargets(app.id, app.desc.appUiUrl)
+    }
   }
 
   private def finishApplication(app: ApplicationInfo) {
@@ -823,6 +838,9 @@ private[deploy] class Master(
       idToApp -= app.id
       endpointToApp -= app.driver
       addressToApp -= app.driver.address
+      if (reverseProxy) {
+        webUi.removeProxyTargets(app.id)
+      }
       if (completedApps.size >= RETAINED_APPLICATIONS) {
         val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
         completedApps.take(toRemove).foreach { a =>

http://git-wip-us.apache.org/repos/asf/spark/blob/92ce8d48/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
index 8875fc2..17c521c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
@@ -77,7 +77,10 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app")
             <li><strong>State:</strong> {app.state}</li>
             {
               if (!app.isFinished) {
-                <li><strong><a href={app.desc.appUiUrl}>Application Detail UI</a></strong></li>
+                <li><strong>
+                    <a href={UIUtils.makeHref(parent.master.reverseProxy,
+                      app.id, app.desc.appUiUrl)}>Application Detail UI</a>
+                </strong></li>
               }
             }
           </ul>
@@ -100,19 +103,21 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app")
   }
 
   private def executorRow(executor: ExecutorDesc): Seq[Node] = {
+    val workerUrlRef = UIUtils.makeHref(parent.master.reverseProxy,
+      executor.worker.id, executor.worker.webUiAddress)
     <tr>
       <td>{executor.id}</td>
       <td>
-        <a href={executor.worker.webUiAddress}>{executor.worker.id}</a>
+        <a href={workerUrlRef}>{executor.worker.id}</a>
       </td>
       <td>{executor.cores}</td>
       <td>{executor.memory}</td>
       <td>{executor.state}</td>
       <td>
        <a href={"%s/logPage?appId=%s&executorId=%s&logType=stdout"
-          .format(executor.worker.webUiAddress, executor.application.id, executor.id)}>stdout</a>
+          .format(workerUrlRef, executor.application.id, executor.id)}>stdout</a>
        <a href={"%s/logPage?appId=%s&executorId=%s&logType=stderr"
-          .format(executor.worker.webUiAddress, executor.application.id, executor.id)}>stderr</a>
+          .format(workerUrlRef, executor.application.id, executor.id)}>stderr</a>
       </td>
     </tr>
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/92ce8d48/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
index 5ed3e39..3fb8605 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
@@ -176,7 +176,8 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
   private def workerRow(worker: WorkerInfo): Seq[Node] = {
     <tr>
       <td>
-        <a href={worker.webUiAddress}>{worker.id}</a>
+          <a href={UIUtils.makeHref(parent.master.reverseProxy,
+            worker.id, worker.webUiAddress)}>{worker.id}</a>
       </td>
       <td>{worker.host}:{worker.port}</td>
       <td>{worker.state}</td>
@@ -210,7 +211,8 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
           if (app.isFinished) {
             app.desc.name
           } else {
-            <a href={app.desc.appUiUrl}>{app.desc.name}</a>
+            <a href={UIUtils.makeHref(parent.master.reverseProxy,
+              app.id, app.desc.appUiUrl)}>{app.desc.name}</a>
           }
         }
       </td>
@@ -244,7 +246,11 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
     <tr>
       <td>{driver.id} {killLink}</td>
       <td>{driver.submitDate}</td>
-      <td>{driver.worker.map(w => <a href={w.webUiAddress}>{w.id.toString}</a>).getOrElse("None")}
+      <td>{driver.worker.map(w =>
+        <a href=
+          {UIUtils.makeHref(parent.master.reverseProxy, w.id, w.webUiAddress)}>
+          {w.id.toString}</a>
+        ).getOrElse("None")}
       </td>
       <td>{driver.state}</td>
       <td sorttable_customkey={driver.desc.cores.toString}>

http://git-wip-us.apache.org/repos/asf/spark/blob/92ce8d48/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
index a0727ad..8cfd0f6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -17,6 +17,10 @@
 
 package org.apache.spark.deploy.master.ui
 
+import scala.collection.mutable.HashMap
+
+import org.eclipse.jetty.servlet.ServletContextHandler
+
 import org.apache.spark.deploy.master.Master
 import org.apache.spark.internal.Logging
 import org.apache.spark.ui.{SparkUI, WebUI}
@@ -34,6 +38,7 @@ class MasterWebUI(
 
   val masterEndpointRef = master.self
   val killEnabled = master.conf.getBoolean("spark.ui.killEnabled", true)
+  private val proxyHandlers = new HashMap[String, ServletContextHandler]
 
   initialize()
 
@@ -48,6 +53,17 @@ class MasterWebUI(
     attachHandler(createRedirectHandler(
       "/driver/kill", "/", masterPage.handleDriverKillRequest, httpMethods = 
Set("POST")))
   }
+
+  def addProxyTargets(id: String, target: String): Unit = {
+    var endTarget = target.stripSuffix("/")
+    val handler = createProxyHandler("/proxy/" + id, endTarget)
+    attachHandler(handler)
+    proxyHandlers(id) = handler
+  }
+
+  def removeProxyTargets(id: String): Unit = {
+    proxyHandlers.remove(id).foreach(detachHandler)
+  }
 }
 
 private[master] object MasterWebUI {

http://git-wip-us.apache.org/repos/asf/spark/blob/92ce8d48/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 0606624..d4d8521 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -156,7 +156,11 @@ private[deploy] class ExecutorRunner(
 
       // Add webUI log urls
       val baseUrl =
-        s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
+        if (conf.getBoolean("spark.ui.reverseProxy", false)) {
+          s"/proxy/$workerId/logPage/?appId=$appId&executorId=$execId&logType="
+        } else {
+          s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
+        }
       builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
       builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")
 

http://git-wip-us.apache.org/repos/asf/spark/blob/92ce8d48/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 724206b..0bedd9a 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -203,6 +203,9 @@ private[deploy] class Worker(
     activeMasterWebUiUrl = uiUrl
     master = Some(masterRef)
     connected = true
+    if (conf.getBoolean("spark.ui.reverseProxy", false)) {
+      logInfo(s"WorkerWebUI is available at 
$activeMasterWebUiUrl/proxy/$workerId")
+    }
     // Cancel any outstanding re-registration attempts because we found a new master
     cancelLastRegistrationRetry()
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/92ce8d48/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
index 50283f2..24f3f75 100644
--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -25,6 +25,8 @@ import scala.collection.mutable.ArrayBuffer
 import scala.language.implicitConversions
 import scala.xml.Node
 
+import org.eclipse.jetty.client.api.Response
+import org.eclipse.jetty.proxy.ProxyServlet
 import org.eclipse.jetty.server.{Request, Server, ServerConnector}
 import org.eclipse.jetty.server.handler._
 import org.eclipse.jetty.servlet._
@@ -186,6 +188,47 @@ private[spark] object JettyUtils extends Logging {
     contextHandler
   }
 
+  /** Create a handler for proxying request to Workers and Application Drivers */
+  def createProxyHandler(
+      prefix: String,
+      target: String): ServletContextHandler = {
+    val servlet = new ProxyServlet {
+      override def rewriteTarget(request: HttpServletRequest): String = {
+        val rewrittenURI = createProxyURI(
+          prefix, target, request.getRequestURI(), request.getQueryString())
+        if (rewrittenURI == null) {
+          return null
+        }
+        if (!validateDestination(rewrittenURI.getHost(), rewrittenURI.getPort())) {
+          return null
+        }
+        rewrittenURI.toString()
+      }
+
+      override def filterServerResponseHeader(
+          clientRequest: HttpServletRequest,
+          serverResponse: Response,
+          headerName: String,
+          headerValue: String): String = {
+        if (headerName.equalsIgnoreCase("location")) {
+          val newHeader = createProxyLocationHeader(
+            prefix, headerValue, clientRequest, serverResponse.getRequest().getURI())
+          if (newHeader != null) {
+            return newHeader
+          }
+        }
+        super.filterServerResponseHeader(
+          clientRequest, serverResponse, headerName, headerValue)
+      }
+    }
+
+    val contextHandler = new ServletContextHandler
+    val holder = new ServletHolder(servlet)
+    contextHandler.setContextPath(prefix)
+    contextHandler.addServlet(holder, "/")
+    contextHandler
+  }
+
   /** Add filters, if any, to the given list of ServletContextHandlers */
   def addFilters(handlers: Seq[ServletContextHandler], conf: SparkConf) {
     val filters: Array[String] = conf.get("spark.ui.filters", "").split(',').map(_.trim())
@@ -332,6 +375,48 @@ private[spark] object JettyUtils extends Logging {
     redirectHandler
   }
 
+  def createProxyURI(prefix: String, target: String, path: String, query: String): URI = {
+    if (!path.startsWith(prefix)) {
+      return null
+    }
+
+    val uri = new StringBuilder(target)
+    val rest = path.substring(prefix.length())
+
+    if (!rest.isEmpty()) {
+      if (!rest.startsWith("/")) {
+        uri.append("/")
+      }
+      uri.append(rest)
+    }
+
+    val rewrittenURI = URI.create(uri.toString())
+    if (query != null) {
+      return new URI(
+          rewrittenURI.getScheme(),
+          rewrittenURI.getAuthority(),
+          rewrittenURI.getPath(),
+          query,
+          rewrittenURI.getFragment()
+        ).normalize()
+    }
+    rewrittenURI.normalize()
+  }
+
+  def createProxyLocationHeader(
+      prefix: String,
+      headerValue: String,
+      clientRequest: HttpServletRequest,
+      targetUri: URI): String = {
+    val toReplace = targetUri.getScheme() + "://" + targetUri.getAuthority()
+    if (headerValue.startsWith(toReplace)) {
+      clientRequest.getScheme() + "://" + clientRequest.getHeader("host") +
+          prefix + headerValue.substring(toReplace.length())
+    } else {
+      null
+    }
+  }
+
   // Create a new URI from the arguments, handling IPv6 host encoding and default ports.
   private def createRedirectURI(
       scheme: String, server: String, port: Int, path: String, query: String) = {

http://git-wip-us.apache.org/repos/asf/spark/blob/92ce8d48/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index 2b6c538..c0d1a22 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -510,4 +510,16 @@ private[spark] object UIUtils extends Logging {
 
   def getTimeZoneOffset() : Int =
     TimeZone.getDefault().getOffset(System.currentTimeMillis()) / 1000 / 60
+
+  /**
+  * Return the correct Href after checking if master is running in the
+  * reverse proxy mode or not.
+  */
+  def makeHref(proxy: Boolean, id: String, origHref: String): String = {
+    if (proxy) {
+      s"/proxy/$id"
+    } else {
+      origHref
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/92ce8d48/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index 7cbe4e3..831a7bc 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -157,6 +157,33 @@ class MasterSuite extends SparkFunSuite
     }
   }
 
+  test("master/worker web ui available with reverseProxy") {
+    implicit val formats = org.json4s.DefaultFormats
+    val reverseProxyUrl = "http://localhost:8080"
+    val conf = new SparkConf()
+    conf.set("spark.ui.reverseProxy", "true")
+    conf.set("spark.ui.reverseProxyUrl", reverseProxyUrl)
+    val localCluster = new LocalSparkCluster(2, 2, 512, conf)
+    localCluster.start()
+    try {
+      eventually(timeout(5 seconds), interval(100 milliseconds)) {
+        val json = Source.fromURL(s"http://localhost:${localCluster.masterWebUIPort}/json")
+          .getLines().mkString("\n")
+        val JArray(workers) = (parse(json) \ "workers")
+        workers.size should be (2)
+        workers.foreach { workerSummaryJson =>
+          val JString(workerId) = workerSummaryJson \ "id"
+          val url = s"http://localhost:${localCluster.masterWebUIPort}/proxy/${workerId}/json"
+          val workerResponse = parse(Source.fromURL(url).getLines().mkString("\n"))
+          (workerResponse \ "cores").extract[Int] should be (2)
+          (workerResponse \ "masterwebuiurl").extract[String] should be (reverseProxyUrl)
+        }
+      }
+    } finally {
+      localCluster.stop()
+    }
+  }
+
   test("basic scheduling - spread out") {
     basicScheduling(spreadOut = true)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/92ce8d48/core/src/test/scala/org/apache/spark/ui/UISuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
index 2b59b48..dbb8dca 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
@@ -18,10 +18,13 @@
 package org.apache.spark.ui
 
 import java.net.{BindException, ServerSocket}
+import java.net.URI
+import javax.servlet.http.HttpServletRequest
 
 import scala.io.Source
 
 import org.eclipse.jetty.servlet.ServletContextHandler
+import org.mockito.Mockito.{mock, when}
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.time.SpanSugar._
 
@@ -190,6 +193,40 @@ class UISuite extends SparkFunSuite {
     }
   }
 
+  test("verify proxy rewrittenURI") {
+    val prefix = "/proxy/worker-id"
+    val target = "http://localhost:8081"
+    val path = "/proxy/worker-id/json"
+    var rewrittenURI = JettyUtils.createProxyURI(prefix, target, path, null)
+    assert(rewrittenURI.toString() === "http://localhost:8081/json")
+    rewrittenURI = JettyUtils.createProxyURI(prefix, target, path, "test=done")
+    assert(rewrittenURI.toString() === "http://localhost:8081/json?test=done")
+    rewrittenURI = JettyUtils.createProxyURI(prefix, target, "/proxy/worker-id", null)
+    assert(rewrittenURI.toString() === "http://localhost:8081")
+    rewrittenURI = JettyUtils.createProxyURI(prefix, target, "/proxy/worker-id/test%2F", null)
+    assert(rewrittenURI.toString() === "http://localhost:8081/test%2F")
+    rewrittenURI = JettyUtils.createProxyURI(prefix, target, "/proxy/worker-id/%F0%9F%98%84", null)
+    assert(rewrittenURI.toString() === "http://localhost:8081/%F0%9F%98%84")
+    rewrittenURI = JettyUtils.createProxyURI(prefix, target, "/proxy/worker-noid/json", null)
+    assert(rewrittenURI === null)
+  }
+
+  test("verify rewriting location header for reverse proxy") {
+    val clientRequest = mock(classOf[HttpServletRequest])
+    var headerValue = "http://localhost:4040/jobs"
+    val prefix = "/proxy/worker-id"
+    val targetUri = URI.create("http://localhost:4040")
+    when(clientRequest.getScheme()).thenReturn("http")
+    when(clientRequest.getHeader("host")).thenReturn("localhost:8080")
+    var newHeader = JettyUtils.createProxyLocationHeader(
+      prefix, headerValue, clientRequest, targetUri)
+    assert(newHeader.toString() === "http://localhost:8080/proxy/worker-id/jobs")
+    headerValue = "http://localhost:4041/jobs"
+    newHeader = JettyUtils.createProxyLocationHeader(
+      prefix, headerValue, clientRequest, targetUri)
+    assert(newHeader === null)
+  }
+
   def stopServer(info: ServerInfo): Unit = {
     if (info != null && info.server != null) info.server.stop
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/92ce8d48/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 6e98f67..ebd0aa7 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -658,6 +658,20 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
+  <td><code>spark.ui.reverseProxy</code></td>
+  <td>false</td>
+  <td>
+    Enable running Spark Master as reverse proxy for worker and application UIs. In this mode, Spark master will reverse proxy the worker and application UIs to enable access without requiring direct access to their hosts. Use it with caution, as worker and application UIs will not be accessible directly; you will only be able to access them through the Spark master/proxy public URL. This setting affects all the workers and application UIs running in the cluster and must be set on all the workers, drivers and masters.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.ui.reverseProxyUrl</code></td>
+  <td></td>
+  <td>
+    This is the URL where your proxy is running, i.e. the proxy deployed in front of the Spark master. This is useful when running a proxy for authentication, e.g. an OAuth proxy. Make sure this is a complete URL, including scheme (http/https) and port, needed to reach your proxy.
+  </td>
+</tr>
+<tr>
   <td><code>spark.worker.ui.retainedExecutors</code></td>
   <td>1000</td>
   <td>
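
As a usage sketch only (not part of the patch), both properties could be set 
cluster-wide, for example in conf/spark-defaults.conf on the masters, workers 
and drivers; the proxy URL below is a placeholder:

    spark.ui.reverseProxy       true
    spark.ui.reverseProxyUrl    https://spark-proxy.example.com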

http://git-wip-us.apache.org/repos/asf/spark/blob/92ce8d48/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index e6c2897..3b3ad39 100644
--- a/pom.xml
+++ b/pom.xml
@@ -340,6 +340,18 @@
       </dependency>
       <dependency>
         <groupId>org.eclipse.jetty</groupId>
+        <artifactId>jetty-proxy</artifactId>
+        <version>${jetty.version}</version>
+        <scope>provided</scope>
+      </dependency>
+      <dependency>
+        <groupId>org.eclipse.jetty</groupId>
+        <artifactId>jetty-client</artifactId>
+        <version>${jetty.version}</version>
+        <scope>provided</scope>
+      </dependency>
+      <dependency>
+        <groupId>org.eclipse.jetty</groupId>
         <artifactId>jetty-util</artifactId>
         <version>${jetty.version}</version>
         <scope>provided</scope>
@@ -2256,6 +2268,8 @@
               <include>org.spark-project.spark:unused</include>
               <include>org.eclipse.jetty:jetty-io</include>
               <include>org.eclipse.jetty:jetty-http</include>
+              <include>org.eclipse.jetty:jetty-proxy</include>
+              <include>org.eclipse.jetty:jetty-client</include>
               <include>org.eclipse.jetty:jetty-continuation</include>
               <include>org.eclipse.jetty:jetty-servlet</include>
               <include>org.eclipse.jetty:jetty-servlets</include>

http://git-wip-us.apache.org/repos/asf/spark/blob/92ce8d48/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
index 29f63de..b2a6126 100644
--- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
@@ -126,7 +126,18 @@ private[repl] trait SparkILoopInit {
         @transient val spark = org.apache.spark.repl.Main.interp.createSparkSession()
         @transient val sc = {
           val _sc = spark.sparkContext
-          _sc.uiWebUrl.foreach(webUrl => println(s"Spark context Web UI available at ${webUrl}"))
+          if (_sc.getConf.getBoolean("spark.ui.reverseProxy", false)) {
+            val proxyUrl = _sc.getConf.get("spark.ui.reverseProxyUrl", null)
+            if (proxyUrl != null) {
+              println(s"Spark Context Web UI is available at 
${proxyUrl}/proxy/${_sc.applicationId}")
+            } else {
+              println(s"Spark Context Web UI is available at Spark Master 
Public URL")
+            }
+          } else {
+            _sc.uiWebUrl.foreach {
+              webUrl => println(s"Spark context Web UI available at ${webUrl}")
+            }
+          }
           println("Spark context available as 'sc' " +
             s"(master = ${_sc.master}, app id = ${_sc.applicationId}).")
           println("Spark session available as 'spark'.")

http://git-wip-us.apache.org/repos/asf/spark/blob/92ce8d48/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index 2707b08..76a66c1 100644
--- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -43,7 +43,18 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
           }
         @transient val sc = {
           val _sc = spark.sparkContext
-          _sc.uiWebUrl.foreach(webUrl => println(s"Spark context Web UI available at ${webUrl}"))
+          if (_sc.getConf.getBoolean("spark.ui.reverseProxy", false)) {
+            val proxyUrl = _sc.getConf.get("spark.ui.reverseProxyUrl", null)
+            if (proxyUrl != null) {
+              println(s"Spark Context Web UI is available at 
${proxyUrl}/proxy/${_sc.applicationId}")
+            } else {
+              println(s"Spark Context Web UI is available at Spark Master 
Public URL")
+            }
+          } else {
+            _sc.uiWebUrl.foreach {
+              webUrl => println(s"Spark context Web UI available at ${webUrl}")
+            }
+          }
           println("Spark context available as 'sc' " +
             s"(master = ${_sc.master}, app id = ${_sc.applicationId}).")
           println("Spark session available as 'spark'.")

