This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ea44e5  [SPARK-27639][SQL] InMemoryTableScan shows the table name on 
UI if possible
3ea44e5 is described below

commit 3ea44e52e79b5d02c30332bec50c805c6d56ea76
Author: Yuming Wang <yumw...@ebay.com>
AuthorDate: Tue May 7 21:00:13 2019 -0700

    [SPARK-27639][SQL] InMemoryTableScan shows the table name on UI if possible
    
    ## What changes were proposed in this pull request?
    <img src="https://user-images.githubusercontent.com/5399861/57213799-7bccf100-701a-11e9-9872-d90b4a185dc6.png" width="200">
    
    Currently the UI only shows `InMemoryTableScan` when scanning an InMemoryTable.
    When there are many InMemoryTables, it is difficult to tell which
one we are looking for. This PR shows the table name when scanning an
InMemoryTable.
    
    ## How was this patch tested?
    
    unit tests and manual tests
    
    After this PR:
    <img src="https://user-images.githubusercontent.com/5399861/57269120-d3219e80-70b8-11e9-9e56-1b5d4c071660.png" width="200">
    
    Closes #24534 from wangyum/SPARK-27639.
    
    Authored-by: Yuming Wang <yumw...@ebay.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../sql/execution/WholeStageCodegenExec.scala      |  2 ++
 .../sql/execution/columnar/InMemoryRelation.scala  |  7 ++++---
 .../execution/columnar/InMemoryTableScanExec.scala |  9 +++++++++
 .../sql/execution/metric/SQLMetricsSuite.scala     | 22 ++++++++++++++++++++++
 .../thriftserver/HiveThriftServer2Suites.scala     |  4 ++--
 5 files changed, 39 insertions(+), 5 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index 027885b1..99dcca8 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -32,6 +32,7 @@ import 
org.apache.spark.sql.catalyst.expressions.codegen.Block._
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.aggregate.HashAggregateExec
+import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, 
SortMergeJoinExec}
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.internal.SQLConf
@@ -50,6 +51,7 @@ trait CodegenSupport extends SparkPlan {
     case _: SortMergeJoinExec => "smj"
     case _: RDDScanExec => "rdd"
     case _: DataSourceScanExec => "scan"
+    case _: InMemoryTableScanExec => "memoryScan"
     case _ => nodeName.toLowerCase(Locale.ROOT)
   }
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index 3edfd8f..16a5094 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -55,6 +55,9 @@ case class CachedRDDBuilder(
   val sizeInBytesStats: LongAccumulator = 
cachedPlan.sqlContext.sparkContext.longAccumulator
   val rowCountStats: LongAccumulator = 
cachedPlan.sqlContext.sparkContext.longAccumulator
 
+  val cachedName = tableName.map(n => s"In-memory table $n")
+    .getOrElse(StringUtils.abbreviate(cachedPlan.toString, 1024))
+
   def cachedColumnBuffers: RDD[CachedBatch] = {
     if (_cachedColumnBuffers == null) {
       synchronized {
@@ -130,9 +133,7 @@ case class CachedRDDBuilder(
       }
     }.persist(storageLevel)
 
-    cached.setName(
-      tableName.map(n => s"In-memory table $n")
-        .getOrElse(StringUtils.abbreviate(cachedPlan.toString, 1024)))
+    cached.setName(cachedName)
     cached
   }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
index b827878..06634c1 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
@@ -36,6 +36,15 @@ case class InMemoryTableScanExec(
     @transient relation: InMemoryRelation)
   extends LeafExecNode with ColumnarBatchScan {
 
+  override val nodeName: String = {
+    relation.cacheBuilder.tableName match {
+      case Some(_) =>
+        "Scan " + relation.cacheBuilder.cachedName
+      case _ =>
+        super.nodeName
+    }
+  }
+
   override protected def innerChildren: Seq[QueryPlan[_]] = Seq(relation) ++ 
super.innerChildren
 
   override def doCanonicalize(): SparkPlan =
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index b77048a..a665fe6 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -574,4 +574,26 @@ class SQLMetricsSuite extends SparkFunSuite with 
SQLMetricsTestUtils with Shared
       )
     }
   }
+
+  test("InMemoryTableScan shows the table name on UI if possible") {
+    // Show table name on UI
+    withView("inMemoryTable", "```a``b```") {
+      sql("CREATE TEMPORARY VIEW inMemoryTable AS SELECT 1 AS c1")
+      sql("CACHE TABLE inMemoryTable")
+      testSparkPlanMetrics(spark.table("inMemoryTable"), 1,
+        Map(0L -> (("Scan In-memory table `inMemoryTable`", Map.empty)))
+      )
+
+      sql("CREATE TEMPORARY VIEW ```a``b``` AS SELECT 2 AS c1")
+      sql("CACHE TABLE ```a``b```")
+      testSparkPlanMetrics(spark.table("```a``b```"), 1,
+        Map(0L -> (("Scan In-memory table ```a``b```", Map.empty)))
+      )
+    }
+
+    // Show InMemoryTableScan on UI
+    testSparkPlanMetrics(spark.range(1).cache().select("id"), 1,
+      Map(0L -> (("InMemoryTableScan", Map.empty)))
+    )
+  }
 }
diff --git 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index 0f53fcd..ef7c500 100644
--- 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -295,7 +295,7 @@ class HiveThriftBinaryServerSuite extends 
HiveThriftJdbcTest {
         val plan = statement.executeQuery("explain select * from test_table")
         plan.next()
         plan.next()
-        assert(plan.getString(1).contains("InMemoryTableScan"))
+        assert(plan.getString(1).contains("Scan In-memory table `test_table`"))
 
         val rs1 = statement.executeQuery("SELECT key FROM test_table ORDER BY 
KEY DESC")
         val buf1 = new collection.mutable.ArrayBuffer[Int]()
@@ -381,7 +381,7 @@ class HiveThriftBinaryServerSuite extends 
HiveThriftJdbcTest {
         val plan = statement.executeQuery("explain select key from test_map 
ORDER BY key DESC")
         plan.next()
         plan.next()
-        assert(plan.getString(1).contains("InMemoryTableScan"))
+        assert(plan.getString(1).contains("Scan In-memory table `test_table`"))
 
         val rs = statement.executeQuery("SELECT key FROM test_map ORDER BY KEY 
DESC")
         val buf = new collection.mutable.ArrayBuffer[Int]()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to