This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f7440f384191 [SPARK-47590][SQL] Hive-thriftserver: Migrate logWarn 
with variables to structured logging framework
f7440f384191 is described below

commit f7440f3841918f2cdb4a8e710cfe31d3fc85230c
Author: Haejoon Lee <haejoon....@databricks.com>
AuthorDate: Tue Apr 16 13:56:03 2024 -0700

    [SPARK-47590][SQL] Hive-thriftserver: Migrate logWarn with variables to 
structured logging framework
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to migrate `logWarning` with variables of 
Hive-thriftserver module to structured logging framework.
    
    ### Why are the changes needed?
    
    To improve the existing logging system by migrating into structured logging.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No API changes, but the Hive-thriftserver logs will contain MDC (Mapped 
Diagnostic Context) from now.
    
    ### How was this patch tested?
    
    Run Scala auto formatting and style check. Also the existing CI should pass.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #45923 from itholic/hive-ts-logwarn.
    
    Lead-authored-by: Haejoon Lee <haejoon....@databricks.com>
    Co-authored-by: Haejoon Lee <44108233+itho...@users.noreply.github.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../scala/org/apache/spark/internal/LogKey.scala   |  1 +
 .../SparkExecuteStatementOperation.scala           |  4 ++-
 .../sql/hive/thriftserver/SparkSQLCLIDriver.scala  | 15 ++++-----
 .../ui/HiveThriftServer2Listener.scala             | 36 ++++++++++++++++------
 4 files changed, 38 insertions(+), 18 deletions(-)

diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala 
b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
index 41289c641424..bfeb733af30a 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -94,6 +94,7 @@ object LogKey extends Enumeration {
   val FUNCTION_PARAMETER = Value
   val GROUP_ID = Value
   val HADOOP_VERSION = Value
+  val HISTORY_DIR = Value
   val HIVE_OPERATION_STATE = Value
   val HIVE_OPERATION_TYPE = Value
   val HOST = Value
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index 628925007f7e..f8f58cd422b6 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -256,7 +256,9 @@ private[hive] class SparkExecuteStatementOperation(
         val currentState = getStatus().getState()
         if (currentState.isTerminal) {
           // This may happen if the execution was cancelled, and then closed 
from another thread.
-          logWarning(s"Ignore exception in terminal state with $statementId: 
$e")
+          logWarning(
+            log"Ignore exception in terminal state with ${MDC(STATEMENT_ID, 
statementId)}", e
+          )
         } else {
           logError(log"Error executing query with ${MDC(STATEMENT_ID, 
statementId)}, " +
             log"currentState ${MDC(HIVE_OPERATION_STATE, currentState)}, ", e)
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index 03d8fd0c8ff2..888c086e9042 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -41,7 +41,7 @@ import sun.misc.{Signal, SignalHandler}
 import org.apache.spark.{ErrorMessageFormat, SparkConf, SparkThrowable, 
SparkThrowableHelper}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKey.ERROR
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
 import org.apache.spark.sql.catalyst.util.SQLKeywordUtils
@@ -232,14 +232,14 @@ private[hive] object SparkSQLCLIDriver extends Logging {
         val historyFile = historyDirectory + File.separator + ".hivehistory"
         reader.setHistory(new FileHistory(new File(historyFile)))
       } else {
-        logWarning("WARNING: Directory for Hive history file: " + 
historyDirectory +
-                           " does not exist.   History will not be available 
during this session.")
+        logWarning(
+          log"Directory for Hive history file: ${MDC(HISTORY_DIR, 
historyDirectory)}" +
+            log" does not exist. History will not be available during this 
session.")
       }
     } catch {
       case e: Exception =>
-        logWarning("WARNING: Encountered an error while trying to initialize 
Hive's " +
-                           "history file.  History will not be available 
during this session.")
-        logWarning(e.getMessage)
+        logWarning("Encountered an error while trying to initialize Hive's " +
+                     "history file. History will not be available during this 
session.", e)
     }
 
     // add shutdown hook to flush the history to history file
@@ -250,7 +250,8 @@ private[hive] object SparkSQLCLIDriver extends Logging {
             h.flush()
           } catch {
             case e: IOException =>
-              logWarning("WARNING: Failed to write command history file: " + 
e.getMessage)
+              logWarning(
+                log"Failed to write command history file: ${MDC(ERROR, 
e.getMessage)}")
           }
         case _ =>
       }
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala
index 31f30f3d97ea..8b7e9b00cb52 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala
@@ -25,7 +25,8 @@ import scala.jdk.CollectionConverters._
 import org.apache.hive.service.server.HiveServer2
 
 import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.internal.config.Status.LIVE_ENTITY_UPDATE_PERIOD
 import org.apache.spark.scheduler._
 import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.ExecutionState
