Repository: spark
Updated Branches:
  refs/heads/master 3539cb7d2 -> 343d3bfaf


[SPARK-5100] [SQL] add webui for thriftserver

This PR is a rebased version of #3946. It mainly focuses on creating an
independent tab for the Thrift server in the Spark web UI.

Features:

1. Session-related statistics (username and IP are only supported with
hive-0.13.1)
2. List all SQL statements that are executing or have been executed on this server
3. Provide links to the jobs generated by each SQL statement
4. Provide a link to show all SQL statements executing or executed in a specified session

Prototype snapshots:

This is the main page for thrift server

![image](https://cloud.githubusercontent.com/assets/1411869/7361379/df7dcc64-ed89-11e4-9964-4df0b32f475e.png)

Author: tianyi <tianyi.asiai...@gmail.com>

Closes #5730 from tianyi/SPARK-5100 and squashes the following commits:

cfd14c7 [tianyi] style fix
0efe3d5 [tianyi] revert part of pom change
c0f2fa0 [tianyi] extends HiveThriftJdbcTest to start/stop thriftserver for UI 
test
aa20408 [tianyi] fix style problem
c9df6f9 [tianyi] add testsuite for thriftserver ui and fix some style issue
9830199 [tianyi] add webui for thriftserver


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/343d3bfa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/343d3bfa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/343d3bfa

Branch: refs/heads/master
Commit: 343d3bfafd449a0371feb6a88f78e07302fa7143
Parents: 3539cb7
Author: tianyi <tianyi.asiai...@gmail.com>
Authored: Mon May 4 16:59:34 2015 +0800
Committer: Cheng Lian <l...@databricks.com>
Committed: Mon May 4 16:59:34 2015 +0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/SQLConf.scala    |   2 +
 sql/hive-thriftserver/pom.xml                   |  12 ++
 .../hive/thriftserver/HiveThriftServer2.scala   | 161 ++++++++++++++-
 .../hive/thriftserver/ui/ThriftServerPage.scala | 190 ++++++++++++++++++
 .../ui/ThriftServerSessionPage.scala            | 197 +++++++++++++++++++
 .../hive/thriftserver/ui/ThriftServerTab.scala  |  50 +++++
 .../thriftserver/HiveThriftServer2Suites.scala  |  12 +-
 .../sql/hive/thriftserver/UISeleniumSuite.scala | 105 ++++++++++
 .../spark/sql/hive/thriftserver/Shim12.scala    |  18 +-
 .../spark/sql/hive/thriftserver/Shim13.scala    |  26 ++-
 10 files changed, 751 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/343d3bfa/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 2fa602a..99db959 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -52,6 +52,8 @@ private[spark] object SQLConf {
 
   // This is only used for the thriftserver
   val THRIFTSERVER_POOL = "spark.sql.thriftserver.scheduler.pool"
+  val THRIFTSERVER_UI_STATEMENT_LIMIT = 
"spark.sql.thriftserver.ui.retainedStatements"
+  val THRIFTSERVER_UI_SESSION_LIMIT = 
"spark.sql.thriftserver.ui.retainedSessions"
 
   // This is used to set the default data source
   val DEFAULT_DATA_SOURCE_NAME = "spark.sql.sources.default"

http://git-wip-us.apache.org/repos/asf/spark/blob/343d3bfa/sql/hive-thriftserver/pom.xml
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/pom.xml b/sql/hive-thriftserver/pom.xml
index f38c796..437f697 100644
--- a/sql/hive-thriftserver/pom.xml
+++ b/sql/hive-thriftserver/pom.xml
@@ -57,6 +57,18 @@
       <groupId>${hive.group}</groupId>
       <artifactId>hive-beeline</artifactId>
     </dependency>
+    <!-- Added for selenium: -->
+    <dependency>
+      <groupId>org.seleniumhq.selenium</groupId>
+      <artifactId>selenium-java</artifactId>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
   </dependencies>
   <build>
     
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>

http://git-wip-us.apache.org/repos/asf/spark/blob/343d3bfa/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
index 832596f..0be5a92 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
@@ -22,20 +22,27 @@ import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hive.service.cli.thrift.{ThriftBinaryCLIService, 
ThriftHttpCLIService}
 import org.apache.hive.service.server.{HiveServer2, ServerOptionsProcessor}
+import org.apache.spark.sql.SQLConf
 
-import org.apache.spark.Logging
+import org.apache.spark.{SparkContext, SparkConf, Logging}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
-import org.apache.spark.scheduler.{SparkListenerApplicationEnd, SparkListener}
+import org.apache.spark.scheduler.{SparkListenerJobStart, 
SparkListenerApplicationEnd, SparkListener}
+import org.apache.spark.sql.hive.thriftserver.ui.ThriftServerTab
 import org.apache.spark.util.Utils
 
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
 /**
  * The main entry point for the Spark SQL port of HiveServer2.  Starts up a 
`SparkSQLContext` and a
  * `HiveThriftServer2` thrift server.
  */
 object HiveThriftServer2 extends Logging {
   var LOG = LogFactory.getLog(classOf[HiveServer2])
+  // Starts as None rather than null so the shutdown hook can call uiTab.foreach safely even
+  // when it runs before the tab has been created (e.g. after a failed startup).
+  var uiTab: Option[ThriftServerTab] = None
+  var listener: HiveThriftServer2Listener = _
 
   /**
    * :: DeveloperApi ::
@@ -46,7 +53,13 @@ object HiveThriftServer2 extends Logging {
     val server = new HiveThriftServer2(sqlContext)
     server.init(sqlContext.hiveconf)
     server.start()
-    sqlContext.sparkContext.addSparkListener(new 
HiveThriftServer2Listener(server))
+    listener = new HiveThriftServer2Listener(server, sqlContext.conf)
+    sqlContext.sparkContext.addSparkListener(listener)
+    uiTab = if (sqlContext.sparkContext.getConf.getBoolean("spark.ui.enabled", 
true)) {
+      Some(new ThriftServerTab(sqlContext.sparkContext))
+    } else {
+      None
+    }
   }
 
   def main(args: Array[String]) {
@@ -58,14 +71,23 @@ object HiveThriftServer2 extends Logging {
     logInfo("Starting SparkContext")
     SparkSQLEnv.init()
 
-    Utils.addShutdownHook { () => SparkSQLEnv.stop() }
+    Utils.addShutdownHook { () =>
+      SparkSQLEnv.stop()
+      uiTab.foreach(_.detach())
+    }
 
     try {
       val server = new HiveThriftServer2(SparkSQLEnv.hiveContext)
       server.init(SparkSQLEnv.hiveContext.hiveconf)
       server.start()
       logInfo("HiveThriftServer2 started")
-      SparkSQLEnv.sparkContext.addSparkListener(new 
HiveThriftServer2Listener(server))
+      listener = new HiveThriftServer2Listener(server, 
SparkSQLEnv.hiveContext.conf)
+      SparkSQLEnv.sparkContext.addSparkListener(listener)
+      uiTab = if 
(SparkSQLEnv.sparkContext.getConf.getBoolean("spark.ui.enabled", true)) {
+        Some(new ThriftServerTab(SparkSQLEnv.sparkContext))
+      } else {
+        None
+      }
     } catch {
       case e: Exception =>
         logError("Error starting HiveThriftServer2", e)
@@ -73,15 +95,140 @@ object HiveThriftServer2 extends Logging {
     }
   }
 
+  /**
+   * Mutable record describing one client session.
+   * `finishTimestamp` stays 0 until it is assigned a close time.
+   */
+  private[thriftserver] class SessionInfo(
+      val sessionId: String,
+      val startTimestamp: Long,
+      val ip: String,
+      val userName: String) {
+    var finishTimestamp: Long = 0L
+    var totalExecution: Int = 0
+    // Elapsed milliseconds: measured up to "now" while finishTimestamp is still 0,
+    // otherwise from start to finish.
+    def totalTime: Long = {
+      if (finishTimestamp == 0L) {
+        System.currentTimeMillis - startTimestamp
+      } else {
+        finishTimestamp - startTimestamp
+      }
+    }
+  }
+
+  private[thriftserver] object ExecutionState extends Enumeration {
+    val STARTED, COMPILED, FAILED, FINISHED = Value
+    type ExecutionState = Value
+  }
+
+  /**
+   * Mutable record describing one SQL statement execution.
+   * A new record starts in ExecutionState.STARTED with empty plan, detail and group id,
+   * no job ids, and a `finishTimestamp` of 0.
+   */
+  private[thriftserver] class ExecutionInfo(
+      val statement: String,
+      val sessionId: String,
+      val startTimestamp: Long,
+      val userName: String) {
+    var finishTimestamp: Long = 0L
+    var executePlan: String = ""
+    var detail: String = ""
+    var state: ExecutionState.Value = ExecutionState.STARTED
+    val jobId: ArrayBuffer[String] = ArrayBuffer[String]()
+    var groupId: String = ""
+    // Elapsed milliseconds: measured up to "now" while finishTimestamp is still 0,
+    // otherwise from start to finish.
+    def totalTime: Long = {
+      if (finishTimestamp == 0L) {
+        System.currentTimeMillis - startTimestamp
+      } else {
+        finishTimestamp - startTimestamp
+      }
+    }
+  }
+
+
   /**
    * A inner sparkListener called in sc.stop to clean up the HiveThriftServer2
    */
-  class HiveThriftServer2Listener(val server: HiveServer2) extends 
SparkListener {
+  private[thriftserver] class HiveThriftServer2Listener(
+      val server: HiveServer2,
+      val conf: SQLConf) extends SparkListener {
+
     override def onApplicationEnd(applicationEnd: 
SparkListenerApplicationEnd): Unit = {
       server.stop()
     }
-  }
 
+    val sessionList = new mutable.LinkedHashMap[String, SessionInfo]
+    val executionList = new mutable.LinkedHashMap[String, ExecutionInfo]
+    val retainedStatements =
+      conf.getConf(SQLConf.THRIFTSERVER_UI_STATEMENT_LIMIT, "200").toInt
+    val retainedSessions =
+      conf.getConf(SQLConf.THRIFTSERVER_UI_SESSION_LIMIT, "200").toInt
+    var totalRunning = 0
+
+    /**
+     * Appends the id of a newly started job to every tracked execution whose group id equals
+     * the job's SPARK_JOB_GROUP_ID property. Jobs without properties or without a job group
+     * are ignored.
+     */
+    override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+      for {
+        props <- Option(jobStart.properties)
+        groupId <- Option(props.getProperty(SparkContext.SPARK_JOB_GROUP_ID))
+        (_, info) <- executionList if info.groupId == groupId
+      } {
+        // info.groupId already equals groupId here, so only the job id needs recording.
+        info.jobId += jobStart.jobId.toString
+      }
+    }
+
+    def onSessionCreated(ip: String, sessionId: String, userName: String = 
"UNKNOWN"): Unit = {
+      val info = new SessionInfo(sessionId, System.currentTimeMillis, ip, 
userName)
+      sessionList.put(sessionId, info)
+      trimSessionIfNecessary()
+    }
+
+    /**
+     * Records the close time of a session.
+     * A session that is no longer in sessionList (it may have been removed by
+     * trimSessionIfNecessary) is ignored instead of failing with NoSuchElementException.
+     */
+    def onSessionClosed(sessionId: String): Unit = {
+      sessionList.get(sessionId).foreach { session =>
+        session.finishTimestamp = System.currentTimeMillis
+      }
+    }
+
+    /**
+     * Registers a newly submitted statement and bumps the running-statement counter.
+     *
+     * @param id        key under which the execution is stored in executionList
+     * @param sessionId id of the session that issued the statement
+     * @param statement SQL text of the statement
+     * @param groupId   job group id later matched against started jobs in onJobStart
+     * @param userName  user that issued the statement
+     */
+    def onStatementStart(
+        id: String,
+        sessionId: String,
+        statement: String,
+        groupId: String,
+        userName: String = "UNKNOWN"): Unit = {
+      // A new ExecutionInfo already starts in ExecutionState.STARTED.
+      val info = new ExecutionInfo(statement, sessionId, System.currentTimeMillis, userName)
+      // Set the group id on the record itself before storing it, so no lookup is needed
+      // after trimming.
+      info.groupId = groupId
+      executionList.put(id, info)
+      trimExecutionIfNecessary()
+      // The owning session may already have been trimmed from sessionList; skip the per-session
+      // counter in that case rather than throwing NoSuchElementException.
+      sessionList.get(sessionId).foreach { session => session.totalExecution += 1 }
+      totalRunning += 1
+    }
+
+    /**
+     * Stores the execution plan of a statement and marks it as COMPILED.
+     * A statement whose record has already been trimmed from executionList is ignored.
+     */
+    def onStatementParsed(id: String, executionPlan: String): Unit = {
+      executionList.get(id).foreach { info =>
+        info.executePlan = executionPlan
+        info.state = ExecutionState.COMPILED
+      }
+    }
+
+    /**
+     * Marks a statement as FAILED and stores the error message as its detail.
+     * The running counter is decremented even when the statement's record has already been
+     * trimmed from executionList, instead of failing with NoSuchElementException.
+     * `errorTrace` is currently not recorded.
+     */
+    def onStatementError(id: String, errorMessage: String, errorTrace: String): Unit = {
+      executionList.get(id).foreach { info =>
+        info.finishTimestamp = System.currentTimeMillis
+        info.detail = errorMessage
+        info.state = ExecutionState.FAILED
+      }
+      totalRunning -= 1
+    }
+
+    /**
+     * Marks a statement as FINISHED and records its finish time.
+     * The running counter is decremented even when the statement's record has already been
+     * trimmed from executionList, instead of failing with NoSuchElementException.
+     */
+    def onStatementFinish(id: String): Unit = {
+      executionList.get(id).foreach { info =>
+        info.finishTimestamp = System.currentTimeMillis
+        info.state = ExecutionState.FINISHED
+      }
+      totalRunning -= 1
+    }
+
+    /**
+     * Drops the oldest entries of executionList once it holds more than retainedStatements
+     * records: a tenth of the limit, but at least one entry, is removed per call.
+     */
+    private def trimExecutionIfNecessary() = synchronized {
+      if (executionList.size > retainedStatements) {
+        val evictCount = math.max(retainedStatements / 10, 1)
+        // Materialise the keys first so the map is not mutated while it is being iterated.
+        val oldestIds = executionList.keysIterator.take(evictCount).toList
+        oldestIds.foreach { id => executionList.remove(id) }
+      }
+    }
+
+    /**
+     * Drops the oldest entries of sessionList once it holds more than retainedSessions
+     * records: a tenth of the limit, but at least one entry, is removed per call.
+     */
+    private def trimSessionIfNecessary() = synchronized {
+      if (sessionList.size > retainedSessions) {
+        val evictCount = math.max(retainedSessions / 10, 1)
+        // Materialise the keys first so the map is not mutated while it is being iterated.
+        val oldestIds = sessionList.keysIterator.take(evictCount).toList
+        oldestIds.foreach { id => sessionList.remove(id) }
+      }
+    }
+  }
 }
 
 private[hive] class HiveThriftServer2(hiveContext: HiveContext)

http://git-wip-us.apache.org/repos/asf/spark/blob/343d3bfa/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala
new file mode 100644
index 0000000..71b16b6
--- /dev/null
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.thriftserver.ui
+
+import java.util.Calendar
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import org.apache.commons.lang3.StringEscapeUtils
+import org.apache.spark.Logging
+import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.{SessionInfo, 
ExecutionState, ExecutionInfo}
+import org.apache.spark.ui.UIUtils._
+import org.apache.spark.ui._
+
+
+/** Page for Spark Web UI that shows session and SQL statistics of the thrift server */
+private[ui] class ThriftServerPage(parent: ThriftServerTab) extends 
WebUIPage("") with Logging {
+
+  private val listener = parent.listener
+  private val startTime = Calendar.getInstance().getTime()
+  private val emptyCell = "-"
+
+  /**
+   * Render the page: basic stats, a summary line, the session table and the SQL table.
+   * The page is rendered through UIUtils.headerSparkPage under the title "ThriftServer".
+   */
+  def render(request: HttpServletRequest): Seq[Node] = {
+    // sessionList also retains closed sessions; a session's finishTimestamp stays 0 until it
+    // is closed, so only those entries count as online.
+    val onlineSessions = listener.sessionList.values.count(_.finishTimestamp == 0L)
+    val content =
+      generateBasicStats() ++
+      <br/> ++
+      <h4>
+        {onlineSessions} session(s) are online,
+        running {listener.totalRunning} SQL statement(s)
+      </h4> ++
+      generateSessionStatsTable() ++
+      generateSQLStatsTable()
+    UIUtils.headerSparkPage("ThriftServer", content, parent, Some(5000))
+  }
+
+  /** Generate basic stats of the thrift server program */
+  private def generateBasicStats(): Seq[Node] = {
+    val timeSinceStart = System.currentTimeMillis() - startTime.getTime
+    <ul class ="unstyled">
+      <li>
+        <strong>Started at: </strong> {startTime.toString}
+      </li>
+      <li>
+        <strong>Time since start: 
</strong>{formatDurationVerbose(timeSinceStart)}
+      </li>
+    </ul>
+  }
+
+  /** Generate stats of batch statements of the thrift server program */
+  private def generateSQLStatsTable(): Seq[Node] = {
+    val numStatement = listener.executionList.size
+    val table = if (numStatement > 0) {
+      val headerRow = Seq("User", "JobID", "GroupID", "Start Time", "Finish 
Time", "Duration",
+        "Statement", "State", "Detail")
+      val dataRows = listener.executionList.values
+
+      def generateDataRow(info: ExecutionInfo): Seq[Node] = {
+        val jobLink = info.jobId.map { id: String =>
+          <a 
href={"%s/jobs/job?id=%s".format(UIUtils.prependBaseUri(parent.basePath), id)}>
+            [{id}]
+          </a>
+        }
+        val detail = if(info.state == ExecutionState.FAILED) info.detail else 
info.executePlan
+        <tr>
+          <td>{info.userName}</td>
+          <td>
+            {jobLink}
+          </td>
+          <td>{info.groupId}</td>
+          <td>{formatDate(info.startTimestamp)}</td>
+          <td>{if(info.finishTimestamp > 0) 
formatDate(info.finishTimestamp)}</td>
+          <td>{formatDurationOption(Some(info.totalTime))}</td>
+          <td>{info.statement}</td>
+          <td>{info.state}</td>
+          {errorMessageCell(detail)}
+        </tr>
+      }
+
+      Some(UIUtils.listingTable(headerRow, generateDataRow,
+        dataRows, false, None, Seq(null), false))
+    } else {
+      None
+    }
+
+    val content =
+      <h5 id="sqlstat">SQL Statistics</h5> ++
+        <div>
+          <ul class="unstyled">
+            {table.getOrElse("No statistics have been generated yet.")}
+          </ul>
+        </div>
+
+    content
+  }
+
+  private def errorMessageCell(errorMessage: String): Seq[Node] = {
+    val isMultiline = errorMessage.indexOf('\n') >= 0
+    val errorSummary = StringEscapeUtils.escapeHtml4(
+      if (isMultiline) {
+        errorMessage.substring(0, errorMessage.indexOf('\n'))
+      } else {
+        errorMessage
+      })
+    val details = if (isMultiline) {
+      // scalastyle:off
+      <span 
onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
+            class="expand-details">
+        + details
+      </span> ++
+      <div class="stacktrace-details collapsed">
+        <pre>{errorMessage}</pre>
+      </div>
+      // scalastyle:on
+    } else {
+      ""
+    }
+    <td>{errorSummary}{details}</td>
+  }
+
+  /** Generate stats of batch sessions of the thrift server program */
+  private def generateSessionStatsTable(): Seq[Node] = {
+    val numBatches = listener.sessionList.size
+    val table = if (numBatches > 0) {
+      val dataRows =
+        listener.sessionList.values
+      val headerRow = Seq("User", "IP", "Session ID", "Start Time", "Finish 
Time", "Duration",
+        "Total Execute")
+      // Builds one table row per session; the session id links to that session's detail page.
+      def generateDataRow(session: SessionInfo): Seq[Node] = {
+        val sessionLink = "%s/ThriftServer/session?id=%s"
+          .format(UIUtils.prependBaseUri(parent.basePath), session.sessionId)
+        <tr>
+          <td> {session.userName} </td>
+          <td> {session.ip} </td>
+          <td> <a href={sessionLink}> {session.sessionId} </a> </td>
+          <td> {formatDate(session.startTimestamp)} </td>
+          <td> {if(session.finishTimestamp > 0) formatDate(session.finishTimestamp)} </td>
+          <td> {formatDurationOption(Some(session.totalTime))} </td>
+          <td> {session.totalExecution.toString} </td>
+        </tr>
+      }
+      Some(UIUtils.listingTable(headerRow, generateDataRow, dataRows, true, 
None, Seq(null), false))
+    } else {
+      None
+    }
+
+    val content =
+      <h5 id="sessionstat">Session Statistics</h5> ++
+      <div>
+        <ul class="unstyled">
+          {table.getOrElse("No statistics have been generated yet.")}
+        </ul>
+      </div>
+
+    content
+  }
+
+
+  /**
+   * Returns a human-readable string representing a duration such as "5 second 
35 ms"
+   */
+  private def formatDurationOption(msOption: Option[Long]): String = {
+    msOption.map(formatDurationVerbose).getOrElse(emptyCell)
+  }
+
+  /** Generate HTML table from string data */
+  private def listingTable(headers: Seq[String], data: Seq[Seq[String]]) = {
+    def generateDataRow(data: Seq[String]): Seq[Node] = {
+      <tr> {data.map(d => <td>{d}</td>)} </tr>
+    }
+    UIUtils.listingTable(headers, generateDataRow, data, fixedWidth = true)
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/spark/blob/343d3bfa/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerSessionPage.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerSessionPage.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerSessionPage.scala
new file mode 100644
index 0000000..33ba038
--- /dev/null
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerSessionPage.scala
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.thriftserver.ui
+
+import java.util.Calendar
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import org.apache.commons.lang3.StringEscapeUtils
+import org.apache.spark.Logging
+import 
org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.{ExecutionInfo, 
ExecutionState}
+import org.apache.spark.ui.UIUtils._
+import org.apache.spark.ui._
+
+/** Page for Spark Web UI that shows the SQL statistics of a single thrift server session */
+private[ui] class ThriftServerSessionPage(parent: ThriftServerTab)
+  extends WebUIPage("session") with Logging {
+
+  private val listener = parent.listener
+  private val startTime = Calendar.getInstance().getTime()
+  private val emptyCell = "-"
+
+  /**
+   * Render the page for the session named by the "id" request parameter.
+   * Fails with IllegalArgumentException (via require) when the parameter is missing or empty,
+   * or when no session with that id is present in the listener's sessionList.
+   */
+  def render(request: HttpServletRequest): Seq[Node] = {
+    val parameterId = request.getParameter("id")
+    require(parameterId != null && parameterId.nonEmpty, "Missing id parameter")
+    // Look the session up by its map key instead of scanning all entries.
+    val sessionOpt = listener.sessionList.get(parameterId)
+    require(sessionOpt.isDefined, "Invalid sessionID[" + parameterId + "]")
+    val session = sessionOpt.get
+
+    val content =
+      generateBasicStats() ++
+      <br/> ++
+      <h4>
+        User {session.userName},
+        IP {session.ip},
+        Session created at {formatDate(session.startTimestamp)},
+        Total run {session.totalExecution} SQL
+      </h4> ++
+      generateSQLStatsTable(session.sessionId)
+    UIUtils.headerSparkPage("ThriftServer", content, parent, Some(5000))
+  }
+
+  /** Generate basic stats of the streaming program */
+  private def generateBasicStats(): Seq[Node] = {
+    val timeSinceStart = System.currentTimeMillis() - startTime.getTime
+    <ul class ="unstyled">
+      <li>
+        <strong>Started at: </strong> {startTime.toString}
+      </li>
+      <li>
+        <strong>Time since start: 
</strong>{formatDurationVerbose(timeSinceStart)}
+      </li>
+    </ul>
+  }
+
+  /** Generate stats of batch statements of the thrift server program */
+  private def generateSQLStatsTable(sessionID: String): Seq[Node] = {
+    val executionList = listener.executionList
+      .filter(_._2.sessionId == sessionID)
+    val numStatement = executionList.size
+    val table = if (numStatement > 0) {
+      val headerRow = Seq("User", "JobID", "GroupID", "Start Time", "Finish 
Time", "Duration",
+        "Statement", "State", "Detail")
+      val dataRows = 
executionList.values.toSeq.sortBy(_.startTimestamp).reverse
+
+      // Builds one table row per statement. The detail cell shows the error message for
+      // failed statements and the execution plan otherwise.
+      def generateDataRow(info: ExecutionInfo): Seq[Node] = {
+        val jobLink = info.jobId.map { id: String =>
+          <a href={"%s/jobs/job?id=%s".format(UIUtils.prependBaseUri(parent.basePath), id)}>
+            [{id}]
+          </a>
+        }
+        val detail = if(info.state == ExecutionState.FAILED) info.detail else info.executePlan
+        <tr>
+          <td>{info.userName}</td>
+          <td>
+            {jobLink}
+          </td>
+          <td>{info.groupId}</td>
+          <td>{formatDate(info.startTimestamp)}</td>
+          {
+            // finishTimestamp is 0 while the statement is still running; leave the cell empty
+            // instead of formatting timestamp 0 as a date.
+          }
+          <td>{if(info.finishTimestamp > 0) formatDate(info.finishTimestamp)}</td>
+          <td>{formatDurationOption(Some(info.totalTime))}</td>
+          <td>{info.statement}</td>
+          <td>{info.state}</td>
+          {errorMessageCell(detail)}
+        </tr>
+      }
+
+      Some(UIUtils.listingTable(headerRow, generateDataRow,
+        dataRows, false, None, Seq(null), false))
+    } else {
+      None
+    }
+
+    val content =
+      <h5>SQL Statistics</h5> ++
+        <div>
+          <ul class="unstyled">
+            {table.getOrElse("No statistics have been generated yet.")}
+          </ul>
+        </div>
+
+    content
+  }
+
+  private def errorMessageCell(errorMessage: String): Seq[Node] = {
+    val isMultiline = errorMessage.indexOf('\n') >= 0
+    val errorSummary = StringEscapeUtils.escapeHtml4(
+      if (isMultiline) {
+        errorMessage.substring(0, errorMessage.indexOf('\n'))
+      } else {
+        errorMessage
+      })
+    val details = if (isMultiline) {
+      // scalastyle:off
+      <span 
onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
+            class="expand-details">
+        + details
+      </span> ++
+      <div class="stacktrace-details collapsed">
+        <pre>{errorMessage}</pre>
+      </div>
+      // scalastyle:on
+    } else {
+      ""
+    }
+    <td>{errorSummary}{details}</td>
+  }
+
+  /** Generate stats of batch sessions of the thrift server program */
+  private def generateSessionStatsTable(): Seq[Node] = {
+    val numBatches = listener.sessionList.size
+    val table = if (numBatches > 0) {
+      val dataRows =
+        listener.sessionList.values.toSeq.sortBy(_.startTimestamp).reverse.map 
( session =>
+        Seq(
+          session.userName,
+          session.ip,
+          session.sessionId,
+          formatDate(session.startTimestamp),
+          formatDate(session.finishTimestamp),
+          formatDurationOption(Some(session.totalTime)),
+          session.totalExecution.toString
+        )
+      ).toSeq
+      val headerRow = Seq("User", "IP", "Session ID", "Start Time", "Finish 
Time", "Duration",
+        "Total Execute")
+      Some(listingTable(headerRow, dataRows))
+    } else {
+      None
+    }
+
+    val content =
+      <h5>Session Statistics</h5> ++
+      <div>
+        <ul class="unstyled">
+          {table.getOrElse("No statistics have been generated yet.")}
+        </ul>
+      </div>
+
+    content
+  }
+
+
+  /**
+   * Returns a human-readable string representing a duration such as "5 second 
35 ms"
+   */
+  private def formatDurationOption(msOption: Option[Long]): String = {
+    msOption.map(formatDurationVerbose).getOrElse(emptyCell)
+  }
+
+  /** Generate HTML table from string data */
+  private def listingTable(headers: Seq[String], data: Seq[Seq[String]]) = {
+    def generateDataRow(data: Seq[String]): Seq[Node] = {
+      <tr> {data.map(d => <td>{d}</td>)} </tr>
+    }
+    UIUtils.listingTable(headers, generateDataRow, data, fixedWidth = true)
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/spark/blob/343d3bfa/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerTab.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerTab.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerTab.scala
new file mode 100644
index 0000000..343031f
--- /dev/null
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerTab.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.thriftserver.ui
+
+import org.apache.spark.sql.hive.thriftserver.{HiveThriftServer2, SparkSQLEnv}
+import org.apache.spark.sql.hive.thriftserver.ui.ThriftServerTab._
+import org.apache.spark.ui.{SparkUI, SparkUITab}
+import org.apache.spark.{SparkContext, Logging, SparkException}
+
+/**
+ * Spark Web UI tab that shows statistics of the thrift server.
+ * This assumes the given SparkContext has enabled its SparkUI.
+ */
+private[thriftserver] class ThriftServerTab(sparkContext: SparkContext)
+  extends SparkUITab(getSparkUI(sparkContext), "ThriftServer") with Logging {
+
+  val parent = getSparkUI(sparkContext)
+  val listener = HiveThriftServer2.listener
+
+  attachPage(new ThriftServerPage(this))
+  attachPage(new ThriftServerSessionPage(this))
+  parent.attachTab(this)
+
+  def detach() {
+    getSparkUI(sparkContext).detachTab(this)
+  }
+}
+
+private[thriftserver] object ThriftServerTab {
+  def getSparkUI(sparkContext: SparkContext): SparkUI = {
+    sparkContext.ui.getOrElse {
+      throw new SparkException("Parent SparkUI to attach this tab to not 
found!")
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/343d3bfa/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index 4cf95e7..1fadea9 100644
--- 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -409,24 +409,24 @@ abstract class HiveThriftServer2Test extends FunSuite with BeforeAndAfterAll wit
   private val CLASS_NAME = 
HiveThriftServer2.getClass.getCanonicalName.stripSuffix("$")
   private val LOG_FILE_MARK = s"starting $CLASS_NAME, logging to "
 
-  private val startScript = 
"../../sbin/start-thriftserver.sh".split("/").mkString(File.separator)
-  private val stopScript = 
"../../sbin/stop-thriftserver.sh".split("/").mkString(File.separator)
+  protected val startScript = 
"../../sbin/start-thriftserver.sh".split("/").mkString(File.separator)
+  protected val stopScript = 
"../../sbin/stop-thriftserver.sh".split("/").mkString(File.separator)
 
   private var listeningPort: Int = _
   protected def serverPort: Int = listeningPort
 
   protected def user = System.getProperty("user.name")
 
-  private var warehousePath: File = _
-  private var metastorePath: File = _
-  private def metastoreJdbcUri = 
s"jdbc:derby:;databaseName=$metastorePath;create=true"
+  protected var warehousePath: File = _
+  protected var metastorePath: File = _
+  protected def metastoreJdbcUri = 
s"jdbc:derby:;databaseName=$metastorePath;create=true"
 
   private val pidDir: File = Utils.createTempDir("thriftserver-pid")
   private var logPath: File = _
   private var logTailingProcess: Process = _
   private var diagnosisBuffer: ArrayBuffer[String] = ArrayBuffer.empty[String]
 
-  private def serverStartCommand(port: Int) = {
+  protected def serverStartCommand(port: Int) = {
     val portConf = if (mode == ServerMode.binary) {
       ConfVars.HIVE_SERVER2_THRIFT_PORT
     } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/343d3bfa/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/UISeleniumSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/UISeleniumSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/UISeleniumSuite.scala
new file mode 100644
index 0000000..4754101
--- /dev/null
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/UISeleniumSuite.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.thriftserver
+
+
+
+import scala.util.Random
+
+import org.openqa.selenium.WebDriver
+import org.openqa.selenium.htmlunit.HtmlUnitDriver
+import org.scalatest.{Matchers, BeforeAndAfterAll}
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.selenium.WebBrowser
+import org.scalatest.time.SpanSugar._
+
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars
+import org.apache.spark.sql.hive.HiveContext
+
+/** Selenium test checking that the web UI exposes the ThriftServer tab and executed SQL. */
+class UISeleniumSuite
+  extends HiveThriftJdbcTest
+  with WebBrowser with Matchers with BeforeAndAfterAll {
+
+  implicit var webDriver: WebDriver = _
+  var server: HiveThriftServer2 = _
+  var hc: HiveContext = _
+  val uiPort = 20000 + Random.nextInt(10000)  // random web UI port in [20000, 30000)
+  override def mode: ServerMode.Value = ServerMode.binary
+
+  override def beforeAll(): Unit = {
+    webDriver = new HtmlUnitDriver  // created before the parent suite's setup runs
+    super.beforeAll()
+  }
+
+  override def afterAll(): Unit = {
+    if (webDriver != null) {
+      webDriver.quit()
+    }
+    super.afterAll()
+  }
+
+  override protected def serverStartCommand(port: Int) = {  // enables the web UI on uiPort
+    val portConf = if (mode == ServerMode.binary) {
+      ConfVars.HIVE_SERVER2_THRIFT_PORT
+    } else {
+      ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT
+    }
+
+    s"""$startScript
+        |  --master local
+        |  --hiveconf hive.root.logger=INFO,console
+        |  --hiveconf ${ConfVars.METASTORECONNECTURLKEY}=$metastoreJdbcUri
+        |  --hiveconf ${ConfVars.METASTOREWAREHOUSE}=$warehousePath
+        |  --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST}=localhost
+        |  --hiveconf ${ConfVars.HIVE_SERVER2_TRANSPORT_MODE}=$mode
+        |  --hiveconf $portConf=$port
+        |  --driver-class-path ${sys.props("java.class.path")}
+        |  --conf spark.ui.enabled=true
+        |  --conf spark.ui.port=$uiPort
+     """.stripMargin.split("\\s+").toSeq
+  }
+
+  test("thrift server ui test") {
+    withJdbcStatement(statement =>{
+      val baseURL = s"http://localhost:${uiPort}";
+
+      val queries = Seq(
+        "CREATE TABLE test_map(key INT, value STRING)",
+        s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_map")
+
+      queries.foreach(statement.execute)
+
+      eventually(timeout(10 seconds), interval(50 milliseconds)) {  // root page links to the tab
+        go to (baseURL)
+        find(cssSelector("""ul li a[href*="ThriftServer"]""")) should not be(None)
+      }
+
+      eventually(timeout(10 seconds), interval(50 milliseconds)) {  // the tab page itself
+        go to (baseURL + "/ThriftServer")
+        find(id("sessionstat")) should not be(None)
+        find(id("sqlstat")) should not be(None)
+
+        // check that every executed statement appears in a table cell of the page
+        queries.foreach { line =>
+          findAll(cssSelector("""ul table tbody tr td""")).map(_.text).toList should contain (line)
+        }
+      }
+    })
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/343d3bfa/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
index 95a6e86..b3a79ba 100644
--- a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
+++ b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hive.thriftserver
 
 import java.sql.{Date, Timestamp}
 import java.util.concurrent.Executors
-import java.util.{ArrayList => JArrayList, Map => JMap}
+import java.util.{ArrayList => JArrayList, Map => JMap, UUID}
 
 import org.apache.commons.logging.Log
 import org.apache.hadoop.hive.conf.HiveConf
@@ -190,9 +190,12 @@ private[hive] class SparkExecuteStatementOperation(
   }
 
   def run(): Unit = {
+    val statementId = UUID.randomUUID().toString
     logInfo(s"Running query '$statement'")
     setState(OperationState.RUNNING)
-    hiveContext.sparkContext.setJobDescription(statement)
+    HiveThriftServer2.listener.onStatementStart(
+      statementId, parentSession.getSessionHandle.getSessionId.toString, 
statement, statementId)
+    hiveContext.sparkContext.setJobGroup(statementId, statement)
     sessionToActivePool.get(parentSession.getSessionHandle).foreach { pool =>
       hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
     }
@@ -205,6 +208,7 @@ private[hive] class SparkExecuteStatementOperation(
           logInfo(s"Setting spark.scheduler.pool=$value for future statements 
in this session.")
         case _ =>
       }
+      HiveThriftServer2.listener.onStatementParsed(statementId, 
result.queryExecution.toString())
       iter = {
         val useIncrementalCollect =
           hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", 
"false").toBoolean
@@ -221,10 +225,13 @@ private[hive] class SparkExecuteStatementOperation(
       // HiveServer will silently swallow them.
       case e: Throwable =>
         setState(OperationState.ERROR)
+        HiveThriftServer2.listener.onStatementError(
+          statementId, e.getMessage, e.getStackTraceString)
         logError("Error executing query:",e)
         throw new HiveSQLException(e.toString)
     }
     setState(OperationState.FINISHED)
+    HiveThriftServer2.listener.onStatementFinish(statementId)
   }
 }
 
@@ -255,11 +262,14 @@ private[hive] class SparkSQLSessionManager(hiveContext: HiveContext)
       withImpersonation: Boolean,
       delegationToken: String): SessionHandle = {
     hiveContext.openSession()
-
-    super.openSession(username, passwd, sessionConf, withImpersonation, 
delegationToken)
+    val sessionHandle = super.openSession(
+      username, passwd, sessionConf, withImpersonation, delegationToken)
+    HiveThriftServer2.listener.onSessionCreated("UNKNOWN", 
sessionHandle.getSessionId.toString)
+    sessionHandle
   }
 
   override def closeSession(sessionHandle: SessionHandle) {
+    
HiveThriftServer2.listener.onSessionClosed(sessionHandle.getSessionId.toString)
     super.closeSession(sessionHandle)
     sparkSqlOperationManager.sessionToActivePool -= sessionHandle
 

http://git-wip-us.apache.org/repos/asf/spark/blob/343d3bfa/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
index 178eb1a..b9d4f1c 100644
--- a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
+++ b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hive.thriftserver
 
 import java.sql.{Date, Timestamp}
 import java.util.concurrent.Executors
-import java.util.{ArrayList => JArrayList, List => JList, Map => JMap}
+import java.util.{ArrayList => JArrayList, List => JList, Map => JMap, UUID}
 
 import org.apache.commons.logging.Log
 import org.apache.hadoop.hive.conf.HiveConf
@@ -36,7 +36,7 @@ import org.apache.hive.service.cli._
 import org.apache.hive.service.cli.operation.ExecuteStatementOperation
 import org.apache.hive.service.cli.session.{SessionManager, HiveSession}
 
-import org.apache.spark.Logging
+import org.apache.spark.{SparkContext, Logging}
 import org.apache.spark.sql.{DataFrame, Row => SparkRow, SQLConf}
 import org.apache.spark.sql.execution.SetCommand
 import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
@@ -161,9 +161,16 @@ private[hive] class SparkExecuteStatementOperation(
   }
 
   def run(): Unit = {
+    val statementId = UUID.randomUUID().toString
     logInfo(s"Running query '$statement'")
     setState(OperationState.RUNNING)
-    hiveContext.sparkContext.setJobDescription(statement)
+    HiveThriftServer2.listener.onStatementStart(
+      statementId,
+      parentSession.getSessionHandle.getSessionId.toString,
+      statement,
+      statementId,
+      parentSession.getUsername)
+    hiveContext.sparkContext.setJobGroup(statementId, statement)
     sessionToActivePool.get(parentSession.getSessionHandle).foreach { pool =>
       hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
     }
@@ -176,6 +183,7 @@ private[hive] class SparkExecuteStatementOperation(
           logInfo(s"Setting spark.scheduler.pool=$value for future statements 
in this session.")
         case _ =>
       }
+      HiveThriftServer2.listener.onStatementParsed(statementId, 
result.queryExecution.toString())
       iter = {
         val useIncrementalCollect =
           hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", 
"false").toBoolean
@@ -192,10 +200,13 @@ private[hive] class SparkExecuteStatementOperation(
       // HiveServer will silently swallow them.
       case e: Throwable =>
         setState(OperationState.ERROR)
+        HiveThriftServer2.listener.onStatementError(
+          statementId, e.getMessage, e.getStackTraceString)
         logError("Error executing query:", e)
         throw new HiveSQLException(e.toString)
     }
     setState(OperationState.FINISHED)
+    HiveThriftServer2.listener.onStatementFinish(statementId)
   }
 }
 
@@ -227,11 +238,16 @@ private[hive] class SparkSQLSessionManager(hiveContext: HiveContext)
       withImpersonation: Boolean,
       delegationToken: String): SessionHandle = {
     hiveContext.openSession()
-
-    super.openSession(protocol, username, passwd, sessionConf, 
withImpersonation, delegationToken)
+    val sessionHandle = super.openSession(
+      protocol, username, passwd, sessionConf, withImpersonation, 
delegationToken)
+    val session = super.getSession(sessionHandle)
+    HiveThriftServer2.listener.onSessionCreated(
+      session.getIpAddress, sessionHandle.getSessionId.toString, 
session.getUsername)
+    sessionHandle
   }
 
   override def closeSession(sessionHandle: SessionHandle) {
+    
HiveThriftServer2.listener.onSessionClosed(sessionHandle.getSessionId.toString)
     super.closeSession(sessionHandle)
     sparkSqlOperationManager.sessionToActivePool -= sessionHandle
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to