Repository: spark
Updated Branches:
  refs/heads/master 6bbdf34ba -> 07a700b37


[SPARK-26129][SQL] Instrumentation for per-query planning time

## What changes were proposed in this pull request?
We currently don't have good visibility into query planning time (analysis vs.
optimization vs. physical planning). This patch adds a simple utility,
QueryPlanningTracker, to track the runtime of individual rules and of the
broader planning phases.
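
A minimal sketch of the resulting user-facing surface (based on the end-to-end test added in this
patch; the query and timings are illustrative only):

```scala
val df = spark.sql("select count(*) from range(1000)")
df.collect()

val tracker = df.queryExecution.tracker
// Per-phase wall-clock time in nanoseconds: parsing, analysis, optimization, planning.
tracker.phases.foreach { case (phase, ns) => println(s"$phase: ${ns / 1e6} ms") }
// Per-rule stats: total time, number of invocations, number of effective invocations.
tracker.topRulesByTime(5).foreach { case (rule, summary) => println(s"$rule -> $summary") }
```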

## How was this patch tested?
Added unit tests and end-to-end integration tests.

Closes #23096 from rxin/SPARK-26129.

Authored-by: Reynold Xin <r...@databricks.com>
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/07a700b3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/07a700b3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/07a700b3

Branch: refs/heads/master
Commit: 07a700b3711057553dfbb7b047216565726509c7
Parents: 6bbdf34
Author: Reynold Xin <r...@databricks.com>
Authored: Wed Nov 21 16:41:12 2018 +0100
Committer: Reynold Xin <r...@databricks.com>
Committed: Wed Nov 21 16:41:12 2018 +0100

----------------------------------------------------------------------
 .../sql/catalyst/QueryPlanningTracker.scala     | 127 +++++++++++++++++++
 .../spark/sql/catalyst/analysis/Analyzer.scala  |  22 ++--
 .../spark/sql/catalyst/rules/RuleExecutor.scala |  19 ++-
 .../catalyst/QueryPlanningTrackerSuite.scala    |  78 ++++++++++++
 .../sql/catalyst/analysis/AnalysisTest.scala    |   3 +-
 .../ResolveGroupingAnalyticsSuite.scala         |   3 +-
 .../analysis/ResolvedUuidExpressionsSuite.scala |  10 +-
 .../scala/org/apache/spark/sql/Dataset.scala    |   9 ++
 .../org/apache/spark/sql/SparkSession.scala     |   6 +-
 .../spark/sql/execution/QueryExecution.scala    |  21 ++-
 .../QueryPlanningTrackerEndToEndSuite.scala     |  52 ++++++++
 .../apache/spark/sql/hive/test/TestHive.scala   |  16 ++-
 12 files changed, 338 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/07a700b3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala
new file mode 100644
index 0000000..420f2a1
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.util.BoundedPriorityQueue
+
+
+/**
+ * A simple utility for tracking runtime and associated stats in query planning.
+ *
+ * There are two separate concepts we track:
+ *
+ * 1. Phases: These are broad scope phases in query planning, as listed below, i.e. analysis,
+ * optimization and physical planning (just planning).
+ *
+ * 2. Rules: These are the individual Catalyst rules that we track. In addition to time, we also
+ * track the number of invocations and effective invocations.
+ */
+object QueryPlanningTracker {
+
+  // Define a list of common phases here.
+  val PARSING = "parsing"
+  val ANALYSIS = "analysis"
+  val OPTIMIZATION = "optimization"
+  val PLANNING = "planning"
+
+  class RuleSummary(
+    var totalTimeNs: Long, var numInvocations: Long, var numEffectiveInvocations: Long) {
+
+    def this() = this(totalTimeNs = 0, numInvocations = 0, numEffectiveInvocations = 0)
+
+    override def toString: String = {
+      s"RuleSummary($totalTimeNs, $numInvocations, $numEffectiveInvocations)"
+    }
+  }
+
+  /**
+   * A thread local variable to implicitly pass the tracker around. This assumes the query planner
+   * is single-threaded, and avoids passing the same tracker context in every function call.
+   */
+  private val localTracker = new ThreadLocal[QueryPlanningTracker]() {
+    override def initialValue: QueryPlanningTracker = null
+  }
+
+  /** Returns the current tracker in scope, based on the thread local variable. */
+  def get: Option[QueryPlanningTracker] = Option(localTracker.get())
+
+  /** Sets the current tracker for the execution of function f. We assume f is single-threaded. */
+  def withTracker[T](tracker: QueryPlanningTracker)(f: => T): T = {
+    val originalTracker = localTracker.get()
+    localTracker.set(tracker)
+    try f finally { localTracker.set(originalTracker) }
+  }
+}
+
+
+class QueryPlanningTracker {
+
+  import QueryPlanningTracker._
+
+  // Mapping from the name of a rule to a rule's summary.
+  // Use a Java HashMap for less overhead.
+  private val rulesMap = new java.util.HashMap[String, RuleSummary]
+
+  // From a phase to time in ns.
+  private val phaseToTimeNs = new java.util.HashMap[String, Long]
+
+  /** Measure the runtime of function f, and add it to the time for the specified phase. */
+  def measureTime[T](phase: String)(f: => T): T = {
+    val startTime = System.nanoTime()
+    val ret = f
+    val timeTaken = System.nanoTime() - startTime
+    phaseToTimeNs.put(phase, phaseToTimeNs.getOrDefault(phase, 0) + timeTaken)
+    ret
+  }
+
+  /**
+   * Record a specific invocation of a rule.
+   *
+   * @param rule name of the rule
+   * @param timeNs time taken to run this invocation
+   * @param effective whether the invocation has resulted in a plan change
+   */
+  def recordRuleInvocation(rule: String, timeNs: Long, effective: Boolean): Unit = {
+    var s = rulesMap.get(rule)
+    if (s eq null) {
+      s = new RuleSummary
+      rulesMap.put(rule, s)
+    }
+
+    s.totalTimeNs += timeNs
+    s.numInvocations += 1
+    s.numEffectiveInvocations += (if (effective) 1 else 0)
+  }
+
+  // ------------ reporting functions below ------------
+
+  def rules: Map[String, RuleSummary] = rulesMap.asScala.toMap
+
+  def phases: Map[String, Long] = phaseToTimeNs.asScala.toMap
+
+  /** Returns the top k most expensive rules (as measured by time). */
+  def topRulesByTime(k: Int): Seq[(String, RuleSummary)] = {
+    val orderingByTime: Ordering[(String, RuleSummary)] = Ordering.by(e => e._2.totalTimeNs)
+    val q = new BoundedPriorityQueue(k)(orderingByTime)
+    rulesMap.asScala.foreach(q.+=)
+    q.toSeq.sortBy(r => -r._2.totalTimeNs)
+  }
+
+}

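A standalone usage sketch of the class added above (the rule name, timings, and placeholder body
are illustrative only):

```scala
import org.apache.spark.sql.catalyst.QueryPlanningTracker

val tracker = new QueryPlanningTracker

// Accumulate wall-clock time under a named phase.
val parsed = tracker.measureTime(QueryPlanningTracker.PARSING) {
  // ... parse the query here ...
  "parsed plan placeholder"
}

// Record individual rule invocations (normally done by RuleExecutor, see below).
tracker.recordRuleInvocation("ResolveReferences", timeNs = 1000L, effective = true)
tracker.recordRuleInvocation("ResolveReferences", timeNs = 500L, effective = false)

println(tracker.phases)                      // Map(parsing -> <elapsed ns>)
println(tracker.rules("ResolveReferences"))  // RuleSummary(1500, 2, 1)
println(tracker.topRulesByTime(3))           // most expensive rules first
```
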
http://git-wip-us.apache.org/repos/asf/spark/blob/07a700b3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index ab2312f..b977fa0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -102,16 +102,18 @@ class Analyzer(
     this(catalog, conf, conf.optimizerMaxIterations)
   }
 
-  def executeAndCheck(plan: LogicalPlan): LogicalPlan = AnalysisHelper.markInAnalyzer {
-    val analyzed = execute(plan)
-    try {
-      checkAnalysis(analyzed)
-      analyzed
-    } catch {
-      case e: AnalysisException =>
-        val ae = new AnalysisException(e.message, e.line, e.startPosition, Option(analyzed))
-        ae.setStackTrace(e.getStackTrace)
-        throw ae
+  def executeAndCheck(plan: LogicalPlan, tracker: QueryPlanningTracker): LogicalPlan = {
+    AnalysisHelper.markInAnalyzer {
+      val analyzed = executeAndTrack(plan, tracker)
+      try {
+        checkAnalysis(analyzed)
+        analyzed
+      } catch {
+        case e: AnalysisException =>
+          val ae = new AnalysisException(e.message, e.line, e.startPosition, Option(analyzed))
+          ae.setStackTrace(e.getStackTrace)
+          throw ae
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/07a700b3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
index e991a2d..cf6ff4f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.catalyst.rules
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.QueryPlanningTracker
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.trees.TreeNode
 import org.apache.spark.sql.catalyst.util.sideBySide
@@ -67,6 +68,17 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
   protected def isPlanIntegral(plan: TreeType): Boolean = true
 
   /**
+   * Executes the batches of rules defined by the subclass, and also tracks timing info for each
+   * rule using the provided tracker.
+   * @see [[execute]]
+   */
+  def executeAndTrack(plan: TreeType, tracker: QueryPlanningTracker): TreeType = {
+    QueryPlanningTracker.withTracker(tracker) {
+      execute(plan)
+    }
+  }
+
+  /**
    * Executes the batches of rules defined by the subclass. The batches are executed serially
    * using the defined execution strategy. Within each batch, rules are also executed serially.
    */
@@ -74,6 +86,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
     var curPlan = plan
     val queryExecutionMetrics = RuleExecutor.queryExecutionMeter
     val planChangeLogger = new PlanChangeLogger()
+    val tracker: Option[QueryPlanningTracker] = QueryPlanningTracker.get
 
     batches.foreach { batch =>
       val batchStartPlan = curPlan
@@ -88,8 +101,9 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
             val startTime = System.nanoTime()
             val result = rule(plan)
             val runTime = System.nanoTime() - startTime
+            val effective = !result.fastEquals(plan)
 
-            if (!result.fastEquals(plan)) {
+            if (effective) {
               queryExecutionMetrics.incNumEffectiveExecution(rule.ruleName)
               queryExecutionMetrics.incTimeEffectiveExecutionBy(rule.ruleName, runTime)
               planChangeLogger.log(rule.ruleName, plan, result)
@@ -97,6 +111,9 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
             queryExecutionMetrics.incExecutionTimeBy(rule.ruleName, runTime)
             queryExecutionMetrics.incNumExecution(rule.ruleName)
 
+            // Record timing information using QueryPlanningTracker
+            tracker.foreach(_.recordRuleInvocation(rule.ruleName, runTime, effective))
+
+            // Run the structural integrity checker against the plan after each rule.
             if (!isPlanIntegral(result)) {
+              val message = s"After applying rule ${rule.ruleName} in batch ${batch.name}, " +

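A sketch of how the thread-local handoff above is intended to be used; myOptimizer and plan stand
in for any concrete RuleExecutor subclass and its input plan, and are not part of this patch:

```scala
import org.apache.spark.sql.catalyst.QueryPlanningTracker

val tracker = new QueryPlanningTracker

// executeAndTrack installs the tracker in a ThreadLocal for the duration of execute(),
// so every rule application inside execute() reports into it via recordRuleInvocation.
val optimized = myOptimizer.executeAndTrack(plan, tracker)

// Which is equivalent to wrapping execute() manually:
val sameResult = QueryPlanningTracker.withTracker(tracker) {
  myOptimizer.execute(plan)
}

// Afterwards, per-rule statistics are available on the tracker.
tracker.topRulesByTime(3).foreach { case (name, summary) => println(s"$name: $summary") }
```
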
http://git-wip-us.apache.org/repos/asf/spark/blob/07a700b3/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/QueryPlanningTrackerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/QueryPlanningTrackerSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/QueryPlanningTrackerSuite.scala
new file mode 100644
index 0000000..f42c262
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/QueryPlanningTrackerSuite.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst
+
+import org.apache.spark.SparkFunSuite
+
+class QueryPlanningTrackerSuite extends SparkFunSuite {
+
+  test("phases") {
+    val t = new QueryPlanningTracker
+    t.measureTime("p1") {
+      Thread.sleep(1)
+    }
+
+    assert(t.phases("p1") > 0)
+    assert(!t.phases.contains("p2"))
+
+    val old = t.phases("p1")
+
+    t.measureTime("p1") {
+      Thread.sleep(1)
+    }
+    assert(t.phases("p1") > old)
+  }
+
+  test("rules") {
+    val t = new QueryPlanningTracker
+    t.recordRuleInvocation("r1", 1, effective = false)
+    t.recordRuleInvocation("r2", 2, effective = true)
+    t.recordRuleInvocation("r3", 1, effective = false)
+    t.recordRuleInvocation("r3", 2, effective = true)
+
+    val rules = t.rules
+
+    assert(rules("r1").totalTimeNs == 1)
+    assert(rules("r1").numInvocations == 1)
+    assert(rules("r1").numEffectiveInvocations == 0)
+
+    assert(rules("r2").totalTimeNs == 2)
+    assert(rules("r2").numInvocations == 1)
+    assert(rules("r2").numEffectiveInvocations == 1)
+
+    assert(rules("r3").totalTimeNs == 3)
+    assert(rules("r3").numInvocations == 2)
+    assert(rules("r3").numEffectiveInvocations == 1)
+  }
+
+  test("topRulesByTime") {
+    val t = new QueryPlanningTracker
+    t.recordRuleInvocation("r2", 2, effective = true)
+    t.recordRuleInvocation("r4", 4, effective = true)
+    t.recordRuleInvocation("r1", 1, effective = false)
+    t.recordRuleInvocation("r3", 3, effective = false)
+
+    val top = t.topRulesByTime(2)
+    assert(top.size == 2)
+    assert(top(0)._1 == "r4")
+    assert(top(1)._1 == "r3")
+
+    // Don't crash when k > total size
+    assert(t.topRulesByTime(10).size == 4)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/07a700b3/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
index 3d7c918..fab1b77 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
@@ -21,6 +21,7 @@ import java.net.URI
 import java.util.Locale
 
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.QueryPlanningTracker
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, InMemoryCatalog, SessionCatalog}
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -54,7 +55,7 @@ trait AnalysisTest extends PlanTest {
       expectedPlan: LogicalPlan,
       caseSensitive: Boolean = true): Unit = {
     val analyzer = getAnalyzer(caseSensitive)
-    val actualPlan = analyzer.executeAndCheck(inputPlan)
+    val actualPlan = analyzer.executeAndCheck(inputPlan, new QueryPlanningTracker)
     comparePlans(actualPlan, expectedPlan)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/07a700b3/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveGroupingAnalyticsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveGroupingAnalyticsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveGroupingAnalyticsSuite.scala
index 8da4d7e..aa5eda8 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveGroupingAnalyticsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveGroupingAnalyticsSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.analysis
 
 import java.util.TimeZone
 
+import org.apache.spark.sql.catalyst.QueryPlanningTracker
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -109,7 +110,7 @@ class ResolveGroupingAnalyticsSuite extends AnalysisTest {
       Seq(UnresolvedAlias(Multiply(unresolved_a, Literal(2))),
         unresolved_b, UnresolvedAlias(count(unresolved_c))))
 
-    val resultPlan = getAnalyzer(true).executeAndCheck(originalPlan2)
+    val resultPlan = getAnalyzer(true).executeAndCheck(originalPlan2, new QueryPlanningTracker)
     val gExpressions = resultPlan.asInstanceOf[Aggregate].groupingExpressions
     assert(gExpressions.size == 3)
     val firstGroupingExprAttrName =

http://git-wip-us.apache.org/repos/asf/spark/blob/07a700b3/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolvedUuidExpressionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolvedUuidExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolvedUuidExpressionsSuite.scala
index fe57c19..64bd075 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolvedUuidExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolvedUuidExpressionsSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import org.apache.spark.sql.catalyst.QueryPlanningTracker
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions._
@@ -34,6 +35,7 @@ class ResolvedUuidExpressionsSuite extends AnalysisTest {
   private lazy val uuid3 = Uuid().as('_uuid3)
   private lazy val uuid1Ref = uuid1.toAttribute
 
+  private val tracker = new QueryPlanningTracker
   private val analyzer = getAnalyzer(caseSensitive = true)
 
   private def getUuidExpressions(plan: LogicalPlan): Seq[Uuid] = {
@@ -47,7 +49,7 @@ class ResolvedUuidExpressionsSuite extends AnalysisTest {
 
   test("analyzed plan sets random seed for Uuid expression") {
     val plan = r.select(a, uuid1)
-    val resolvedPlan = analyzer.executeAndCheck(plan)
+    val resolvedPlan = analyzer.executeAndCheck(plan, tracker)
     getUuidExpressions(resolvedPlan).foreach { u =>
       assert(u.resolved)
       assert(u.randomSeed.isDefined)
@@ -56,14 +58,14 @@ class ResolvedUuidExpressionsSuite extends AnalysisTest {
 
   test("Uuid expressions should have different random seeds") {
     val plan = r.select(a, uuid1).groupBy(uuid1Ref)(uuid2, uuid3)
-    val resolvedPlan = analyzer.executeAndCheck(plan)
+    val resolvedPlan = analyzer.executeAndCheck(plan, tracker)
     assert(getUuidExpressions(resolvedPlan).map(_.randomSeed.get).distinct.length == 3)
   }
 
   test("Different analyzed plans should have different random seeds in Uuids") {
     val plan = r.select(a, uuid1).groupBy(uuid1Ref)(uuid2, uuid3)
-    val resolvedPlan1 = analyzer.executeAndCheck(plan)
-    val resolvedPlan2 = analyzer.executeAndCheck(plan)
+    val resolvedPlan1 = analyzer.executeAndCheck(plan, tracker)
+    val resolvedPlan2 = analyzer.executeAndCheck(plan, tracker)
     val uuids1 = getUuidExpressions(resolvedPlan1)
     val uuids2 = getUuidExpressions(resolvedPlan2)
     assert(uuids1.distinct.length == 3)

http://git-wip-us.apache.org/repos/asf/spark/blob/07a700b3/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 0e77ec0..e757921 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -32,6 +32,7 @@ import org.apache.spark.api.java.function._
 import org.apache.spark.api.python.{PythonRDD, SerDeUtil}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.QueryPlanningTracker
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.catalyst.encoders._
@@ -76,6 +77,14 @@ private[sql] object Dataset {
     qe.assertAnalyzed()
     new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))
   }
+
+  /** A variant of ofRows that allows passing in a tracker so we can track query parsing time. */
+  def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan, tracker: QueryPlanningTracker)
+    : DataFrame = {
+    val qe = new QueryExecution(sparkSession, logicalPlan, tracker)
+    qe.assertAnalyzed()
+    new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))
+  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/07a700b3/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 725db97..739c6b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -648,7 +648,11 @@ class SparkSession private(
    * @since 2.0.0
    */
   def sql(sqlText: String): DataFrame = {
-    Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))
+    val tracker = new QueryPlanningTracker
+    val plan = tracker.measureTime(QueryPlanningTracker.PARSING) {
+      sessionState.sqlParser.parsePlan(sqlText)
+    }
+    Dataset.ofRows(self, plan, tracker)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/07a700b3/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 905d035..87a4ceb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{InternalRow, QueryPlanningTracker}
 import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -43,7 +43,10 @@ import org.apache.spark.util.Utils
  * While this is not a public class, we should avoid changing the function names for the sake of
  * changing them, because a lot of developers use the feature for debugging.
  */
-class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
+class QueryExecution(
+    val sparkSession: SparkSession,
+    val logical: LogicalPlan,
+    val tracker: QueryPlanningTracker = new QueryPlanningTracker) {
 
   // TODO: Move the planner an optimizer into here from SessionState.
   protected def planner = sparkSession.sessionState.planner
@@ -56,9 +59,9 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
     }
   }
 
-  lazy val analyzed: LogicalPlan = {
+  lazy val analyzed: LogicalPlan = tracker.measureTime(QueryPlanningTracker.ANALYSIS) {
     SparkSession.setActiveSession(sparkSession)
-    sparkSession.sessionState.analyzer.executeAndCheck(logical)
+    sparkSession.sessionState.analyzer.executeAndCheck(logical, tracker)
   }
 
   lazy val withCachedData: LogicalPlan = {
@@ -67,9 +70,11 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
     sparkSession.sharedState.cacheManager.useCachedData(analyzed)
   }
 
-  lazy val optimizedPlan: LogicalPlan = sparkSession.sessionState.optimizer.execute(withCachedData)
+  lazy val optimizedPlan: LogicalPlan = tracker.measureTime(QueryPlanningTracker.OPTIMIZATION) {
+    sparkSession.sessionState.optimizer.executeAndTrack(withCachedData, tracker)
+  }
 
-  lazy val sparkPlan: SparkPlan = {
+  lazy val sparkPlan: SparkPlan = tracker.measureTime(QueryPlanningTracker.PLANNING) {
     SparkSession.setActiveSession(sparkSession)
     // TODO: We use next(), i.e. take the first plan returned by the planner, here for now,
     //       but we will implement to choose the best plan.
@@ -78,7 +83,9 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
 
   // executedPlan should not be used to initialize any SparkPlan. It should be
   // only used for execution.
-  lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
+  lazy val executedPlan: SparkPlan = tracker.measureTime(QueryPlanningTracker.PLANNING) {
+    prepareForExecution(sparkPlan)
+  }
 
   /** Internal version of the RDD. Avoids copies and has no schema */
   lazy val toRdd: RDD[InternalRow] = executedPlan.execute()

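Note that both sparkPlan and executedPlan above report into the same PLANNING phase, so planning
time accumulates across the two lazy vals. A quick way to see the per-phase breakdown, assuming
df is any DataFrame:

```scala
val qe = df.queryExecution
qe.executedPlan  // forces analysis, optimization and planning lazily
qe.tracker.phases.foreach { case (phase, ns) =>
  println(f"$phase%-14s ${ns / 1e6}%.3f ms")
}
```
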
http://git-wip-us.apache.org/repos/asf/spark/blob/07a700b3/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryPlanningTrackerEndToEndSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryPlanningTrackerEndToEndSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryPlanningTrackerEndToEndSuite.scala
new file mode 100644
index 0000000..0af4c85
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryPlanningTrackerEndToEndSuite.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.test.SharedSQLContext
+
+class QueryPlanningTrackerEndToEndSuite extends SharedSQLContext {
+
+  test("programmatic API") {
+    val df = spark.range(1000).selectExpr("count(*)")
+    df.collect()
+    val tracker = df.queryExecution.tracker
+
+    assert(tracker.phases.size == 3)
+    assert(tracker.phases("analysis") > 0)
+    assert(tracker.phases("optimization") > 0)
+    assert(tracker.phases("planning") > 0)
+
+    assert(tracker.rules.nonEmpty)
+  }
+
+  test("sql") {
+    val df = spark.sql("select * from range(1)")
+    df.collect()
+
+    val tracker = df.queryExecution.tracker
+
+    assert(tracker.phases.size == 4)
+    assert(tracker.phases("parsing") > 0)
+    assert(tracker.phases("analysis") > 0)
+    assert(tracker.phases("optimization") > 0)
+    assert(tracker.phases("planning") > 0)
+
+    assert(tracker.rules.nonEmpty)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/07a700b3/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 634b3db..3508aff 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -33,9 +33,9 @@ import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
 
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{SparkSession, SQLContext}
+import org.apache.spark.sql.{DataFrame, Dataset, SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, ExternalCatalogWithListener}
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener
 import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
 import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}
@@ -219,6 +219,16 @@ private[hive] class TestHiveSparkSession(
     sharedState.externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client.newSession()
   }
 
+  /**
+   * This is a temporary hack to override SparkSession.sql so we can still use the version of
+   * Dataset.ofRows that creates a TestHiveQueryExecution (rather than a normal QueryExecution
+   * which wouldn't load all the test tables).
+   */
+  override def sql(sqlText: String): DataFrame = {
+    val plan = sessionState.sqlParser.parsePlan(sqlText)
+    Dataset.ofRows(self, plan)
+  }
+
   override def newSession(): TestHiveSparkSession = {
     new TestHiveSparkSession(sc, Some(sharedState), None, loadTestTables)
   }
@@ -586,7 +596,7 @@ private[hive] class TestHiveQueryExecution(
     logDebug(s"Query references test tables: ${referencedTestTables.mkString(", ")}")
     referencedTestTables.foreach(sparkSession.loadTestTable)
     // Proceed with analysis.
-    sparkSession.sessionState.analyzer.executeAndCheck(logical)
+    sparkSession.sessionState.analyzer.executeAndCheck(logical, tracker)
   }
 }
 

