This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 0745333  [SPARK-27088][SQL] Add a configuration to set log level for each batch at RuleExecutor
0745333 is described below

commit 074533334d01afdd7862a1ac6c5a7a672bcce3f8
Author: chakravarthiT <45845595+chakravart...@users.noreply.github.com>
AuthorDate: Thu Apr 11 10:02:27 2019 +0900

    [SPARK-27088][SQL] Add a configuration to set log level for each batch at RuleExecutor

    ## What changes were proposed in this pull request?

    Similar to #22406, which made the log level for plan changes by each rule configurable, this PR makes the log level for plan changes by each batch configurable, reusing the same configuration "spark.sql.optimizer.planChangeLog.level".

    Config proposed in this PR: spark.sql.optimizer.planChangeLog.batches - enables plan change logging only for a set of specified batches, separated by commas.

    ## How was this patch tested?

    Added a unit test; also tested manually and attached screenshots below.

    1) Setting spark.sql.optimizer.planChangeLog.level to warn.
    ![settingLogLevelToWarn](https://user-images.githubusercontent.com/45845595/54556730-8803dd00-49df-11e9-95ab-ebb0c8d735ef.png)

    2) Setting spark.sql.optimizer.planChangeLog.batches to Resolution and Subquery.
    ![settingBatchestoLog](https://user-images.githubusercontent.com/45845595/54556740-8cc89100-49df-11e9-80ab-fbbbe1ff2cdf.png)

    3) Plan change logging enabled only for the set of specified batches (Resolution and Subquery).
    ![batchloggingOp](https://user-images.githubusercontent.com/45845595/54556788-ab2e8c80-49df-11e9-9ae0-57815f552896.png)

    Closes #24136 from chakravarthiT/logBatches.

    Lead-authored-by: chakravarthiT <45845595+chakravart...@users.noreply.github.com>
    Co-authored-by: chakravarthiT <tccha...@gmail.com>
    Signed-off-by: HyukjinKwon <gurwls...@apache.org>
---
 .../spark/sql/catalyst/rules/RuleExecutor.scala    | 55 ++++++++++++++--------
 .../org/apache/spark/sql/internal/SQLConf.scala    | 18 +++++--
 .../catalyst/optimizer/OptimizerLoggingSuite.scala | 45 +++++++++++++++---
 3 files changed, 87 insertions(+), 31 deletions(-)
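For anyone who wants to try the two settings described above, here is a minimal sketch of a local Spark application that combines them. It is not part of this commit; the application name, data, and column names are made up for illustration, and the batch names are simply the ones shown in the screenshots. Any query that exercises the analyzer and optimizer would do.

```scala
import org.apache.spark.sql.SparkSession

object PlanChangeLogExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("plan-change-log-example")
      // Log plan changes at INFO so they appear with default log4j settings.
      .config("spark.sql.optimizer.planChangeLog.level", "info")
      // Restrict plan-change logging to these batches (names from the screenshots above).
      .config("spark.sql.optimizer.planChangeLog.batches", "Resolution,Subquery")
      .getOrCreate()

    import spark.implicits._

    // Any query works; this one simply exercises the analyzer and optimizer.
    Seq((1, "a"), (2, "b")).toDF("id", "value")
      .filter($"id" > 1)
      .select("value")
      .collect()

    spark.stop()
  }
}
```

With these settings, plan changes are reported only for the named batches, and at INFO rather than the default TRACE, so they show up without touching the log4j configuration.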
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
index 088f1fe..3e8a6e0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
@@ -113,7 +113,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
           if (effective) {
             queryExecutionMetrics.incNumEffectiveExecution(rule.ruleName)
             queryExecutionMetrics.incTimeEffectiveExecutionBy(rule.ruleName, runTime)
-            planChangeLogger.log(rule.ruleName, plan, result)
+            planChangeLogger.logRule(rule.ruleName, plan, result)
           }
           queryExecutionMetrics.incExecutionTimeBy(rule.ruleName, runTime)
           queryExecutionMetrics.incNumExecution(rule.ruleName)
@@ -152,15 +152,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
           lastPlan = curPlan
         }
 
-        if (!batchStartPlan.fastEquals(curPlan)) {
-          logDebug(
-            s"""
-              |=== Result of Batch ${batch.name} ===
-              |${sideBySide(batchStartPlan.treeString, curPlan.treeString).mkString("\n")}
-            """.stripMargin)
-        } else {
-          logTrace(s"Batch ${batch.name} has no effect.")
-        }
+        planChangeLogger.logBatch(batch.name, batchStartPlan, curPlan)
       }
 
     curPlan
@@ -172,21 +164,46 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
     private val logRules = SQLConf.get.optimizerPlanChangeRules.map(Utils.stringToSeq)
 
-    def log(ruleName: String, oldPlan: TreeType, newPlan: TreeType): Unit = {
+    private val logBatches = SQLConf.get.optimizerPlanChangeBatches.map(Utils.stringToSeq)
+
+    def logRule(ruleName: String, oldPlan: TreeType, newPlan: TreeType): Unit = {
       if (logRules.isEmpty || logRules.get.contains(ruleName)) {
-        lazy val message =
+        def message(): String = {
           s"""
              |=== Applying Rule ${ruleName} ===
              |${sideBySide(oldPlan.treeString, newPlan.treeString).mkString("\n")}
            """.stripMargin
-        logLevel match {
-          case "TRACE" => logTrace(message)
-          case "DEBUG" => logDebug(message)
-          case "INFO" => logInfo(message)
-          case "WARN" => logWarning(message)
-          case "ERROR" => logError(message)
-          case _ => logTrace(message)
         }
+
+        logBasedOnLevel(message)
+      }
+    }
+
+    def logBatch(batchName: String, oldPlan: TreeType, newPlan: TreeType): Unit = {
+      if (logBatches.isEmpty || logBatches.get.contains(batchName)) {
+        def message(): String = {
+          if (!oldPlan.fastEquals(newPlan)) {
+            s"""
+               |=== Result of Batch ${batchName} ===
+               |${sideBySide(oldPlan.treeString, newPlan.treeString).mkString("\n")}
+             """.stripMargin
+          } else {
+            s"Batch ${batchName} has no effect."
+          }
+        }
+
+        logBasedOnLevel(message)
+      }
+    }
+
+    private def logBasedOnLevel(f: => String): Unit = {
+      logLevel match {
+        case "TRACE" => logTrace(f)
+        case "DEBUG" => logDebug(f)
+        case "INFO" => logInfo(f)
+        case "WARN" => logWarning(f)
+        case "ERROR" => logError(f)
+        case _ => logTrace(f)
+      }
     }
   }
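A note on the shape of the new logger above: logBasedOnLevel takes its message as a by-name parameter (f: => String), and logRule/logBatch pass a def rather than an eagerly built string, so the potentially large side-by-side plan rendering is only produced when the underlying logger actually emits at the configured level. Below is a standalone sketch of that by-name pattern, with made-up names and a plain println standing in for Spark's Logging trait; it is an illustration of the idiom, not Spark code.

```scala
object ByNameLoggingSketch {
  // Stand-in for a logging backend's level check; in Spark this is handled
  // inside the Logging trait before the message is rendered.
  var infoEnabled: Boolean = false

  // `msg` is by-name: the argument expression is evaluated only where `msg`
  // is used, i.e. only when the level is enabled.
  def logInfo(msg: => String): Unit = {
    if (infoEnabled) println(s"INFO: $msg")
  }

  def main(args: Array[String]): Unit = {
    def expensiveMessage(): String = {
      println("rendering plan diff...") // visible only if the message is built
      "=== Result of Batch Example ===\n<plan diff here>"
    }

    logInfo(expensiveMessage()) // level disabled: message never rendered
    infoEnabled = true
    logInfo(expensiveMessage()) // level enabled: message rendered and logged
  }
}
```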
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 157be1b..f33cc86 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -184,8 +184,8 @@ object SQLConf {
   val OPTIMIZER_PLAN_CHANGE_LOG_LEVEL = buildConf("spark.sql.optimizer.planChangeLog.level")
     .internal()
     .doc("Configures the log level for logging the change from the original plan to the new " +
-      "plan after a rule is applied. The value can be 'trace', 'debug', 'info', 'warn', or " +
-      "'error'. The default log level is 'trace'.")
+      "plan after a rule or batch is applied. The value can be 'trace', 'debug', 'info', " +
+      "'warn', or 'error'. The default log level is 'trace'.")
     .stringConf
     .transform(_.toUpperCase(Locale.ROOT))
     .checkValue(logLevel => Set("TRACE", "DEBUG", "INFO", "WARN", "ERROR").contains(logLevel),
@@ -195,9 +195,15 @@ object SQLConf {
 
   val OPTIMIZER_PLAN_CHANGE_LOG_RULES = buildConf("spark.sql.optimizer.planChangeLog.rules")
     .internal()
-    .doc("If this configuration is set, the optimizer will only log plan changes caused by " +
-      "applying the rules specified in this configuration. The value can be a list of rule " +
-      "names separated by comma.")
+    .doc("Configures a list of rules to be logged in the optimizer, in which the rules are " +
+      "specified by their rule names and separated by comma.")
+    .stringConf
+    .createOptional
+
+  val OPTIMIZER_PLAN_CHANGE_LOG_BATCHES = buildConf("spark.sql.optimizer.planChangeLog.batches")
+    .internal()
+    .doc("Configures a list of batches to be logged in the optimizer, in which the batches " +
+      "are specified by their batch names and separated by comma.")
     .stringConf
     .createOptional
@@ -1763,6 +1769,8 @@ class SQLConf extends Serializable with Logging {
 
   def optimizerPlanChangeRules: Option[String] = getConf(OPTIMIZER_PLAN_CHANGE_LOG_RULES)
 
+  def optimizerPlanChangeBatches: Option[String] = getConf(OPTIMIZER_PLAN_CHANGE_LOG_BATCHES)
+
   def stateStoreProviderClass: String = getConf(STATE_STORE_PROVIDER_CLASS)
 
   def stateStoreMinDeltasForSnapshot: Int = getConf(STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerLoggingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerLoggingSuite.scala
index 3e9b453..dd7e29d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerLoggingSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerLoggingSuite.scala
@@ -32,17 +32,20 @@ import org.apache.spark.sql.internal.SQLConf
 class OptimizerLoggingSuite extends PlanTest {
 
   object Optimize extends RuleExecutor[LogicalPlan] {
-    val batches = Batch("Optimizer Batch", FixedPoint(100),
-      PushDownPredicate,
-      ColumnPruning,
-      CollapseProject) :: Nil
+    val batches =
+      Batch("Optimizer Batch", FixedPoint(100),
+        PushDownPredicate, ColumnPruning, CollapseProject) ::
+      Batch("Batch Has No Effect", Once,
+        ColumnPruning) :: Nil
   }
 
   class MockAppender extends AppenderSkeleton {
     val loggingEvents = new ArrayBuffer[LoggingEvent]()
 
     override def append(loggingEvent: LoggingEvent): Unit = {
-      if (loggingEvent.getRenderedMessage().contains("Applying Rule")) {
+      if (loggingEvent.getRenderedMessage().contains("Applying Rule") ||
+        loggingEvent.getRenderedMessage().contains("Result of Batch") ||
+        loggingEvent.getRenderedMessage().contains("has no effect")) {
         loggingEvents.append(loggingEvent)
       }
     }
@@ -51,7 +54,18 @@ class OptimizerLoggingSuite extends PlanTest {
     override def requiresLayout(): Boolean = false
   }
 
-  private def verifyLog(expectedLevel: Level, expectedRules: Seq[String]): Unit = {
+  private def withLogLevelAndAppender(level: Level, appender: Appender)(f: => Unit): Unit = {
+    val logger = Logger.getLogger(Optimize.getClass.getName.dropRight(1))
+    val restoreLevel = logger.getLevel
+    logger.setLevel(level)
+    logger.addAppender(appender)
+    try f finally {
+      logger.setLevel(restoreLevel)
+      logger.removeAppender(appender)
+    }
+  }
+
+  private def verifyLog(expectedLevel: Level, expectedRulesOrBatches: Seq[String]): Unit = {
     val logAppender = new MockAppender()
     withLogAppender(logAppender,
       loggerName = Some(Optimize.getClass.getName.dropRight(1)), level = Some(Level.TRACE)) {
@@ -61,7 +75,8 @@ class OptimizerLoggingSuite extends PlanTest {
       comparePlans(Optimize.execute(query), expected)
     }
     val logMessages = logAppender.loggingEvents.map(_.getRenderedMessage)
-    assert(expectedRules.forall(rule => logMessages.exists(_.contains(rule))))
+    assert(expectedRulesOrBatches.forall
+      (ruleOrBatch => logMessages.exists(_.contains(ruleOrBatch))))
     assert(logAppender.loggingEvents.forall(_.getLevel == expectedLevel))
   }
 
@@ -135,4 +150,20 @@ class OptimizerLoggingSuite extends PlanTest {
       }
     }
   }
+
+  test("test log batches which change the plan") {
+    withSQLConf(
+      SQLConf.OPTIMIZER_PLAN_CHANGE_LOG_BATCHES.key -> "Optimizer Batch",
+      SQLConf.OPTIMIZER_PLAN_CHANGE_LOG_LEVEL.key -> "INFO") {
+      verifyLog(Level.INFO, Seq("Optimizer Batch"))
+    }
+  }
+
+  test("test log batches which do not change the plan") {
+    withSQLConf(
+      SQLConf.OPTIMIZER_PLAN_CHANGE_LOG_BATCHES.key -> "Batch Has No Effect",
+      SQLConf.OPTIMIZER_PLAN_CHANGE_LOG_LEVEL.key -> "INFO") {
+      verifyLog(Level.INFO, Seq("Batch Has No Effect"))
+    }
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org