This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new db1c3fe  [SPARK-31045][SQL] Add config for AQE logging level
db1c3fe is described below

commit db1c3feacdca04c1b72191269f4f3910ad05bcb4
Author: maryannxue <maryann...@apache.org>
AuthorDate: Fri Mar 6 11:41:45 2020 +0800

    [SPARK-31045][SQL] Add config for AQE logging level

    ### What changes were proposed in this pull request?
    This PR adds an internal config for changing the logging level of adaptive execution query plan evolution.

    ### Why are the changes needed?
    To make AQE debugging easier.

    ### Does this PR introduce any user-facing change?
    No.

    ### How was this patch tested?
    Added UT.

    Closes #27798 from maryannxue/aqe-log-level.

    Authored-by: maryannxue <maryann...@apache.org>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit d705d36c0c94d3a4684de6ca0f444557c3cec25e)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    | 12 ++++++
 .../execution/adaptive/AdaptiveSparkPlanExec.scala | 12 +++++-
 .../adaptive/AdaptiveQueryExecSuite.scala          | 47 ++++++++++++++++++++++
 3 files changed, 70 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index b2b3d12..cd465bc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -378,6 +378,16 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val ADAPTIVE_EXECUTION_LOG_LEVEL = buildConf("spark.sql.adaptive.logLevel")
+    .internal()
+    .doc("Configures the log level for adaptive execution logging of plan changes. The value " +
+      "can be 'trace', 'debug', 'info', 'warn', or 'error'. The default log level is 'debug'.")
+    .version("3.0.0")
+    .stringConf
+    .transform(_.toUpperCase(Locale.ROOT))
+    .checkValues(Set("TRACE", "DEBUG", "INFO", "WARN", "ERROR"))
+    .createWithDefault("debug")
+
   val ADVISORY_PARTITION_SIZE_IN_BYTES =
     buildConf("spark.sql.adaptive.advisoryPartitionSizeInBytes")
       .doc("The advisory size in bytes of the shuffle partition during adaptive optimization " +
@@ -2428,6 +2438,8 @@ class SQLConf extends Serializable with Logging {
 
   def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED)
 
+  def adaptiveExecutionLogLevel: String = getConf(ADAPTIVE_EXECUTION_LOG_LEVEL)
+
   def fetchShuffleBlocksInBatch: Boolean = getConf(FETCH_SHUFFLE_BLOCKS_IN_BATCH)
 
   def nonEmptyPartitionRatioForBroadcastJoin: Double =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index b74401e..fc88a7f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -66,6 +66,15 @@ case class AdaptiveSparkPlanExec(
 
   @transient private val lock = new Object()
 
+  @transient private val logOnLevel: ( => String) => Unit = conf.adaptiveExecutionLogLevel match {
+    case "TRACE" => logTrace(_)
+    case "DEBUG" => logDebug(_)
+    case "INFO" => logInfo(_)
+    case "WARN" => logWarning(_)
+    case "ERROR" => logError(_)
+    case _ => logDebug(_)
+  }
+
   // The logical plan optimizer for re-optimizing the current logical plan.
   @transient private val optimizer = new RuleExecutor[LogicalPlan] {
     // TODO add more optimization rules
@@ -204,6 +213,7 @@ case class AdaptiveSparkPlanExec(
         val newCost = costEvaluator.evaluateCost(newPhysicalPlan)
         if (newCost < origCost ||
             (newCost == origCost && currentPhysicalPlan != newPhysicalPlan)) {
+          logOnLevel(s"Plan changed from $currentPhysicalPlan to $newPhysicalPlan")
           cleanUpTempTags(newPhysicalPlan)
           currentPhysicalPlan = newPhysicalPlan
           currentLogicalPlan = newLogicalPlan
@@ -217,7 +227,7 @@ case class AdaptiveSparkPlanExec(
       currentPhysicalPlan = applyPhysicalRules(result.newPlan, queryStageOptimizerRules)
       isFinalPlan = true
       executionId.foreach(onUpdatePlan(_, Seq(currentPhysicalPlan)))
-      logDebug(s"Final plan: $currentPhysicalPlan")
+      logOnLevel(s"Final plan: $currentPhysicalPlan")
     }
     currentPhysicalPlan
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index a7fa63d..25b1f89 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql.execution.adaptive
 import java.io.File
 import java.net.URI
 
+import org.apache.log4j.Level
+
 import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobStart}
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.execution.{ReusedSubqueryExec, ShuffledRowRDD, SparkPlan}
@@ -729,5 +731,50 @@ class AdaptiveQueryExecSuite
           s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} is" +
           s" enabled but is not supported for")))
   }
+
+  test("test log level") {
+    def verifyLog(expectedLevel: Level): Unit = {
+      val logAppender = new LogAppender("adaptive execution")
+      withLogAppender(
+        logAppender,
+        loggerName = Some(AdaptiveSparkPlanExec.getClass.getName.dropRight(1)),
+        level = Some(Level.TRACE)) {
+        withSQLConf(
+          SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+          SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") {
+          sql("SELECT * FROM testData join testData2 ON key = a where value = '1'").collect()
+        }
+      }
+      Seq("Plan changed", "Final plan").foreach { msg =>
+        assert(
+          logAppender.loggingEvents.exists { event =>
+            event.getRenderedMessage.contains(msg) && event.getLevel == expectedLevel
+          })
+      }
+    }
+
+    // Verify default log level
+    verifyLog(Level.DEBUG)
+
+    // Verify custom log level
+    val levels = Seq(
+      "TRACE" -> Level.TRACE,
+      "trace" -> Level.TRACE,
+      "DEBUG" -> Level.DEBUG,
+      "debug" -> Level.DEBUG,
+      "INFO" -> Level.INFO,
+      "info" -> Level.INFO,
+      "WARN" -> Level.WARN,
+      "warn" -> Level.WARN,
+      "ERROR" -> Level.ERROR,
+      "error" -> Level.ERROR,
+      "deBUG" -> Level.DEBUG)
+
+    levels.foreach { level =>
+      withSQLConf(SQLConf.ADAPTIVE_EXECUTION_LOG_LEVEL.key -> level._1) {
+        verifyLog(level._2)
+      }
+    }
+  }
 }
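For readers following along, below is a minimal, self-contained sketch of the dispatch
pattern this patch adds to AdaptiveSparkPlanExec: the configured level is resolved once
into a by-name logging function, which is then reused for both the "Plan changed" and
"Final plan" messages. The object name and the println-based log methods are hypothetical
stand-ins for org.apache.spark.internal.Logging (which writes through log4j); only
spark.sql.adaptive.logLevel, its accepted values, and the shape of the match come from
the patch itself.

import java.util.Locale

// Hypothetical standalone sketch; the real logTrace/logDebug/... come from
// org.apache.spark.internal.Logging, not println.
object AqeLogLevelSketch {
  def logTrace(msg: => String): Unit = println(s"TRACE $msg")
  def logDebug(msg: => String): Unit = println(s"DEBUG $msg")
  def logInfo(msg: => String): Unit = println(s"INFO $msg")
  def logWarning(msg: => String): Unit = println(s"WARN $msg")
  def logError(msg: => String): Unit = println(s"ERROR $msg")

  // The conf's transform(_.toUpperCase(Locale.ROOT)) means the stored value is
  // already upper case, which is why the real code matches only upper-case
  // strings; the catch-all preserves the pre-patch logDebug behavior.
  def logOnLevel(configuredLevel: String): ( => String) => Unit =
    configuredLevel.toUpperCase(Locale.ROOT) match {
      case "TRACE" => logTrace(_)
      case "DEBUG" => logDebug(_)
      case "INFO" => logInfo(_)
      case "WARN" => logWarning(_)
      case "ERROR" => logError(_)
      case _ => logDebug(_)
    }

  def main(args: Array[String]): Unit = {
    val log = logOnLevel("warn")  // e.g. spark.sql.adaptive.logLevel=warn
    log("Plan changed from <old plan> to <new plan>")
    log("Final plan: <final plan>")
  }
}

Since the config is marked internal() it does not appear in the public documentation,
but it can still be set in a session, e.g. spark.conf.set("spark.sql.adaptive.logLevel",
"info"); values outside trace/debug/info/warn/error are rejected up front by checkValues.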