@@ -140,7 +141,9 @@ private[thriftserver] class HiveThriftServer2Listener(
         sessionData.finishTimestamp = e.finishTime
         updateStoreWithTriggerEnabled(sessionData)
         sessionList.remove(e.sessionId)
-      case None => logWarning(s"onSessionClosed called with unknown session 
id: ${e.sessionId}")
+      case None => logWarning(
+        log"onSessionClosed called with unknown session id: ${MDC(SESSION_ID, 
e.sessionId)}"
+      )
     }
 
   private def onOperationStart(e: SparkListenerThriftServerOperationStart): 
Unit = {
@@ -160,8 +163,9 @@ private[thriftserver] class HiveThriftServer2Listener(
       case Some(sessionData) =>
         sessionData.totalExecution += 1
         updateLiveStore(sessionData)
-      case None => logWarning(s"onOperationStart called with unknown session 
id: ${e.sessionId}." +
-        s"Regardless, the operation has been registered.")
+      case None => logWarning(
+        log"onOperationStart called with unknown session id: ${MDC(SESSION_ID, 
e.sessionId)}." +
+        log"Regardless, the operation has been registered.")
     }
   }
 
@@ -171,7 +175,9 @@ private[thriftserver] class HiveThriftServer2Listener(
         executionData.executePlan = e.executionPlan
         executionData.state = ExecutionState.COMPILED
         updateLiveStore(executionData)
-      case None => logWarning(s"onOperationParsed called with unknown 
operation id: ${e.id}")
+      case None => logWarning(
+        log"onOperationParsed called with unknown operation id: 
${MDC(STATEMENT_ID, e.id)}"
+      )
     }
 
   private def onOperationCanceled(e: 
SparkListenerThriftServerOperationCanceled): Unit =
@@ -180,7 +186,9 @@ private[thriftserver] class HiveThriftServer2Listener(
         executionData.finishTimestamp = e.finishTime
         executionData.state = ExecutionState.CANCELED
         updateLiveStore(executionData)
-      case None => logWarning(s"onOperationCanceled called with unknown 
operation id: ${e.id}")
+      case None => logWarning(
+        log"onOperationCanceled called with unknown operation id: 
${MDC(STATEMENT_ID, e.id)}"
+      )
     }
 
   private def onOperationTimeout(e: 
SparkListenerThriftServerOperationTimeout): Unit =
@@ -189,7 +197,9 @@ private[thriftserver] class HiveThriftServer2Listener(
         executionData.finishTimestamp = e.finishTime
         executionData.state = ExecutionState.TIMEDOUT
         updateLiveStore(executionData)
-      case None => logWarning(s"onOperationCanceled called with unknown 
operation id: ${e.id}")
+      case None => logWarning(
+        log"onOperationCanceled called with unknown operation id: 
${MDC(STATEMENT_ID, e.id)}"
+      )
     }
 
   private def onOperationError(e: SparkListenerThriftServerOperationError): 
Unit =
@@ -199,7 +209,9 @@ private[thriftserver] class HiveThriftServer2Listener(
         executionData.detail = e.errorMsg
         executionData.state = ExecutionState.FAILED
         updateLiveStore(executionData)
-      case None => logWarning(s"onOperationError called with unknown operation 
id: ${e.id}")
+      case None => logWarning(
+        log"onOperationError called with unknown operation id: 
${MDC(STATEMENT_ID, e.id)}"
+      )
     }
 
   private def onOperationFinished(e: 
SparkListenerThriftServerOperationFinish): Unit =
@@ -208,7 +220,9 @@ private[thriftserver] class HiveThriftServer2Listener(
         executionData.finishTimestamp = e.finishTime
         executionData.state = ExecutionState.FINISHED
         updateLiveStore(executionData)
-      case None => logWarning(s"onOperationFinished called with unknown 
operation id: ${e.id}")
+      case None => logWarning(
+        log"onOperationFinished called with unknown operation id: 
${MDC(STATEMENT_ID, e.id)}"
+      )
     }
 
   private def onOperationClosed(e: SparkListenerThriftServerOperationClosed): 
Unit =
@@ -218,7 +232,9 @@ private[thriftserver] class HiveThriftServer2Listener(
         executionData.state = ExecutionState.CLOSED
         updateStoreWithTriggerEnabled(executionData)
         executionList.remove(e.id)
-      case None => logWarning(s"onOperationClosed called with unknown 
operation id: ${e.id}")
+      case None => logWarning(
+        log"onOperationClosed called with unknown operation id: 
${MDC(STATEMENT_ID, e.id)}"
+      )
     }
 
   // Update both live and history stores. Trigger is enabled by default, hence


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to