Repository: spark
Updated Branches:
  refs/heads/master 223d83ee9 -> 154351e6d
[SPARK-22462][SQL] Make rdd-based actions in Dataset trackable in SQL UI

## What changes were proposed in this pull request?

For a few Dataset actions such as `foreach`, no SQL metrics are currently visible in the SQL tab of the Spark UI. This is because these actions are wrongly bound to the Dataset's `QueryExecution`, while they actually evaluate the underlying RDD, which is produced by its own `QueryExecution`. To show correct SQL metrics in the UI, we should bind them to the RDD's `QueryExecution`.

## How was this patch tested?

Manually tested. A screenshot is attached in the PR.

Author: Liang-Chi Hsieh <vii...@gmail.com>

Closes #19689 from viirya/SPARK-22462.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/154351e6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/154351e6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/154351e6

Branch: refs/heads/master
Commit: 154351e6dbd24c4254094477e3f7defcba979b1a
Parents: 223d83e
Author: Liang-Chi Hsieh <vii...@gmail.com>
Authored: Sat Nov 11 12:34:30 2017 +0100
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Sat Nov 11 12:34:30 2017 +0100

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/Dataset.scala    | 27 +++++++++++++++++---
 1 file changed, 23 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
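For context, the mechanism this patch builds on can be sketched as follows. `SQLExecution.withNewExecutionId` sets the `spark.sql.execution.id` local property on the `SparkContext`, so every Spark job launched inside the body is attributed to that SQL execution, which is what the SQL tab uses to attach metrics. The sketch below is a simplified illustration of that idea, not the actual Spark source (the real method also posts execution start/end events to the listener bus, and `withExecutionIdSketch` is a hypothetical name):

    // Simplified sketch of the tracking mechanism; illustration only, not Spark source.
    // Jobs started while the local property is set are grouped under one SQL execution.
    def withExecutionIdSketch[T](sc: org.apache.spark.SparkContext, executionId: Long)(body: => T): T = {
      val key = "spark.sql.execution.id"   // local-property key Spark uses for this link
      val previous = sc.getLocalProperty(key)
      sc.setLocalProperty(key, executionId.toString)
      try body finally sc.setLocalProperty(key, previous)
    }
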
http://git-wip-us.apache.org/repos/asf/spark/blob/154351e6/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 5eb2aff..1620ab3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2594,7 +2594,7 @@ class Dataset[T] private[sql](
    * @group action
    * @since 1.6.0
    */
-  def foreach(f: T => Unit): Unit = withNewExecutionId {
+  def foreach(f: T => Unit): Unit = withNewRDDExecutionId {
     rdd.foreach(f)
   }
 
@@ -2613,7 +2613,7 @@ class Dataset[T] private[sql](
    * @group action
    * @since 1.6.0
    */
-  def foreachPartition(f: Iterator[T] => Unit): Unit = withNewExecutionId {
+  def foreachPartition(f: Iterator[T] => Unit): Unit = withNewRDDExecutionId {
     rdd.foreachPartition(f)
   }
 
@@ -2851,6 +2851,12 @@ class Dataset[T] private[sql](
    */
   def unpersist(): this.type = unpersist(blocking = false)
 
+  // Represents the `QueryExecution` used to produce the content of the Dataset as an `RDD`.
+  @transient private lazy val rddQueryExecution: QueryExecution = {
+    val deserialized = CatalystSerde.deserialize[T](logicalPlan)
+    sparkSession.sessionState.executePlan(deserialized)
+  }
+
   /**
    * Represents the content of the Dataset as an `RDD` of `T`.
    *
@@ -2859,8 +2865,7 @@ class Dataset[T] private[sql](
    */
   lazy val rdd: RDD[T] = {
     val objectType = exprEnc.deserializer.dataType
-    val deserialized = CatalystSerde.deserialize[T](logicalPlan)
-    sparkSession.sessionState.executePlan(deserialized).toRdd.mapPartitions { rows =>
+    rddQueryExecution.toRdd.mapPartitions { rows =>
       rows.map(_.get(0, objectType).asInstanceOf[T])
     }
   }
@@ -3114,6 +3119,20 @@ class Dataset[T] private[sql](
   }
 
   /**
+   * Wrap an action of the Dataset's RDD to track all Spark jobs in the body so that we can connect
+   * them with an execution. Before performing the action, the metrics of the executed plan will be
+   * reset.
+   */
+  private def withNewRDDExecutionId[U](body: => U): U = {
+    SQLExecution.withNewExecutionId(sparkSession, rddQueryExecution) {
+      rddQueryExecution.executedPlan.foreach { plan =>
+        plan.resetMetrics()
+      }
+      body
+    }
+  }
+
+  /**
    * Wrap a Dataset action to track the QueryExecution and time cost, then report to the
    * user-registered callback functions.
    */
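
A minimal way to try the fix end to end (hypothetical demo code, not part of the patch; assumes a Spark build containing this commit): run an RDD-based action such as `foreach`, then open the SQL tab of the UI at http://localhost:4040; the triggered job should now be listed under a SQL execution with plan metrics.

    // Hypothetical demo: after this patch, the job triggered by Dataset.foreach
    // is linked to a SQL execution, so its metrics appear in the SQL tab.
    import org.apache.spark.sql.SparkSession

    object Spark22462Demo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("SPARK-22462 demo")
          .master("local[2]")
          .getOrCreate()
        import spark.implicits._

        val ds = spark.range(0, 10000).map(_ * 2)   // Dataset[Long], evaluated through its own RDD QueryExecution
        ds.foreach { v => if (v % 5000 == 0) println(s"saw $v") }

        Thread.sleep(60000)   // keep the UI alive long enough to inspect the SQL tab
        spark.stop()
      }
    }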