This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new c238455 [SPARK-31079][SQL] Logging QueryExecutionMetering in RuleExecutor logger c238455 is described below commit c2384558086b0386a20aa8098cdf7a4c823a5f04 Author: Eric Wu <492960...@qq.com> AuthorDate: Tue Mar 10 19:08:59 2020 +0800 [SPARK-31079][SQL] Logging QueryExecutionMetering in RuleExecutor logger ### What changes were proposed in this pull request? RuleExecutor already supports metering for analyzer/optimizer rules. By providing such information in `PlanChangeLogger`, users can get more information when debugging rule changes. This PR enhances `PlanChangeLogger` to display RuleExecutor metrics. This can be easily done by calling the existing APIs `resetMetrics` and `dumpTimeSpent`, but there might be conflicts if the user is also collecting total metrics of a SQL job. Thus I introduced `QueryExecutionMetrics`, a snapshot of `QueryExecutionMetering`, to better support this feature. Information added to `PlanChangeLogger` ``` === Metrics of Executed Rules === Total number of runs: 554 Total time: 0.107756568 seconds Total number of effective runs: 11 Total time of effective runs: 0.047615486 seconds ``` ### Why are the changes needed? To provide a better user experience when debugging plan changes. ### Does this PR introduce any user-facing change? It only adds more debugging info to `planChangeLog`; the default log level is TRACE. ### How was this patch tested? Updated existing tests to verify the new logs. Closes #27846 from Eric5553/ExplainRuleExecMetrics. 
Authored-by: Eric Wu <492960...@qq.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> (cherry picked from commit 15df2a3f40c74cd3950cc48c95c330217e3ef401) Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../catalyst/rules/QueryExecutionMetering.scala | 27 ++++++++++++++++++++++ .../spark/sql/catalyst/rules/RuleExecutor.scala | 24 ++++++++++++++++++- .../catalyst/optimizer/OptimizerLoggingSuite.scala | 9 +++++++- 3 files changed, 58 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/QueryExecutionMetering.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/QueryExecutionMetering.scala index 875c46d..8efc359 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/QueryExecutionMetering.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/QueryExecutionMetering.scala @@ -37,6 +37,10 @@ case class QueryExecutionMetering() { timeEffectiveRunsMap.clear() } + def getMetrics(): QueryExecutionMetrics = { + QueryExecutionMetrics(totalTime, totalNumRuns, totalNumEffectiveRuns, totalEffectiveTime) + } + def totalTime: Long = { timeMap.sum() } @@ -45,6 +49,14 @@ case class QueryExecutionMetering() { numRunsMap.sum() } + def totalNumEffectiveRuns: Long = { + numEffectiveRunsMap.sum() + } + + def totalEffectiveTime: Long = { + timeEffectiveRunsMap.sum() + } + def incExecutionTimeBy(ruleName: String, delta: Long): Unit = { timeMap.addAndGet(ruleName, delta) } @@ -95,3 +107,18 @@ case class QueryExecutionMetering() { """.stripMargin } } + +case class QueryExecutionMetrics( + time: Long, + numRuns: Long, + numEffectiveRuns: Long, + timeEffective: Long) { + + def -(metrics: QueryExecutionMetrics): QueryExecutionMetrics = { + QueryExecutionMetrics( + this.time - metrics.time, + this.numRuns - metrics.numRuns, + this.numEffectiveRuns - metrics.numEffectiveRuns, + this.timeEffective - metrics.timeEffective) + } +} diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala index da5242b..bff04d3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala @@ -21,6 +21,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.QueryPlanningTracker import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.trees.TreeNode +import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_SECOND import org.apache.spark.sql.catalyst.util.sideBySide import org.apache.spark.sql.internal.SQLConf import org.apache.spark.util.Utils @@ -37,6 +38,10 @@ object RuleExecutor { def resetMetrics(): Unit = { queryExecutionMeter.resetMetrics() } + + def getCurrentMetrics(): QueryExecutionMetrics = { + queryExecutionMeter.getMetrics() + } } abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging { @@ -121,6 +126,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging { val queryExecutionMetrics = RuleExecutor.queryExecutionMeter val planChangeLogger = new PlanChangeLogger() val tracker: Option[QueryPlanningTracker] = QueryPlanningTracker.get + val beforeMetrics = RuleExecutor.getCurrentMetrics() // Run the structural integrity checker against the initial input if (!isPlanIntegral(plan)) { @@ -199,6 +205,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging { planChangeLogger.logBatch(batch.name, batchStartPlan, curPlan) } + planChangeLogger.logMetrics(RuleExecutor.getCurrentMetrics() - beforeMetrics) curPlan } @@ -231,7 +238,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging { s""" |=== Result of Batch ${batchName} === |${sideBySide(oldPlan.treeString, newPlan.treeString).mkString("\n")} - """.stripMargin + """.stripMargin } 
else { s"Batch ${batchName} has no effect." } @@ -241,6 +248,21 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging { } } + def logMetrics(metrics: QueryExecutionMetrics): Unit = { + val totalTime = metrics.time / NANOS_PER_SECOND.toDouble + val totalTimeEffective = metrics.timeEffective / NANOS_PER_SECOND.toDouble + val message = + s""" + |=== Metrics of Executed Rules === + |Total number of runs: ${metrics.numRuns} + |Total time: ${totalTime} seconds + |Total number of effective runs: ${metrics.numEffectiveRuns} + |Total time of effective runs: ${totalTimeEffective} seconds + """.stripMargin + + logBasedOnLevel(message) + } + private def logBasedOnLevel(f: => String): Unit = { logLevel match { case "TRACE" => logTrace(f) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerLoggingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerLoggingSuite.scala index d3b0a0e..db22121 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerLoggingSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerLoggingSuite.scala @@ -49,12 +49,19 @@ class OptimizerLoggingSuite extends PlanTest { case event => Seq( "Applying Rule", "Result of Batch", - "has no effect").exists(event.getRenderedMessage().contains) + "has no effect", + "Metrics of Executed Rules").exists(event.getRenderedMessage().contains) } val logMessages = events.map(_.getRenderedMessage) assert(expectedRulesOrBatches.forall (ruleOrBatch => logMessages.exists(_.contains(ruleOrBatch)))) assert(events.forall(_.getLevel == expectedLevel)) + val expectedMetrics = Seq( + "Total number of runs: 7", + "Total time:", + "Total number of effective runs: 3", + "Total time of effective runs:") + assert(expectedMetrics.forall(metrics => logMessages.exists(_.contains(metrics)))) } test("test log level") { 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org