This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f8e652e88320 [SPARK-47589][SQL] Hive-Thriftserver: Migrate logError with variables to structured logging framework
f8e652e88320 is described below

commit f8e652e88320528a70e605a6a3cf986725e153a5
Author: Gengliang Wang <gengli...@apache.org>
AuthorDate: Mon Apr 8 17:13:28 2024 -0700

    [SPARK-47589][SQL] Hive-Thriftserver: Migrate logError with variables to structured logging framework
    
    ### What changes were proposed in this pull request?
    
    Migrate logError calls with variables in the Hive-Thriftserver module to the structured logging framework.
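    
    As a minimal sketch of the before/after pattern (mirroring the call sites changed in the diff below; the enclosing object and method names are illustrative only):
    
    ```scala
    import org.apache.spark.internal.{Logging, MDC}
    import org.apache.spark.internal.LogKey.STATEMENT_ID
    
    // Hypothetical call site inside a class that mixes in Logging
    object MigrationSketch extends Logging {
      def report(statementId: String, e: Throwable): Unit = {
        // Before: plain string interpolation, no structured fields
        // logError(s"Error executing query with $statementId", e)
    
        // After: the log"..." interpolator tags the variable with a LogKey,
        // so it is emitted as a structured MDC field as well as message text
        logError(log"Error executing query with ${MDC(STATEMENT_ID, statementId)}", e)
      }
    }
    ```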
    
    ### Why are the changes needed?
    
    To enhance Apache Spark's logging system by implementing structured logging.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing UT
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #45936 from gengliangwang/LogError_HiveThriftServer2.
    
    Authored-by: Gengliang Wang <gengli...@apache.org>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../main/scala/org/apache/spark/internal/LogKey.scala   |  3 +++
 .../thriftserver/SparkExecuteStatementOperation.scala   | 17 +++++++++++------
 .../spark/sql/hive/thriftserver/SparkOperation.scala    |  6 ++++--
 .../spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala |  5 +++--
 .../spark/sql/hive/thriftserver/SparkSQLDriver.scala    |  5 +++--
 5 files changed, 24 insertions(+), 12 deletions(-)

diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
index 1144887e0b47..a0e99f1edc34 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -50,6 +50,8 @@ object LogKey extends Enumeration {
   val EXIT_CODE = Value
   val FAILURES = Value
   val GROUP_ID = Value
+  val HIVE_OPERATION_STATE = Value
+  val HIVE_OPERATION_TYPE = Value
   val HOST = Value
   val JOB_ID = Value
   val JOIN_CONDITION = Value
@@ -96,6 +98,7 @@ object LogKey extends Enumeration {
   val SIZE = Value
   val SLEEP_TIME = Value
   val STAGE_ID = Value
+  val STATEMENT_ID = Value
   val SUBMISSION_ID = Value
   val SUBSAMPLING_RATE = Value
   val TASK_ATTEMPT_ID = Value
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index 77b2aa131a24..628925007f7e 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -30,9 +30,11 @@ import org.apache.hive.service.cli.operation.ExecuteStatementOperation
 import org.apache.hive.service.cli.session.HiveSession
 import org.apache.hive.service.rpc.thrift.{TCLIServiceConstants, TColumnDesc, TPrimitiveTypeEntry, TRowSet, TTableSchema, TTypeDesc, TTypeEntry, TTypeId, TTypeQualifiers, TTypeQualifierValue}
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{HIVE_OPERATION_STATE, STATEMENT_ID, TIMEOUT, USER_NAME}
 import org.apache.spark.sql.{DataFrame, Row, SQLContext}
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
+import org.apache.spark.sql.catalyst.util.DateTimeConstants.MILLIS_PER_SECOND
 import org.apache.spark.sql.execution.HiveResult.getTimeFormatters
 import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
 import org.apache.spark.sql.types._
@@ -142,7 +144,9 @@ private[hive] class SparkExecuteStatementOperation(
           } catch {
             case NonFatal(e) =>
               setOperationException(new HiveSQLException(e))
-              logError(s"Error cancelling the query after timeout: $timeout 
seconds")
+              val timeout_ms = timeout * MILLIS_PER_SECOND
+              logError(
+                log"Error cancelling the query after timeout: ${MDC(TIMEOUT, 
timeout_ms)} ms")
           } finally {
             timeoutExecutor.shutdown()
           }
@@ -178,8 +182,8 @@ private[hive] class SparkExecuteStatementOperation(
           } catch {
             case e: Exception =>
               setOperationException(new HiveSQLException(e))
-              logError("Error running hive query as user : " +
-                sparkServiceUGI.getShortUserName(), e)
+              logError(log"Error running hive query as user : " +
+                log"${MDC(USER_NAME, sparkServiceUGI.getShortUserName())}", e)
           }
         }
       }
@@ -196,7 +200,7 @@ private[hive] class SparkExecuteStatementOperation(
            statementId, rejected.getMessage, SparkUtils.exceptionString(rejected))
           throw HiveThriftServerErrors.taskExecutionRejectedError(rejected)
         case NonFatal(e) =>
-          logError(s"Error executing query in background", e)
+          logError("Error executing query in background", e)
           setState(OperationState.ERROR)
           HiveThriftServer2.eventManager.onStatementError(
             statementId, e.getMessage, SparkUtils.exceptionString(e))
@@ -254,7 +258,8 @@ private[hive] class SparkExecuteStatementOperation(
          // This may happen if the execution was cancelled, and then closed from another thread.
           logWarning(s"Ignore exception in terminal state with $statementId: 
$e")
         } else {
-          logError(s"Error executing query with $statementId, currentState 
$currentState, ", e)
+          logError(log"Error executing query with ${MDC(STATEMENT_ID, 
statementId)}, " +
+            log"currentState ${MDC(HIVE_OPERATION_STATE, currentState)}, ", e)
           setState(OperationState.ERROR)
           HiveThriftServer2.eventManager.onStatementError(
             statementId, e.getMessage, SparkUtils.exceptionString(e))
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala
index bbc4bd9d5326..d5874fe77665 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala
@@ -21,7 +21,8 @@ import org.apache.hive.service.cli.{HiveSQLException, OperationState}
 import org.apache.hive.service.cli.operation.Operation
 
 import org.apache.spark.SparkContext
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{HIVE_OPERATION_TYPE, STATEMENT_ID}
 import org.apache.spark.sql.{SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.CurrentUserContext.CURRENT_USER
 import org.apache.spark.sql.catalyst.catalog.CatalogTableType
@@ -98,7 +99,8 @@ private[hive] trait SparkOperation extends Operation with Logging {
 
   protected def onError(): PartialFunction[Throwable, Unit] = {
     case e: Throwable =>
-      logError(s"Error operating $getType with $statementId", e)
+      logError(log"Error operating ${MDC(HIVE_OPERATION_TYPE, getType)} with " 
+
+        log"${MDC(STATEMENT_ID, statementId)}", e)
       super.setState(OperationState.ERROR)
       HiveThriftServer2.eventManager.onStatementError(
         statementId, e.getMessage, Utils.exceptionString(e))
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index d41b06e74cfd..03d8fd0c8ff2 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -40,7 +40,8 @@ import sun.misc.{Signal, SignalHandler}
 
 import org.apache.spark.{ErrorMessageFormat, SparkConf, SparkThrowable, SparkThrowableHelper}
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.ERROR
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
 import org.apache.spark.sql.catalyst.util.SQLKeywordUtils
@@ -214,7 +215,7 @@ private[hive] object SparkSQLCLIDriver extends Logging {
       }
     } catch {
       case e: FileNotFoundException =>
-        logError(s"Could not open input file for reading. (${e.getMessage})")
+        logError(log"Could not open input file for reading. (${MDC(ERROR, 
e.getMessage)})")
         exit(ERROR_PATH_NOT_FOUND)
     }
 
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
index 5d9ec3051dc3..29e468aaa9fe 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
@@ -27,7 +27,8 @@ import org.apache.hadoop.hive.ql.Driver
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse
 
 import org.apache.spark.SparkThrowable
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.COMMAND
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.plans.logical.CommandResult
 import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}
@@ -83,7 +84,7 @@ private[hive] class SparkSQLDriver(val context: SQLContext = SparkSQLEnv.sqlCont
           logDebug(s"Failed in [$command]", st)
          new CommandProcessorResponse(1, ExceptionUtils.getStackTrace(st), st.getSqlState, st)
         case cause: Throwable =>
-          logError(s"Failed in [$command]", cause)
+          logError(log"Failed in [${MDC(COMMAND, command)}]", cause)
          new CommandProcessorResponse(1, ExceptionUtils.getStackTrace(cause), null, cause)
     }
   }

