This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new f8e652e88320 [SPARK-47589][SQL] Hive-Thriftserver: Migrate logError with variables to structured logging framework f8e652e88320 is described below commit f8e652e88320528a70e605a6a3cf986725e153a5 Author: Gengliang Wang <gengli...@apache.org> AuthorDate: Mon Apr 8 17:13:28 2024 -0700 [SPARK-47589][SQL] Hive-Thriftserver: Migrate logError with variables to structured logging framework ### What changes were proposed in this pull request? Migrate logError with variables of Hive-thriftserver module to the structured logging framework. ### Why are the changes needed? To enhance Apache Spark's logging system by implementing structured logging. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing UT ### Was this patch authored or co-authored using generative AI tooling? No Closes #45936 from gengliangwang/LogError_HiveThriftServer2. Authored-by: Gengliang Wang <gengli...@apache.org> Signed-off-by: Gengliang Wang <gengli...@apache.org> --- .../main/scala/org/apache/spark/internal/LogKey.scala | 3 +++ .../thriftserver/SparkExecuteStatementOperation.scala | 17 +++++++++++------ .../spark/sql/hive/thriftserver/SparkOperation.scala | 6 ++++-- .../spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala | 5 +++-- .../spark/sql/hive/thriftserver/SparkSQLDriver.scala | 5 +++-- 5 files changed, 24 insertions(+), 12 deletions(-) diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala index 1144887e0b47..a0e99f1edc34 100644 --- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala +++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala @@ -50,6 +50,8 @@ object LogKey extends Enumeration { val EXIT_CODE = Value val FAILURES = Value val GROUP_ID = Value + val HIVE_OPERATION_STATE = Value + val HIVE_OPERATION_TYPE = Value val HOST = Value val JOB_ID = 
Value val JOIN_CONDITION = Value @@ -96,6 +98,7 @@ object LogKey extends Enumeration { val SIZE = Value val SLEEP_TIME = Value val STAGE_ID = Value + val STATEMENT_ID = Value val SUBMISSION_ID = Value val SUBSAMPLING_RATE = Value val TASK_ATTEMPT_ID = Value diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index 77b2aa131a24..628925007f7e 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -30,9 +30,11 @@ import org.apache.hive.service.cli.operation.ExecuteStatementOperation import org.apache.hive.service.cli.session.HiveSession import org.apache.hive.service.rpc.thrift.{TCLIServiceConstants, TColumnDesc, TPrimitiveTypeEntry, TRowSet, TTableSchema, TTypeDesc, TTypeEntry, TTypeId, TTypeQualifiers, TTypeQualifierValue} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{HIVE_OPERATION_STATE, STATEMENT_ID, TIMEOUT, USER_NAME} import org.apache.spark.sql.{DataFrame, Row, SQLContext} import org.apache.spark.sql.catalyst.util.CharVarcharUtils +import org.apache.spark.sql.catalyst.util.DateTimeConstants.MILLIS_PER_SECOND import org.apache.spark.sql.execution.HiveResult.getTimeFormatters import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution} import org.apache.spark.sql.types._ @@ -142,7 +144,9 @@ private[hive] class SparkExecuteStatementOperation( } catch { case NonFatal(e) => setOperationException(new HiveSQLException(e)) - logError(s"Error cancelling the query after timeout: $timeout seconds") + val timeout_ms = timeout * MILLIS_PER_SECOND + logError( + log"Error cancelling the query after timeout: 
${MDC(TIMEOUT, timeout_ms)} ms") } finally { timeoutExecutor.shutdown() } @@ -178,8 +182,8 @@ private[hive] class SparkExecuteStatementOperation( } catch { case e: Exception => setOperationException(new HiveSQLException(e)) - logError("Error running hive query as user : " + - sparkServiceUGI.getShortUserName(), e) + logError(log"Error running hive query as user : " + + log"${MDC(USER_NAME, sparkServiceUGI.getShortUserName())}", e) } } } @@ -196,7 +200,7 @@ private[hive] class SparkExecuteStatementOperation( statementId, rejected.getMessage, SparkUtils.exceptionString(rejected)) throw HiveThriftServerErrors.taskExecutionRejectedError(rejected) case NonFatal(e) => - logError(s"Error executing query in background", e) + logError("Error executing query in background", e) setState(OperationState.ERROR) HiveThriftServer2.eventManager.onStatementError( statementId, e.getMessage, SparkUtils.exceptionString(e)) @@ -254,7 +258,8 @@ private[hive] class SparkExecuteStatementOperation( // This may happen if the execution was cancelled, and then closed from another thread. 
logWarning(s"Ignore exception in terminal state with $statementId: $e") } else { - logError(s"Error executing query with $statementId, currentState $currentState, ", e) + logError(log"Error executing query with ${MDC(STATEMENT_ID, statementId)}, " + + log"currentState ${MDC(HIVE_OPERATION_STATE, currentState)}, ", e) setState(OperationState.ERROR) HiveThriftServer2.eventManager.onStatementError( statementId, e.getMessage, SparkUtils.exceptionString(e)) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala index bbc4bd9d5326..d5874fe77665 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala @@ -21,7 +21,8 @@ import org.apache.hive.service.cli.{HiveSQLException, OperationState} import org.apache.hive.service.cli.operation.Operation import org.apache.spark.SparkContext -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{HIVE_OPERATION_TYPE, STATEMENT_ID} import org.apache.spark.sql.{SparkSession, SQLContext} import org.apache.spark.sql.catalyst.CurrentUserContext.CURRENT_USER import org.apache.spark.sql.catalyst.catalog.CatalogTableType @@ -98,7 +99,8 @@ private[hive] trait SparkOperation extends Operation with Logging { protected def onError(): PartialFunction[Throwable, Unit] = { case e: Throwable => - logError(s"Error operating $getType with $statementId", e) + logError(log"Error operating ${MDC(HIVE_OPERATION_TYPE, getType)} with " + + log"${MDC(STATEMENT_ID, statementId)}", e) super.setState(OperationState.ERROR) HiveThriftServer2.eventManager.onStatementError( statementId, e.getMessage, Utils.exceptionString(e)) diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala index d41b06e74cfd..03d8fd0c8ff2 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala @@ -40,7 +40,8 @@ import sun.misc.{Signal, SignalHandler} import org.apache.spark.{ErrorMessageFormat, SparkConf, SparkThrowable, SparkThrowableHelper} import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.ERROR import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.FunctionRegistry import org.apache.spark.sql.catalyst.util.SQLKeywordUtils @@ -214,7 +215,7 @@ private[hive] object SparkSQLCLIDriver extends Logging { } } catch { case e: FileNotFoundException => - logError(s"Could not open input file for reading. (${e.getMessage})") + logError(log"Could not open input file for reading. 
(${MDC(ERROR, e.getMessage)})") exit(ERROR_PATH_NOT_FOUND) } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala index 5d9ec3051dc3..29e468aaa9fe 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala @@ -27,7 +27,8 @@ import org.apache.hadoop.hive.ql.Driver import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse import org.apache.spark.SparkThrowable -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.COMMAND import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.plans.logical.CommandResult import org.apache.spark.sql.execution.{QueryExecution, SQLExecution} @@ -83,7 +84,7 @@ private[hive] class SparkSQLDriver(val context: SQLContext = SparkSQLEnv.sqlCont logDebug(s"Failed in [$command]", st) new CommandProcessorResponse(1, ExceptionUtils.getStackTrace(st), st.getSqlState, st) case cause: Throwable => - logError(s"Failed in [$command]", cause) + logError(log"Failed in [${MDC(COMMAND, command)}]", cause) new CommandProcessorResponse(1, ExceptionUtils.getStackTrace(cause), null, cause) } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org