Repository: spark
Updated Branches:
  refs/heads/master 4e8ac6edd -> afbe35cf5


[SPARK-14670] [SQL] allow updating driver side sql metrics

## What changes were proposed in this pull request?

The Spark UI currently has an SQLTab that displays accumulator values per operator. However, it only displays metrics updated on the executors, not on the driver. It is useful to also include driver-side metrics, e.g. broadcast time.

This is a different approach from https://github.com/apache/spark/pull/12427: this PR sends driver-side accumulator updates via a new event right after the update happens, rather than at the end of execution.
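
For readers unfamiliar with listener events, here is a minimal sketch (not part of this patch; the listener class name is made up for illustration) of how a custom `SparkListener` could observe the new `SparkListenerDriverAccumUpdates` event once this change is in:

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates

// Hypothetical listener, for illustration only: logs driver-side accumulator
// updates as they are posted, instead of waiting for the execution to finish.
class DriverMetricsLogger extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case SparkListenerDriverAccumUpdates(executionId, accumUpdates) =>
      // accumUpdates is a Seq[(accumulatorId, value)] reported from the driver,
      // e.g. the broadcast time recorded by BroadcastExchangeExec.
      accumUpdates.foreach { case (accId, value) =>
        println(s"execution $executionId: driver accumulator $accId = $value")
      }
    case _ => // ignore other events
  }
}
```

It would be registered like any other listener, e.g. `spark.sparkContext.addSparkListener(new DriverMetricsLogger)`; the built-in `SQLListener` handles the same event and merges the values into the per-operator metrics shown on the SQL tab.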

## How was this patch tested?

A new test in `SQLListenerSuite`.

![qq20160606-0](https://cloud.githubusercontent.com/assets/3182036/15841418/0eb137da-2c06-11e6-9068-5694eeb78530.png)

Author: Wenchen Fan <wenc...@databricks.com>

Closes #13189 from cloud-fan/metrics.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/afbe35cf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/afbe35cf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/afbe35cf

Branch: refs/heads/master
Commit: afbe35cf5b272991b4986e551b42d9201c3862c3
Parents: 4e8ac6e
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Wed Jun 8 22:47:29 2016 -0700
Committer: Davies Liu <davies....@gmail.com>
Committed: Wed Jun 8 22:47:29 2016 -0700

----------------------------------------------------------------------
 .../exchange/BroadcastExchangeExec.scala        |  9 ++++
 .../spark/sql/execution/ui/SQLListener.scala    | 28 ++++++++--
 .../sql/execution/ui/SQLListenerSuite.scala     | 56 ++++++++++++++++++--
 3 files changed, 85 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/afbe35cf/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
index d3081ba..bd0841d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, BroadcastPartitioning, Partitioning}
 import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
 import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates
 import org.apache.spark.util.ThreadUtils
 
 /**
@@ -92,6 +93,14 @@ case class BroadcastExchangeExec(
 
         val broadcasted = sparkContext.broadcast(relation)
         longMetric("broadcastTime") += (System.nanoTime() - beforeBroadcast) / 1000000
+
+        // There are some cases where we don't care about the metrics and call `SparkPlan.doExecute`
+        // directly without setting an execution id. We should be tolerant of that.
+        if (executionId != null) {
+          sparkContext.listenerBus.post(SparkListenerDriverAccumUpdates(
+            executionId.toLong, metrics.values.map(m => m.id -> m.value).toSeq))
+        }
+
         broadcasted
       }
     }(BroadcastExchangeExec.executionContext)

http://git-wip-us.apache.org/repos/asf/spark/blob/afbe35cf/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index 03b5326..6e94791 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -42,6 +42,10 @@ case class SparkListenerSQLExecutionStart(
 case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long)
   extends SparkListenerEvent
 
+@DeveloperApi
+case class SparkListenerDriverAccumUpdates(executionId: Long, accumUpdates: Seq[(Long, Long)])
+  extends SparkListenerEvent
+
 private[sql] class SQLHistoryListenerFactory extends SparkHistoryListenerFactory {
 
   override def createListeners(conf: SparkConf, sparkUI: SparkUI): Seq[SparkListener] = {
@@ -251,6 +255,13 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
         }
       }
     }
+    case SparkListenerDriverAccumUpdates(executionId, accumUpdates) => synchronized {
+      _executionIdToData.get(executionId).foreach { executionUIData =>
+        for ((accId, accValue) <- accumUpdates) {
+          executionUIData.driverAccumUpdates(accId) = accValue
+        }
+      }
+    }
     case _ => // Ignore
   }
 
@@ -296,7 +307,9 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
             (accumulatorUpdate._1, accumulatorUpdate._2)
           }
         }.filter { case (id, _) => executionUIData.accumulatorMetrics.contains(id) }
-        mergeAccumulatorUpdates(accumulatorUpdates, accumulatorId =>
+
+        val driverUpdates = executionUIData.driverAccumUpdates.toSeq
-        mergeAccumulatorUpdates(accumulatorUpdates, accumulatorId =>
           executionUIData.accumulatorMetrics(accumulatorId).metricType)
       case None =>
         // This execution has been dropped
@@ -368,10 +381,15 @@ private[ui] class SQLExecutionUIData(
     val physicalPlanDescription: String,
     val physicalPlanGraph: SparkPlanGraph,
     val accumulatorMetrics: Map[Long, SQLPlanMetric],
-    val submissionTime: Long,
-    var completionTime: Option[Long] = None,
-    val jobs: mutable.HashMap[Long, JobExecutionStatus] = mutable.HashMap.empty,
-    val stages: mutable.ArrayBuffer[Int] = mutable.ArrayBuffer()) {
+    val submissionTime: Long) {
+
+  var completionTime: Option[Long] = None
+
+  val jobs: mutable.HashMap[Long, JobExecutionStatus] = mutable.HashMap.empty
+
+  val stages: mutable.ArrayBuffer[Int] = mutable.ArrayBuffer()
+
+  val driverAccumUpdates: mutable.HashMap[Long, Long] = mutable.HashMap.empty
 
   /**
    * Return whether there are running jobs in this execution.

http://git-wip-us.apache.org/repos/asf/spark/blob/afbe35cf/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index 6788c9d..6e60b0e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -23,11 +23,15 @@ import org.mockito.Mockito.mock
 
 import org.apache.spark._
 import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler._
-import org.apache.spark.sql.{DataFrame, SparkSession, SQLContext}
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.catalyst.util.quietly
-import org.apache.spark.sql.execution.{SparkPlanInfo, SQLExecution}
-import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlanInfo, SQLExecution}
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.{AccumulatorMetadata, LongAccumulator}
@@ -386,6 +390,52 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
     assert(trackedAccums.head === (sqlMetricInfo.id, sqlMetricInfo.update.get))
   }
 
+  test("driver side SQL metrics") {
+    val listener = new SQLListener(spark.sparkContext.conf)
+    val expectedAccumValue = 12345
+    val physicalPlan = MyPlan(sqlContext.sparkContext, expectedAccumValue)
+    sqlContext.sparkContext.addSparkListener(listener)
+    val dummyQueryExecution = new QueryExecution(spark, LocalRelation()) {
+      override lazy val sparkPlan = physicalPlan
+      override lazy val executedPlan = physicalPlan
+    }
+    SQLExecution.withNewExecutionId(spark, dummyQueryExecution) {
+      physicalPlan.execute().collect()
+    }
+
+    def waitTillExecutionFinished(): Unit = {
+      while (listener.getCompletedExecutions.isEmpty) {
+        Thread.sleep(100)
+      }
+    }
+    waitTillExecutionFinished()
+
+    val driverUpdates = listener.getCompletedExecutions.head.driverAccumUpdates
+    assert(driverUpdates.size == 1)
+    assert(driverUpdates(physicalPlan.longMetric("dummy").id) == expectedAccumValue)
+  }
+
+}
+
+
+/**
+ * A dummy [[org.apache.spark.sql.execution.SparkPlan]] that updates a [[SQLMetrics]]
+ * on the driver.
+ */
+private case class MyPlan(sc: SparkContext, expectedValue: Long) extends LeafExecNode {
+  override def sparkContext: SparkContext = sc
+  override def output: Seq[Attribute] = Seq()
+
+  override val metrics: Map[String, SQLMetric] = Map(
+    "dummy" -> SQLMetrics.createMetric(sc, "dummy"))
+
+  override def doExecute(): RDD[InternalRow] = {
+    longMetric("dummy") += expectedValue
+    sc.listenerBus.post(SparkListenerDriverAccumUpdates(
+      sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY).toLong,
+      metrics.values.map(m => m.id -> m.value).toSeq))
+    sc.emptyRDD
+  }
 }
 
 

