This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new db1c3fe  [SPARK-31045][SQL] Add config for AQE logging level
db1c3fe is described below

commit db1c3feacdca04c1b72191269f4f3910ad05bcb4
Author: maryannxue <maryann...@apache.org>
AuthorDate: Fri Mar 6 11:41:45 2020 +0800

    [SPARK-31045][SQL] Add config for AQE logging level
    
    ### What changes were proposed in this pull request?
    This PR adds an internal config for changing the logging level of adaptive execution query plan evolvement.
    
    ### Why are the changes needed?
    To make AQE debugging easier.
    
    ### Does this PR introduce any user-facing change?
    No.
    
    ### How was this patch tested?
    Added UT.
    
    Closes #27798 from maryannxue/aqe-log-level.
    
    Authored-by: maryannxue <maryann...@apache.org>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit d705d36c0c94d3a4684de6ca0f444557c3cec25e)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    | 12 ++++++
 .../execution/adaptive/AdaptiveSparkPlanExec.scala | 12 +++++-
 .../adaptive/AdaptiveQueryExecSuite.scala          | 47 ++++++++++++++++++++++
 3 files changed, 70 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index b2b3d12..cd465bc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -378,6 +378,16 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val ADAPTIVE_EXECUTION_LOG_LEVEL = buildConf("spark.sql.adaptive.logLevel")
+    .internal()
+    .doc("Configures the log level for adaptive execution logging of plan changes. The value " +
+      "can be 'trace', 'debug', 'info', 'warn', or 'error'. The default log level is 'debug'.")
+    .version("3.0.0")
+    .stringConf
+    .transform(_.toUpperCase(Locale.ROOT))
+    .checkValues(Set("TRACE", "DEBUG", "INFO", "WARN", "ERROR"))
+    .createWithDefault("debug")
+
   val ADVISORY_PARTITION_SIZE_IN_BYTES =
     buildConf("spark.sql.adaptive.advisoryPartitionSizeInBytes")
       .doc("The advisory size in bytes of the shuffle partition during adaptive optimization " +
@@ -2428,6 +2438,8 @@ class SQLConf extends Serializable with Logging {
 
   def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED)
 
+  def adaptiveExecutionLogLevel: String = getConf(ADAPTIVE_EXECUTION_LOG_LEVEL)
+
   def fetchShuffleBlocksInBatch: Boolean = getConf(FETCH_SHUFFLE_BLOCKS_IN_BATCH)
 
   def nonEmptyPartitionRatioForBroadcastJoin: Double =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index b74401e..fc88a7f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -66,6 +66,15 @@ case class AdaptiveSparkPlanExec(
 
   @transient private val lock = new Object()
 
+  @transient private val logOnLevel: ( => String) => Unit = conf.adaptiveExecutionLogLevel match {
+    case "TRACE" => logTrace(_)
+    case "DEBUG" => logDebug(_)
+    case "INFO" => logInfo(_)
+    case "WARN" => logWarning(_)
+    case "ERROR" => logError(_)
+    case _ => logDebug(_)
+  }
+
   // The logical plan optimizer for re-optimizing the current logical plan.
   @transient private val optimizer = new RuleExecutor[LogicalPlan] {
     // TODO add more optimization rules
@@ -204,6 +213,7 @@ case class AdaptiveSparkPlanExec(
         val newCost = costEvaluator.evaluateCost(newPhysicalPlan)
         if (newCost < origCost ||
             (newCost == origCost && currentPhysicalPlan != newPhysicalPlan)) {
+          logOnLevel(s"Plan changed from $currentPhysicalPlan to $newPhysicalPlan")
           cleanUpTempTags(newPhysicalPlan)
           currentPhysicalPlan = newPhysicalPlan
           currentLogicalPlan = newLogicalPlan
@@ -217,7 +227,7 @@ case class AdaptiveSparkPlanExec(
       currentPhysicalPlan = applyPhysicalRules(result.newPlan, queryStageOptimizerRules)
       isFinalPlan = true
       executionId.foreach(onUpdatePlan(_, Seq(currentPhysicalPlan)))
-      logDebug(s"Final plan: $currentPhysicalPlan")
+      logOnLevel(s"Final plan: $currentPhysicalPlan")
     }
     currentPhysicalPlan
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index a7fa63d..25b1f89 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql.execution.adaptive
 import java.io.File
 import java.net.URI
 
+import org.apache.log4j.Level
+
 import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobStart}
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.execution.{ReusedSubqueryExec, ShuffledRowRDD, SparkPlan}
@@ -729,5 +731,50 @@ class AdaptiveQueryExecSuite
         s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} is" +
         s" enabled but is not supported for")))
   }
+
+  test("test log level") {
+    def verifyLog(expectedLevel: Level): Unit = {
+      val logAppender = new LogAppender("adaptive execution")
+      withLogAppender(
+        logAppender,
+        loggerName = Some(AdaptiveSparkPlanExec.getClass.getName.dropRight(1)),
+        level = Some(Level.TRACE)) {
+        withSQLConf(
+          SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+          SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") {
+          sql("SELECT * FROM testData join testData2 ON key = a where value = '1'").collect()
+        }
+      }
+      Seq("Plan changed", "Final plan").foreach { msg =>
+        assert(
+          logAppender.loggingEvents.exists { event =>
+            event.getRenderedMessage.contains(msg) && event.getLevel == expectedLevel
+          })
+      }
+    }
+
+    // Verify default log level
+    verifyLog(Level.DEBUG)
+
+    // Verify custom log level
+    val levels = Seq(
+      "TRACE" -> Level.TRACE,
+      "trace" -> Level.TRACE,
+      "DEBUG" -> Level.DEBUG,
+      "debug" -> Level.DEBUG,
+      "INFO" -> Level.INFO,
+      "info" -> Level.INFO,
+      "WARN" -> Level.WARN,
+      "warn" -> Level.WARN,
+      "ERROR" -> Level.ERROR,
+      "error" -> Level.ERROR,
+      "deBUG" -> Level.DEBUG)
+
+    levels.foreach { level =>
+      withSQLConf(SQLConf.ADAPTIVE_EXECUTION_LOG_LEVEL.key -> level._1) {
+        verifyLog(level._2)
+      }
+    }
+  }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to