This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 3ea44e5 [SPARK-27639][SQL] InMemoryTableScan shows the table name on UI if possible 3ea44e5 is described below commit 3ea44e52e79b5d02c30332bec50c805c6d56ea76 Author: Yuming Wang <yumw...@ebay.com> AuthorDate: Tue May 7 21:00:13 2019 -0700 [SPARK-27639][SQL] InMemoryTableScan shows the table name on UI if possible ## What changes were proposed in this pull request? <img src="https://user-images.githubusercontent.com/5399861/57213799-7bccf100-701a-11e9-9872-d90b4a185dc6.png" width="200"> The UI only shows `InMemoryTableScan` when scanning an InMemoryTable. When there are many InMemoryTables, it is difficult to distinguish which one we are looking for. This PR shows the table name when scanning an InMemoryTable. ## How was this patch tested? unit tests and manual tests After this PR: <img src="https://user-images.githubusercontent.com/5399861/57269120-d3219e80-70b8-11e9-9e56-1b5d4c071660.png" width="200"> Closes #24534 from wangyum/SPARK-27639. 
Authored-by: Yuming Wang <yumw...@ebay.com> Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../sql/execution/WholeStageCodegenExec.scala | 2 ++ .../sql/execution/columnar/InMemoryRelation.scala | 7 ++++--- .../execution/columnar/InMemoryTableScanExec.scala | 9 +++++++++ .../sql/execution/metric/SQLMetricsSuite.scala | 22 ++++++++++++++++++++++ .../thriftserver/HiveThriftServer2Suites.scala | 4 ++-- 5 files changed, 39 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index 027885b1..99dcca8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.Block._ import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.aggregate.HashAggregateExec +import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.internal.SQLConf @@ -50,6 +51,7 @@ trait CodegenSupport extends SparkPlan { case _: SortMergeJoinExec => "smj" case _: RDDScanExec => "rdd" case _: DataSourceScanExec => "scan" + case _: InMemoryTableScanExec => "memoryScan" case _ => nodeName.toLowerCase(Locale.ROOT) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala index 3edfd8f..16a5094 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala @@ -55,6 +55,9 @@ case class CachedRDDBuilder( val sizeInBytesStats: LongAccumulator = cachedPlan.sqlContext.sparkContext.longAccumulator val rowCountStats: LongAccumulator = cachedPlan.sqlContext.sparkContext.longAccumulator + val cachedName = tableName.map(n => s"In-memory table $n") + .getOrElse(StringUtils.abbreviate(cachedPlan.toString, 1024)) + def cachedColumnBuffers: RDD[CachedBatch] = { if (_cachedColumnBuffers == null) { synchronized { @@ -130,9 +133,7 @@ case class CachedRDDBuilder( } }.persist(storageLevel) - cached.setName( - tableName.map(n => s"In-memory table $n") - .getOrElse(StringUtils.abbreviate(cachedPlan.toString, 1024))) + cached.setName(cachedName) cached } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index b827878..06634c1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -36,6 +36,15 @@ case class InMemoryTableScanExec( @transient relation: InMemoryRelation) extends LeafExecNode with ColumnarBatchScan { + override val nodeName: String = { + relation.cacheBuilder.tableName match { + case Some(_) => + "Scan " + relation.cacheBuilder.cachedName + case _ => + super.nodeName + } + } + override protected def innerChildren: Seq[QueryPlan[_]] = Seq(relation) ++ super.innerChildren override def doCanonicalize(): SparkPlan = diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala index b77048a..a665fe6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala @@ -574,4 +574,26 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared ) } } + + test("InMemoryTableScan shows the table name on UI if possible") { + // Show table name on UI + withView("inMemoryTable", "```a``b```") { + sql("CREATE TEMPORARY VIEW inMemoryTable AS SELECT 1 AS c1") + sql("CACHE TABLE inMemoryTable") + testSparkPlanMetrics(spark.table("inMemoryTable"), 1, + Map(0L -> (("Scan In-memory table `inMemoryTable`", Map.empty))) + ) + + sql("CREATE TEMPORARY VIEW ```a``b``` AS SELECT 2 AS c1") + sql("CACHE TABLE ```a``b```") + testSparkPlanMetrics(spark.table("```a``b```"), 1, + Map(0L -> (("Scan In-memory table ```a``b```", Map.empty))) + ) + } + + // Show InMemoryTableScan on UI + testSparkPlanMetrics(spark.range(1).cache().select("id"), 1, + Map(0L -> (("InMemoryTableScan", Map.empty))) + ) + } } diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala index 0f53fcd..ef7c500 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala @@ -295,7 +295,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { val plan = statement.executeQuery("explain select * from test_table") plan.next() plan.next() - assert(plan.getString(1).contains("InMemoryTableScan")) + assert(plan.getString(1).contains("Scan In-memory table `test_table`")) val rs1 = statement.executeQuery("SELECT key FROM test_table ORDER BY KEY DESC") val buf1 = new collection.mutable.ArrayBuffer[Int]() @@ -381,7 +381,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { val plan = statement.executeQuery("explain select key 
from test_map ORDER BY key DESC") plan.next() plan.next() - assert(plan.getString(1).contains("InMemoryTableScan")) + assert(plan.getString(1).contains("Scan In-memory table `test_table`")) val rs = statement.executeQuery("SELECT key FROM test_map ORDER BY KEY DESC") val buf = new collection.mutable.ArrayBuffer[Int]() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org