Repository: spark
Updated Branches:
  refs/heads/master 4e8ac6edd -> afbe35cf5
[SPARK-14670] [SQL] allow updating driver side sql metrics

## What changes were proposed in this pull request?

The SQL tab on the Spark UI currently displays accumulator values per operator, but it only shows metrics updated on the executors, not on the driver. It is useful to also include driver-side metrics, e.g. broadcast time. This is a different version of https://github.com/apache/spark/pull/12427: instead of reporting driver-side values at the end of execution, this PR sends driver-side accumulator updates through a new listener event right after the update happens.

## How was this patch tested?

A new test in `SQLListenerSuite`.

![qq20160606-0](https://cloud.githubusercontent.com/assets/3182036/15841418/0eb137da-2c06-11e6-9068-5694eeb78530.png)

Author: Wenchen Fan <wenc...@databricks.com>

Closes #13189 from cloud-fan/metrics.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/afbe35cf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/afbe35cf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/afbe35cf

Branch: refs/heads/master
Commit: afbe35cf5b272991b4986e551b42d9201c3862c3
Parents: 4e8ac6e
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Wed Jun 8 22:47:29 2016 -0700
Committer: Davies Liu <davies....@gmail.com>
Committed: Wed Jun 8 22:47:29 2016 -0700

----------------------------------------------------------------------
 .../exchange/BroadcastExchangeExec.scala        |  9 ++++
 .../spark/sql/execution/ui/SQLListener.scala    | 28 ++++++++--
 .../sql/execution/ui/SQLListenerSuite.scala     | 56 ++++++++++++++++++--
 3 files changed, 85 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
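Before the per-file diffs, a small usage sketch for context. It is not part of this commit and assumes a local `SparkSession`; it only shows where the new driver-side metrics become visible. The build side of a broadcast join is collected and broadcast on the driver, so its timing can only be reported through a driver-side update like the one added below.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.broadcast

    val spark = SparkSession.builder().master("local[*]").appName("driver-metrics-demo").getOrCreate()
    val small = spark.range(100).toDF("id")
    val large = spark.range(1000000).toDF("id")
    // The broadcast() hint makes the small side the build side of a BroadcastExchange;
    // after this patch the SQL tab entry for this query also shows that node's
    // driver-side metrics (e.g. broadcast time).
    large.join(broadcast(small), "id").count()
    spark.stop()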
http://git-wip-us.apache.org/repos/asf/spark/blob/afbe35cf/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
index d3081ba..bd0841d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, BroadcastPartitioning, Partitioning}
 import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
 import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates
 import org.apache.spark.util.ThreadUtils
 
 /**
@@ -92,6 +93,14 @@ case class BroadcastExchangeExec(
         val broadcasted = sparkContext.broadcast(relation)
         longMetric("broadcastTime") += (System.nanoTime() - beforeBroadcast) / 1000000
+
+        // There are some cases we don't care about the metrics and call `SparkPlan.doExecute`
+        // directly without setting an execution id. We should be tolerant to it.
+        if (executionId != null) {
+          sparkContext.listenerBus.post(SparkListenerDriverAccumUpdates(
+            executionId.toLong, metrics.values.map(m => m.id -> m.value).toSeq))
+        }
+
         broadcasted
       }
     }(BroadcastExchangeExec.executionContext)

http://git-wip-us.apache.org/repos/asf/spark/blob/afbe35cf/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index 03b5326..6e94791 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -42,6 +42,10 @@ case class SparkListenerSQLExecutionStart(
 case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long)
   extends SparkListenerEvent
 
+@DeveloperApi
+case class SparkListenerDriverAccumUpdates(executionId: Long, accumUpdates: Seq[(Long, Long)])
+  extends SparkListenerEvent
+
 private[sql] class SQLHistoryListenerFactory extends SparkHistoryListenerFactory {
 
   override def createListeners(conf: SparkConf, sparkUI: SparkUI): Seq[SparkListener] = {
@@ -251,6 +255,13 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
           }
         }
       }
+    case SparkListenerDriverAccumUpdates(executionId, accumUpdates) => synchronized {
+      _executionIdToData.get(executionId).foreach { executionUIData =>
+        for ((accId, accValue) <- accumUpdates) {
+          executionUIData.driverAccumUpdates(accId) = accValue
+        }
+      }
+    }
     case _ => // Ignore
   }
 
@@ -296,7 +307,9 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
             (accumulatorUpdate._1, accumulatorUpdate._2)
           }
         }.filter { case (id, _) => executionUIData.accumulatorMetrics.contains(id) }
-        mergeAccumulatorUpdates(accumulatorUpdates, accumulatorId =>
+
+        val driverUpdates = executionUIData.driverAccumUpdates.toSeq
+        mergeAccumulatorUpdates(accumulatorUpdates ++ driverUpdates, accumulatorId =>
           executionUIData.accumulatorMetrics(accumulatorId).metricType)
       case None =>
         // This execution has been dropped
@@ -368,10 +381,15 @@ private[ui] class SQLExecutionUIData(
     val physicalPlanDescription: String,
     val physicalPlanGraph: SparkPlanGraph,
     val accumulatorMetrics: Map[Long, SQLPlanMetric],
-    val submissionTime: Long,
-    var completionTime: Option[Long] = None,
-    val jobs: mutable.HashMap[Long, JobExecutionStatus] = mutable.HashMap.empty,
-    val stages: mutable.ArrayBuffer[Int] = mutable.ArrayBuffer()) {
+    val submissionTime: Long) {
+
+  var completionTime: Option[Long] = None
+
+  val jobs: mutable.HashMap[Long, JobExecutionStatus] = mutable.HashMap.empty
+
+  val stages: mutable.ArrayBuffer[Int] = mutable.ArrayBuffer()
+
+  val driverAccumUpdates: mutable.HashMap[Long, Long] = mutable.HashMap.empty
 
   /**
    * Return whether there are running jobs in this execution.
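A side note on consuming the new event (illustrative, not part of this commit): because `SparkListenerDriverAccumUpdates` extends `SparkListenerEvent` and is marked `@DeveloperApi`, an external listener can observe it through `SparkListener.onOtherEvent`, the same hook the `SQLListener` handler above uses. A hypothetical listener that just logs the updates could look like this; the class name is made up for the example.

    import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
    import org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates

    class DriverAccumLogger extends SparkListener {
      override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
        case SparkListenerDriverAccumUpdates(executionId, accumUpdates) =>
          // accumUpdates pairs an accumulator id with its current driver-side value.
          accumUpdates.foreach { case (accumId, value) =>
            println(s"execution $executionId: driver accumulator $accumId = $value")
          }
        case _ => // ignore everything else
      }
    }

    // Registered from driver code, e.g.: sparkContext.addSparkListener(new DriverAccumLogger)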
http://git-wip-us.apache.org/repos/asf/spark/blob/afbe35cf/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index 6788c9d..6e60b0e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -23,11 +23,15 @@ import org.mockito.Mockito.mock
 
 import org.apache.spark._
 import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler._
-import org.apache.spark.sql.{DataFrame, SparkSession, SQLContext}
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.catalyst.util.quietly
-import org.apache.spark.sql.execution.{SparkPlanInfo, SQLExecution}
-import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlanInfo, SQLExecution}
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.{AccumulatorMetadata, LongAccumulator}
@@ -386,6 +390,52 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
     assert(trackedAccums.head === (sqlMetricInfo.id, sqlMetricInfo.update.get))
   }
 
+  test("driver side SQL metrics") {
+    val listener = new SQLListener(spark.sparkContext.conf)
+    val expectedAccumValue = 12345
+    val physicalPlan = MyPlan(sqlContext.sparkContext, expectedAccumValue)
+    sqlContext.sparkContext.addSparkListener(listener)
+    val dummyQueryExecution = new QueryExecution(spark, LocalRelation()) {
+      override lazy val sparkPlan = physicalPlan
+      override lazy val executedPlan = physicalPlan
+    }
+    SQLExecution.withNewExecutionId(spark, dummyQueryExecution) {
+      physicalPlan.execute().collect()
+    }
+
+    def waitTillExecutionFinished(): Unit = {
+      while (listener.getCompletedExecutions.isEmpty) {
+        Thread.sleep(100)
+      }
+    }
+    waitTillExecutionFinished()
+
+    val driverUpdates = listener.getCompletedExecutions.head.driverAccumUpdates
+    assert(driverUpdates.size == 1)
+    assert(driverUpdates(physicalPlan.longMetric("dummy").id) == expectedAccumValue)
+  }
+
+}
+
+
+/**
+ * A dummy [[org.apache.spark.sql.execution.SparkPlan]] that updates a [[SQLMetrics]]
+ * on the driver.
+ */
+private case class MyPlan(sc: SparkContext, expectedValue: Long) extends LeafExecNode {
+  override def sparkContext: SparkContext = sc
+  override def output: Seq[Attribute] = Seq()
+
+  override val metrics: Map[String, SQLMetric] = Map(
+    "dummy" -> SQLMetrics.createMetric(sc, "dummy"))
+
+  override def doExecute(): RDD[InternalRow] = {
+    longMetric("dummy") += expectedValue
+    sc.listenerBus.post(SparkListenerDriverAccumUpdates(
+      sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY).toLong,
+      metrics.values.map(m => m.id -> m.value).toSeq))
+    sc.emptyRDD
+  }
 }
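To make the flow above concrete, here is a distilled and deliberately simplified view of how driver-side pairs join the executor-side ones when metrics are rendered: both end up as plain `(accumulatorId, value)` tuples that are merged together, as the `accumulatorUpdates ++ driverUpdates` change in the `SQLListener` diff shows. The real code delegates per-metric formatting to `mergeAccumulatorUpdates` and the metric's `metricType`; the values and the grouping below are illustrative only.

    // Illustrative values only.
    val executorUpdates: Seq[(Long, Long)] = Seq((1L, 10L), (1L, 15L)) // per-task updates collected by SQLListener
    val driverUpdates: Seq[(Long, Long)]   = Seq((2L, 42L))            // posted via SparkListenerDriverAccumUpdates

    // Group every value reported for an accumulator, regardless of where it came from.
    val grouped: Map[Long, Seq[Long]] =
      (executorUpdates ++ driverUpdates).groupBy(_._1).mapValues(_.map(_._2)).toMap
    // grouped == Map(1L -> Seq(10L, 15L), 2L -> Seq(42L))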