This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 87b20b166c41 [SPARK-47585][SQL] SQL core: Migrate logInfo with variables to structured logging framework
87b20b166c41 is described below

commit 87b20b166c41d4c265ac54eed75707b7726d371f
Author: panbingkun <panbing...@baidu.com>
AuthorDate: Mon Apr 29 22:10:59 2024 -0700

    [SPARK-47585][SQL] SQL core: Migrate logInfo with variables to structured logging framework

    ### What changes were proposed in this pull request?
    The pr aims to migrate `logInfo` in module `SQL core` with variables to `structured logging framework`.

    ### Why are the changes needed?
    To enhance Apache Spark's logging system by implementing structured logging.

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    - Pass GA.

    ### Was this patch authored or co-authored using generative AI tooling?
    No.

    Closes #46264 from panbingkun/SPARK-47585.

    Authored-by: panbingkun <panbing...@baidu.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../scala/org/apache/spark/internal/LogKey.scala   | 55 +++++++++++++++++++---
 .../org/apache/spark/ml/util/Instrumentation.scala | 4 +-
 .../spark/sql/catalyst/optimizer/Optimizer.scala   | 9 ++--
 .../spark/sql/catalyst/rules/RuleExecutor.scala    | 6 +--
 .../spark/sql/columnar/CachedBatchSerializer.scala | 6 ++-
 .../spark/sql/execution/DataSourceScanExec.scala   | 11 +++--
 .../ExternalAppendOnlyUnsafeRowArray.scala         | 8 ++--
 .../sql/execution/WholeStageCodegenExec.scala      | 14 +++---
 .../sql/execution/adaptive/AQEOptimizer.scala      | 9 ++--
 .../aggregate/AggregateCodegenSupport.scala        | 10 ++--
 .../execution/aggregate/HashAggregateExec.scala    | 6 ++-
 .../aggregate/ObjectAggregationIterator.scala      | 13 +++--
 .../spark/sql/execution/command/CommandUtils.scala | 9 ++--
 .../execution/command/createDataSourceTables.scala | 2 +-
 .../apache/spark/sql/execution/command/ddl.scala   | 30 +++++++-----
 .../datasources/BasicWriteStatsTracker.scala       | 8 ++--
 .../sql/execution/datasources/DataSource.scala     | 4 +-
 .../execution/datasources/DataSourceStrategy.scala | 5 +-
 .../datasources/FileFormatDataWriter.scala         | 11 +++--
 .../execution/datasources/FileFormatWriter.scala   | 9 ++--
 .../sql/execution/datasources/FilePartition.scala  | 8 ++--
 .../sql/execution/datasources/FileScanRDD.scala    | 4 +-
 .../execution/datasources/FileSourceStrategy.scala | 12 ++---
 .../execution/datasources/InMemoryFileIndex.scala  | 7 +--
 .../datasources/PartitioningAwareFileIndex.scala   | 7 +--
 .../SQLHadoopMapReduceCommitProtocol.scala         | 9 ++--
 .../sql/execution/datasources/jdbc/JDBCRDD.scala   | 5 +-
 .../execution/datasources/jdbc/JDBCRelation.scala  | 7 +--
 .../datasources/parquet/ParquetUtils.scala         | 7 +--
 .../execution/datasources/v2/FileBatchWrite.scala  | 9 ++--
 .../datasources/v2/FilePartitionReader.scala       | 4 +-
 .../GroupBasedRowLevelOperationScanPlanning.scala  | 17 ++++---
 .../datasources/v2/V2ScanRelationPushDown.scala    | 32 +++++++------
 .../datasources/v2/WriteToDataSourceV2Exec.scala   | 52 +++++++++++---------
 .../python/PythonStreamingSinkCommitRunner.scala   | 5 +-
 .../execution/exchange/EnsureRequirements.scala    | 42 ++++++++++-------
 .../python/PythonStreamingSourceRunner.scala       | 5 +-
 .../spark/sql/execution/r/ArrowRRunner.scala       | 24 +++++-----
 .../WriteToContinuousDataSourceExec.scala          | 2 +-
 .../state/HDFSBackedStateStoreProvider.scala       | 16 ++++---
 .../apache/spark/sql/internal/SharedState.scala    | 15 +++---
 .../sql/streaming/StreamingQueryManager.scala      | 4 +-
 42 files changed, 318 insertions(+), 204 deletions(-)
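For context, a minimal sketch of the migration pattern this PR applies throughout SQL core, mirroring the `DataSourceScanExec` change in the diff below: an `s"..."`-interpolated `logInfo` call is rewritten with Spark's internal `log"..."` interpolator, wrapping each variable in `MDC(<LogKey>, value)` so it is emitted under a stable key. The class and values here are purely illustrative (not part of this PR), and the sketch assumes code living inside the `org.apache.spark` packages, since `Logging`, `MDC`, and `LogKeys` are Spark-internal APIs.

```scala
package org.apache.spark.sql.execution

import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys.{MAX_SPLIT_BYTES, OPEN_COST_IN_BYTES}

// Illustrative example only; names and values are hypothetical.
class ScanPlanningExample extends Logging {
  def logPlan(maxSplitBytes: Long, openCostInBytes: Long): Unit = {
    // Before: plain string interpolation, variables are lost in free-form text.
    // logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
    //   s"open cost is considered as scanning $openCostInBytes bytes.")

    // After: the log"..." interpolator wraps each variable in MDC(<LogKey>, value),
    // so structured logging backends can index it under a stable LogKeys entry.
    logInfo(log"Planning scan with bin packing, max size: " +
      log"${MDC(MAX_SPLIT_BYTES, maxSplitBytes)} bytes, open cost is considered as " +
      log"scanning ${MDC(OPEN_COST_IN_BYTES, openCostInBytes)} bytes.")
  }
}
```

diff --git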
a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala index 2ca80a496ccb..238432d354f6 100644 --- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala +++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala @@ -33,6 +33,7 @@ object LogKeys { case object ACCUMULATOR_ID extends LogKey case object ACTUAL_NUM_FILES extends LogKey case object ACTUAL_PARTITION_COLUMN extends LogKey + case object AGGREGATE_FUNCTIONS extends LogKey case object ALPHA extends LogKey case object ANALYSIS_ERROR extends LogKey case object APP_ATTEMPT_ID extends LogKey @@ -43,10 +44,13 @@ object LogKeys { case object ARGS extends LogKey case object BACKUP_FILE extends LogKey case object BATCH_ID extends LogKey + case object BATCH_NAME extends LogKey case object BATCH_TIMESTAMP extends LogKey case object BATCH_WRITE extends LogKey case object BLOCK_ID extends LogKey case object BLOCK_MANAGER_ID extends LogKey + case object BOOT extends LogKey + case object BROADCAST extends LogKey case object BROADCAST_ID extends LogKey case object BUCKET extends LogKey case object BYTECODE_SIZE extends LogKey @@ -66,6 +70,7 @@ object LogKeys { case object CHOSEN_WATERMARK extends LogKey case object CLASS_LOADER extends LogKey case object CLASS_NAME extends LogKey + case object CLAUSES extends LogKey case object CLUSTER_CENTROIDS extends LogKey case object CLUSTER_ID extends LogKey case object CLUSTER_LABEL extends LogKey @@ -83,6 +88,7 @@ object LogKeys { case object COMMITTED_VERSION extends LogKey case object COMPACT_INTERVAL extends LogKey case object COMPONENT extends LogKey + case object COMPUTE extends LogKey case object CONFIG extends LogKey case object CONFIG2 extends LogKey case object CONFIG3 extends LogKey @@ -103,6 +109,7 @@ object LogKeys { case object CSV_SCHEMA_FIELD_NAMES extends LogKey case object CSV_SOURCE extends LogKey case object CURRENT_BATCH_ID extends LogKey + case object CURRENT_FILE extends LogKey case object CURRENT_PATH extends LogKey case object DATA extends LogKey case object DATABASE_NAME extends LogKey @@ -111,7 +118,7 @@ object LogKeys { case object DATA_FILE_NUM extends LogKey case object DATA_SOURCE extends LogKey case object DATA_SOURCES extends LogKey - case object DATA_SOURCE_PROVIDER extends LogKey + case object DEFAULT_COMPACTION_INTERVAL extends LogKey case object DEFAULT_COMPACT_INTERVAL extends LogKey case object DEFAULT_ISOLATION_LEVEL extends LogKey case object DEFAULT_VALUE extends LogKey @@ -119,13 +126,14 @@ object LogKeys { case object DELEGATE extends LogKey case object DELTA extends LogKey case object DESCRIPTION extends LogKey - case object DESIRED_PARTITIONS_SIZE extends LogKey + case object DESIRED_NUM_PARTITIONS extends LogKey case object DFS_FILE extends LogKey case object DIFF_DELTA extends LogKey case object DIVISIBLE_CLUSTER_INDICES_SIZE extends LogKey case object DRIVER_ID extends LogKey case object DROPPED_PARTITIONS extends LogKey case object DURATION extends LogKey + case object EARLIEST_LOADED_VERSION extends LogKey case object EFFECTIVE_STORAGE_LEVEL extends LogKey case object ELAPSED_TIME extends LogKey case object ENCODING extends LogKey @@ -136,6 +144,7 @@ object LogKeys { case object EPOCH extends LogKey case object ERROR extends LogKey case object ESTIMATOR_PARAMETER_MAP extends LogKey + case object EVALUATED_FILTERS extends LogKey case object EVENT_LOOP extends LogKey case object EVENT_QUEUE extends LogKey case object EXCEPTION extends LogKey @@ 
-169,10 +178,13 @@ object LogKeys { case object FILE_END_OFFSET extends LogKey case object FILE_FORMAT extends LogKey case object FILE_FORMAT2 extends LogKey + case object FILE_LENGTH_XATTR extends LogKey case object FILE_MODIFICATION_TIME extends LogKey case object FILE_NAME extends LogKey case object FILE_START_OFFSET extends LogKey case object FILE_VERSION extends LogKey + case object FILTER extends LogKey + case object FILTERS extends LogKey case object FINAL_PATH extends LogKey case object FINISH_TRIGGER_DURATION extends LogKey case object FROM_OFFSET extends LogKey @@ -180,9 +192,11 @@ object LogKeys { case object FUNCTION_NAME extends LogKey case object FUNCTION_PARAMETER extends LogKey case object GLOBAL_WATERMARK extends LogKey + case object GROUP_BY_EXPRS extends LogKey case object GROUP_ID extends LogKey case object HADOOP_VERSION extends LogKey case object HASH_JOIN_KEYS extends LogKey + case object HASH_MAP_SIZE extends LogKey case object HEARTBEAT_INTERVAL extends LogKey case object HISTORY_DIR extends LogKey case object HIVE_CLIENT_VERSION extends LogKey @@ -191,20 +205,23 @@ object LogKeys { case object HIVE_OPERATION_TYPE extends LogKey case object HOST extends LogKey case object HOST_PORT extends LogKey - case object IDENTIFIER extends LogKey + case object HUGE_METHOD_LIMIT extends LogKey case object INCOMPATIBLE_TYPES extends LogKey case object INDEX extends LogKey case object INDEX_FILE_NUM extends LogKey case object INDEX_NAME extends LogKey case object INFERENCE_MODE extends LogKey + case object INIT extends LogKey case object INITIAL_CAPACITY extends LogKey case object INITIAL_HEARTBEAT_INTERVAL extends LogKey case object INIT_MODE extends LogKey + case object INPUT extends LogKey case object INTERVAL extends LogKey case object ISOLATION_LEVEL extends LogKey case object JOB_ID extends LogKey case object JOIN_CONDITION extends LogKey case object JOIN_CONDITION_SUB_EXPR extends LogKey + case object JOIN_TYPE extends LogKey case object K8S_CONTEXT extends LogKey case object KAFKA_PULLS_COUNT extends LogKey case object KAFKA_RECORDS_PULLED_COUNT extends LogKey @@ -218,6 +235,7 @@ object LogKeys { case object LATEST_COMMITTED_BATCH_ID extends LogKey case object LEARNING_RATE extends LogKey case object LEFT_EXPR extends LogKey + case object LEFT_LOGICAL_PLAN_STATS_SIZE_IN_BYTES extends LogKey case object LINE extends LogKey case object LINE_NUM extends LogKey case object LISTENER extends LogKey @@ -239,9 +257,13 @@ object LogKeys { case object MAX_CATEGORIES extends LogKey case object MAX_EXECUTOR_FAILURES extends LogKey case object MAX_FILE_VERSION extends LogKey + case object MAX_JVM_METHOD_PARAMS_LENGTH extends LogKey case object MAX_MEMORY_SIZE extends LogKey - case object MAX_PARTITIONS_SIZE extends LogKey + case object MAX_METHOD_CODE_SIZE extends LogKey + case object MAX_NUM_PARTITIONS extends LogKey + case object MAX_NUM_ROWS_IN_MEMORY_BUFFER extends LogKey case object MAX_SIZE extends LogKey + case object MAX_SPLIT_BYTES extends LogKey case object MAX_TABLE_PARTITION_METADATA_SIZE extends LogKey case object MEMORY_SIZE extends LogKey case object MERGE_DIR_NAME extends LogKey @@ -264,10 +286,13 @@ object LogKeys { case object NODES extends LogKey case object NODE_LOCATION extends LogKey case object NORM extends LogKey + case object NUM_ADDED_PARTITIONS extends LogKey case object NUM_BIN extends LogKey case object NUM_BYTES extends LogKey case object NUM_CLASSES extends LogKey case object NUM_COLUMNS extends LogKey + case object NUM_CONCURRENT_WRITER extends LogKey 
+ case object NUM_DROPPED_PARTITIONS extends LogKey case object NUM_EXAMPLES extends LogKey case object NUM_FEATURES extends LogKey case object NUM_FILES extends LogKey @@ -276,31 +301,40 @@ object LogKeys { case object NUM_FILES_REUSED extends LogKey case object NUM_FREQUENT_ITEMS extends LogKey case object NUM_ITERATIONS extends LogKey + case object NUM_LEFT_PARTITION_VALUES extends LogKey + case object NUM_LOADED_ENTRIES extends LogKey case object NUM_LOCAL_FREQUENT_PATTERN extends LogKey - case object NUM_PARTITION extends LogKey + case object NUM_PARTITIONS extends LogKey + case object NUM_PARTITION_VALUES extends LogKey case object NUM_POINT extends LogKey case object NUM_PREFIXES extends LogKey + case object NUM_PRUNED extends LogKey + case object NUM_RIGHT_PARTITION_VALUES extends LogKey case object NUM_SEQUENCES extends LogKey + case object NUM_VERSIONS_RETAIN extends LogKey + case object OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD extends LogKey case object OBJECT_ID extends LogKey case object OFFSET extends LogKey case object OFFSETS extends LogKey case object OFFSET_SEQUENCE_METADATA extends LogKey case object OLD_BLOCK_MANAGER_ID extends LogKey case object OLD_VALUE extends LogKey + case object OPEN_COST_IN_BYTES extends LogKey case object OPTIMIZED_PLAN_COLUMNS extends LogKey case object OPTIMIZER_CLASS_NAME extends LogKey case object OPTIONS extends LogKey case object OP_ID extends LogKey case object OP_TYPE extends LogKey + case object OUTPUT extends LogKey case object OVERHEAD_MEMORY_SIZE extends LogKey case object PARSE_MODE extends LogKey case object PARTITIONED_FILE_READER extends LogKey - case object PARTITIONS_SIZE extends LogKey case object PARTITION_ID extends LogKey case object PARTITION_SPECIFICATION extends LogKey case object PARTITION_SPECS extends LogKey case object PATH extends LogKey case object PATHS extends LogKey + case object PERCENT extends LogKey case object PIPELINE_STAGE_UID extends LogKey case object POD_COUNT extends LogKey case object POD_ID extends LogKey @@ -312,12 +346,17 @@ object LogKeys { case object POD_TARGET_COUNT extends LogKey case object POLICY extends LogKey case object PORT extends LogKey + case object POST_SCAN_FILTERS extends LogKey + case object PREDICATE extends LogKey + case object PREDICATES extends LogKey case object PRETTY_ID_STRING extends LogKey case object PRINCIPAL extends LogKey case object PROCESSING_TIME extends LogKey case object PRODUCER_ID extends LogKey case object PROVIDER extends LogKey + case object PUSHED_FILTERS extends LogKey case object PVC_METADATA_NAME extends LogKey + case object PYTHON_EXEC extends LogKey case object QUERY_CACHE_VALUE extends LogKey case object QUERY_HINT extends LogKey case object QUERY_ID extends LogKey @@ -339,6 +378,7 @@ object LogKeys { case object REDACTED_STATEMENT extends LogKey case object REDUCE_ID extends LogKey case object RELATION_NAME extends LogKey + case object RELATION_OUTPUT extends LogKey case object RELATIVE_TOLERANCE extends LogKey case object REMAINING_PARTITIONS extends LogKey case object REPORT_DETAILS extends LogKey @@ -349,11 +389,11 @@ object LogKeys { case object RETRY_COUNT extends LogKey case object RETRY_INTERVAL extends LogKey case object RIGHT_EXPR extends LogKey + case object RIGHT_LOGICAL_PLAN_STATS_SIZE_IN_BYTES extends LogKey case object RMSE extends LogKey case object ROCKS_DB_LOG_LEVEL extends LogKey case object ROCKS_DB_LOG_MESSAGE extends LogKey case object RPC_ENDPOINT_REF extends LogKey - case object RULE_BATCH_NAME extends LogKey case object 
RULE_NAME extends LogKey case object RULE_NUMBER_OF_RUNS extends LogKey case object RUN_ID extends LogKey @@ -450,6 +490,7 @@ object LogKeys { case object URI extends LogKey case object USER_ID extends LogKey case object USER_NAME extends LogKey + case object UUID extends LogKey case object VALUE extends LogKey case object VERSION_NUMBER extends LogKey case object VIRTUAL_CORES extends LogKey diff --git a/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala b/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala index acb6bf831239..9413605a31ce 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala @@ -28,7 +28,7 @@ import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods._ import org.apache.spark.internal.{LogEntry, Logging, MDC} -import org.apache.spark.internal.LogKeys.{CLASS_NAME, NUM_PARTITION, PIPELINE_STAGE_UID, STORAGE_LEVEL} +import org.apache.spark.internal.LogKeys.{CLASS_NAME, NUM_PARTITIONS, PIPELINE_STAGE_UID, STORAGE_LEVEL} import org.apache.spark.ml.{MLEvents, PipelineStage} import org.apache.spark.ml.param.{Param, Params} import org.apache.spark.rdd.RDD @@ -67,7 +67,7 @@ private[spark] class Instrumentation private () extends Logging with MLEvents { * Log some data about the dataset being fit. */ def logDataset(dataset: RDD[_]): Unit = { - logInfo(log"training: numPartitions=${MDC(NUM_PARTITION, dataset.partitions.length)}" + + logInfo(log"training: numPartitions=${MDC(NUM_PARTITIONS, dataset.partitions.length)}" + log" storageLevel=${MDC(STORAGE_LEVEL, dataset.getStorageLevel)}") } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index bdcbe36a3346..dfc1e17c2a29 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -20,8 +20,7 @@ package org.apache.spark.sql.catalyst.optimizer import scala.collection.mutable import org.apache.spark.SparkException -import org.apache.spark.internal.LogKeys._ -import org.apache.spark.internal.MDC +import org.apache.spark.internal.{LogKeys, MDC} import org.apache.spark.sql.catalyst.SQLConfHelper import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog} @@ -446,7 +445,7 @@ abstract class Optimizer(catalogManager: CatalogManager) val excludedRules = excludedRulesConf.filter { ruleName => val nonExcludable = nonExcludableRules.contains(ruleName) if (nonExcludable) { - logWarning(log"Optimization rule '${MDC(RULE_NAME, ruleName)}' " + + logWarning(log"Optimization rule '${MDC(LogKeys.RULE_NAME, ruleName)}' " + log"was not excluded from the optimizer because this rule is a non-excludable rule.") } !nonExcludable @@ -458,7 +457,7 @@ abstract class Optimizer(catalogManager: CatalogManager) val filteredRules = batch.rules.filter { rule => val exclude = excludedRules.contains(rule.ruleName) if (exclude) { - logInfo(log"Optimization rule '${MDC(RULE_NAME, rule.ruleName)}' " + + logInfo(log"Optimization rule '${MDC(LogKeys.RULE_NAME, rule.ruleName)}' " + log"is excluded from the optimizer.") } !exclude @@ -468,7 +467,7 @@ abstract class Optimizer(catalogManager: CatalogManager) } else if (filteredRules.nonEmpty) { Some(Batch(batch.name, batch.strategy, filteredRules: _*)) } else { - 
logInfo(log"Optimization batch '${MDC(RULE_BATCH_NAME, batch.name)}' " + + logInfo(log"Optimization batch '${MDC(LogKeys.BATCH_NAME, batch.name)}' " + log"is excluded from the optimizer as all enclosed rules have been excluded.") None } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala index 3a5153f46865..c3c105995cd8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala @@ -75,11 +75,11 @@ class PlanChangeLogger[TreeType <: TreeNode[_]] extends Logging { def message(): MessageWithContext = { if (!oldPlan.fastEquals(newPlan)) { log""" - |=== Result of Batch ${MDC(RULE_BATCH_NAME, batchName)} === + |=== Result of Batch ${MDC(BATCH_NAME, batchName)} === |${MDC(QUERY_PLAN, sideBySide(oldPlan.treeString, newPlan.treeString).mkString("\n"))} """.stripMargin } else { - log"Batch ${MDC(RULE_BATCH_NAME, batchName)} has no effect." + log"Batch ${MDC(BATCH_NAME, batchName)} has no effect." } } @@ -263,7 +263,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging { log"to a larger value." } val log = log"Max iterations (${MDC(NUM_ITERATIONS, iteration - 1)}) " + - log"reached for batch ${MDC(RULE_BATCH_NAME, batch.name)}" + + log"reached for batch ${MDC(BATCH_NAME, batch.name)}" + endingMsg if (Utils.isTesting || batch.strategy.errorOnExceed) { throw new RuntimeException(log.message) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/CachedBatchSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/CachedBatchSerializer.scala index 1113e63cab33..885ddf4110cb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/CachedBatchSerializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/CachedBatchSerializer.scala @@ -18,7 +18,8 @@ package org.apache.spark.sql.columnar import org.apache.spark.annotation.{DeveloperApi, Since} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKeys.{FILTER, PREDICATE} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.dsl.expressions._ @@ -307,7 +308,8 @@ abstract class SimpleMetricsCachedBatchSerializer extends CachedBatchSerializer allowFailures = true)) boundFilter.foreach(_ => - filter.foreach(f => logInfo(s"Predicate $p generates partition filter: $f"))) + filter.foreach(f => logInfo(log"Predicate ${MDC(PREDICATE, p)} generates " + + log"partition filter: ${MDC(FILTER, f)}"))) // If the filter can't be resolved then we are missing required statistics. 
boundFilter.filter(_.resolved) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index f583bb665de1..2ebbb9664f67 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -22,6 +22,8 @@ import java.util.concurrent.TimeUnit._ import org.apache.commons.lang3.StringUtils import org.apache.hadoop.fs.Path +import org.apache.spark.internal.LogKeys.{COUNT, MAX_SPLIT_BYTES, OPEN_COST_IN_BYTES} +import org.apache.spark.internal.MDC import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.{FileSourceOptions, InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.catalog.BucketSpec @@ -713,7 +715,7 @@ case class FileSourceScanExec( bucketSpec: BucketSpec, readFile: (PartitionedFile) => Iterator[InternalRow], selectedPartitions: ScanFileListing): RDD[InternalRow] = { - logInfo(s"Planning with ${bucketSpec.numBuckets} buckets") + logInfo(log"Planning with ${MDC(COUNT, bucketSpec.numBuckets)} buckets") val partitionArray = selectedPartitions.toPartitionArray val filesGroupedToBuckets = partitionArray.groupBy { f => BucketingUtils @@ -731,7 +733,7 @@ case class FileSourceScanExec( } val filePartitions = optionalNumCoalescedBuckets.map { numCoalescedBuckets => - logInfo(s"Coalescing to ${numCoalescedBuckets} buckets") + logInfo(log"Coalescing to ${MDC(COUNT, numCoalescedBuckets)} buckets") val coalescedBuckets = prunedFilesGroupedToBuckets.groupBy(_._1 % numCoalescedBuckets) Seq.tabulate(numCoalescedBuckets) { bucketId => val partitionedFiles = coalescedBuckets.get(bucketId).map { @@ -764,8 +766,9 @@ case class FileSourceScanExec( val openCostInBytes = relation.sparkSession.sessionState.conf.filesOpenCostInBytes val maxSplitBytes = FilePartition.maxSplitBytes(relation.sparkSession, selectedPartitions) - logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " + - s"open cost is considered as scanning $openCostInBytes bytes.") + logInfo(log"Planning scan with bin packing, max size: ${MDC(MAX_SPLIT_BYTES, maxSplitBytes)} " + + log"bytes, open cost is considered as scanning ${MDC(OPEN_COST_IN_BYTES, openCostInBytes)} " + + log"bytes.") // Filter files with bucket pruning if possible val bucketingEnabled = relation.sparkSession.sessionState.conf.bucketingEnabled diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala index 56289d73c071..59810adc4b22 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala @@ -22,7 +22,8 @@ import java.io.Closeable import scala.collection.mutable.ArrayBuffer import org.apache.spark.{SparkEnv, TaskContext} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKeys.{CLASS_NAME, MAX_NUM_ROWS_IN_MEMORY_BUFFER} import org.apache.spark.memory.TaskMemoryManager import org.apache.spark.serializer.SerializerManager import org.apache.spark.sql.catalyst.expressions.UnsafeRow @@ -122,8 +123,9 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray( inMemoryBuffer += unsafeRow.copy() } else { if (spillableArray == null) { - 
logInfo(s"Reached spill threshold of $numRowsInMemoryBufferThreshold rows, switching to " + - s"${classOf[UnsafeExternalSorter].getName}") + logInfo(log"Reached spill threshold of " + + log"${MDC(MAX_NUM_ROWS_IN_MEMORY_BUFFER, numRowsInMemoryBufferThreshold)} rows, " + + log"switching to ${MDC(CLASS_NAME, classOf[UnsafeExternalSorter].getName)}") // We will not sort the rows, so prefixComparator and recordComparator are null spillableArray = UnsafeExternalSorter.create( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index eb83fe16d808..382f8cf8861a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -24,7 +24,7 @@ import scala.collection.mutable import scala.util.control.NonFatal import org.apache.spark.{broadcast, SparkException, SparkUnsupportedOperationException} -import org.apache.spark.internal.LogKeys.{CODEGEN_STAGE_ID, ERROR, TREE_NODE} +import org.apache.spark.internal.LogKeys.{CODEGEN_STAGE_ID, CONFIG, ERROR, HUGE_METHOD_LIMIT, MAX_METHOD_CODE_SIZE, TREE_NODE} import org.apache.spark.internal.MDC import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow @@ -739,11 +739,13 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int) // Check if compiled code has a too large function if (compiledCodeStats.maxMethodCodeSize > conf.hugeMethodLimit) { - logInfo(s"Found too long generated codes and JIT optimization might not work: " + - s"the bytecode size (${compiledCodeStats.maxMethodCodeSize}) is above the limit " + - s"${conf.hugeMethodLimit}, and the whole-stage codegen was disabled " + - s"for this plan (id=$codegenStageId). To avoid this, you can raise the limit " + - s"`${SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key}`:\n$treeString") + logInfo(log"Found too long generated codes and JIT optimization might not work: " + + log"the bytecode size (${MDC(MAX_METHOD_CODE_SIZE, compiledCodeStats.maxMethodCodeSize)})" + + log" is above the limit ${MDC(HUGE_METHOD_LIMIT, conf.hugeMethodLimit)}, " + + log"and the whole-stage codegen was disabled for this plan " + + log"(id=${MDC(CODEGEN_STAGE_ID, codegenStageId)}). 
To avoid this, you can raise the limit" + + log" `${MDC(CONFIG, SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key)}`:\n" + + log"${MDC(TREE_NODE, treeString)}") return child.execute() } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEOptimizer.scala index 7bee641a00e7..014d23f2f410 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEOptimizer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEOptimizer.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.adaptive +import org.apache.spark.internal.LogKeys.{BATCH_NAME, RULE_NAME} +import org.apache.spark.internal.MDC import org.apache.spark.sql.catalyst.analysis.UpdateAttributeNullability import org.apache.spark.sql.catalyst.optimizer.{ConvertToLocalRelation, EliminateLimits, OptimizeOneRowPlan} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LogicalPlanIntegrity} @@ -52,7 +54,8 @@ class AQEOptimizer(conf: SQLConf, extendedRuntimeOptimizerRules: Seq[Rule[Logica val filteredRules = batch.rules.filter { rule => val exclude = excludedRules.contains(rule.ruleName) if (exclude) { - logInfo(s"Optimization rule '${rule.ruleName}' is excluded from the optimizer.") + logInfo(log"Optimization rule '${MDC(RULE_NAME, rule.ruleName)}' is excluded from " + + log"the optimizer.") } !exclude } @@ -61,8 +64,8 @@ class AQEOptimizer(conf: SQLConf, extendedRuntimeOptimizerRules: Seq[Rule[Logica } else if (filteredRules.nonEmpty) { Some(Batch(batch.name, batch.strategy, filteredRules: _*)) } else { - logInfo(s"Optimization batch '${batch.name}' is excluded from the optimizer " + - s"as all enclosed rules have been excluded.") + logInfo(log"Optimization batch '${MDC(BATCH_NAME, batch.name)}' is excluded from " + + log"the optimizer as all enclosed rules have been excluded.") None } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregateCodegenSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregateCodegenSupport.scala index 9523bf1a1c02..e2d8ac898804 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregateCodegenSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregateCodegenSupport.scala @@ -18,6 +18,8 @@ package org.apache.spark.sql.execution.aggregate import org.apache.spark.SparkException +import org.apache.spark.internal.LogKeys.MAX_JVM_METHOD_PARAMS_LENGTH +import org.apache.spark.internal.MDC import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression, ExpressionEquals, UnsafeRow} @@ -340,11 +342,11 @@ trait AggregateCodegenSupport } Some(splitCodes) } else { - val errMsg = "Failed to split aggregate code into small functions because the parameter " + - "length of at least one split function went over the JVM limit: " + - CodeGenerator.MAX_JVM_METHOD_PARAMS_LENGTH + val errMsg = log"Failed to split aggregate code into small functions because the " + + log"parameter length of at least one split function went over the JVM limit: " + + log"${MDC(MAX_JVM_METHOD_PARAMS_LENGTH, CodeGenerator.MAX_JVM_METHOD_PARAMS_LENGTH)}" if (Utils.isTesting) { - throw SparkException.internalError(errMsg) + throw SparkException.internalError(errMsg.message) } else { logInfo(errMsg) None diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala index bdf17607d77c..8f2b7ca5cba2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala @@ -22,6 +22,8 @@ import java.util.concurrent.TimeUnit._ import scala.collection.mutable import org.apache.spark.TaskContext +import org.apache.spark.internal.LogKeys.CONFIG +import org.apache.spark.internal.MDC import org.apache.spark.memory.SparkOutOfMemoryError import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow @@ -410,8 +412,8 @@ case class HashAggregateExec( private def enableTwoLevelHashMap(): Unit = { if (!checkIfFastHashMapSupported()) { if (!Utils.isTesting) { - logInfo(s"${SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key} is set to true, but" - + " current version of codegened fast hashmap does not support this aggregate.") + logInfo(log"${MDC(CONFIG, SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key)} is set to true, but" + + log" current version of codegened fast hashmap does not support this aggregate.") } } else { isFastHashMapEnabled = true diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala index 57b8fd8570f2..a4a6dc8e4ab0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala @@ -18,7 +18,8 @@ package org.apache.spark.sql.execution.aggregate import org.apache.spark.{SparkEnv, SparkException, TaskContext} -import org.apache.spark.internal.{config, Logging} +import org.apache.spark.internal.{config, Logging, MDC} +import org.apache.spark.internal.LogKeys.{CONFIG, HASH_MAP_SIZE, OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ @@ -174,10 +175,12 @@ class ObjectAggregationIterator( // The hash map gets too large, makes a sorted spill and clear the map. if (hashMap.size >= fallbackCountThreshold && inputRows.hasNext) { logInfo( - s"Aggregation hash map size ${hashMap.size} reaches threshold " + - s"capacity ($fallbackCountThreshold entries), spilling and falling back to sort" + - " based aggregation. You may change the threshold by adjust option " + - SQLConf.OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD.key + log"Aggregation hash map size ${MDC(HASH_MAP_SIZE, hashMap.size)} reaches threshold " + + log"capacity " + + log"(${MDC(OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD, fallbackCountThreshold)}" + + log" entries), spilling and falling back to sort based aggregation. 
You may change " + + log"the threshold by adjust option " + + log"${MDC(CONFIG, SQLConf.OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD.key)}" ) // Falls back to sort-based aggregation diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala index 2cc09a61e224..d7c5df151bf1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala @@ -25,7 +25,7 @@ import scala.util.control.NonFatal import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter} import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKeys.{DATABASE_NAME, ERROR, TABLE_NAME} +import org.apache.spark.internal.LogKeys.{COUNT, DATABASE_NAME, ERROR, TABLE_NAME, TIME} import org.apache.spark.sql.{Column, SparkSession} import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.{ResolvedIdentifier, UnresolvedAttribute} @@ -92,11 +92,12 @@ object CommandUtils extends Logging { } else { // Calculate table size as a sum of the visible partitions. See SPARK-21079 val partitions = sessionState.catalog.listPartitions(catalogTable.identifier) - logInfo(s"Starting to calculate sizes for ${partitions.length} partitions.") + logInfo(log"Starting to calculate sizes for ${MDC(COUNT, partitions.length)} " + + log"partitions.") calculatePartitionStats(spark, catalogTable, partitions, partitionRowCount) } - logInfo(s"It took ${(System.nanoTime() - startTime) / (1000 * 1000)} ms to calculate" + - s" the total size for table ${catalogTable.identifier}.") + logInfo(log"It took ${MDC(TIME, (System.nanoTime() - startTime) / (1000 * 1000))} ms to " + + log"calculate the total size for table ${MDC(TABLE_NAME, catalogTable.identifier)}.") (totalSize, newPartitions) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index 9f5c240818fd..539d8346a5ca 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -233,7 +233,7 @@ case class CreateDataSourceTableAsSelectCommand( } catch { case ex: AnalysisException => logError(log"Failed to write to table " + - log"${MDC(IDENTIFIER, table.identifier.unquotedString)}", ex) + log"${MDC(TABLE_NAME, table.identifier.unquotedString)}", ex) throw ex } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 6f36802bef59..cdeb4716e126 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -28,8 +28,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs._ import org.apache.hadoop.mapred.{FileInputFormat, JobConf} -import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKeys._ +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.internal.config.RDD_PARALLEL_LISTING_THRESHOLD import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier @@ -697,7 +696,7 @@ case class 
RepairTableCommand( } val root = new Path(table.location) - logInfo(s"Recover all the partitions in $root") + logInfo(log"Recover all the partitions in ${MDC(LogKeys.PATH, root)}") val hadoopConf = spark.sessionState.newHadoopConf() val fs = root.getFileSystem(hadoopConf) @@ -717,14 +716,16 @@ case class RepairTableCommand( evalPool.shutdown() } val total = partitionSpecsAndLocs.length - logInfo(s"Found $total partitions in $root") + logInfo(log"Found ${MDC(LogKeys.NUM_PARTITIONS, total)} partitions " + + log"in ${MDC(LogKeys.PATH, root)}") val partitionStats = if (spark.sessionState.conf.gatherFastStats) { gatherPartitionStats(spark, partitionSpecsAndLocs, fs, pathFilter, threshold) } else { Map.empty[Path, PartitionStatistics] } - logInfo(s"Finished to gather the fast stats for all $total partitions.") + logInfo(log"Finished to gather the fast stats for all " + + log"${MDC(LogKeys.NUM_PARTITIONS, total)} partitions.") addPartitions(spark, table, partitionSpecsAndLocs, partitionStats) total @@ -737,12 +738,14 @@ case class RepairTableCommand( spark.catalog.refreshTable(tableIdentWithDB) } catch { case NonFatal(e) => - logError(log"Cannot refresh the table '${MDC(IDENTIFIER, tableIdentWithDB)}'. " + + logError(log"Cannot refresh the table '${MDC(LogKeys.TABLE_NAME, tableIdentWithDB)}'. " + log"A query of the table might return wrong result if the table was cached. " + log"To avoid such issue, you should uncache the table manually via the UNCACHE TABLE " + log"command after table recovering will complete fully.", e) } - logInfo(s"Recovered all partitions: added ($addedAmount), dropped ($droppedAmount).") + logInfo(log"Recovered all partitions: " + + log"added (${MDC(LogKeys.NUM_ADDED_PARTITIONS, addedAmount)}), " + + log"dropped (${MDC(LogKeys.NUM_DROPPED_PARTITIONS, droppedAmount)}).") Seq.empty[Row] } @@ -783,13 +786,13 @@ case class RepairTableCommand( scanPartitions(spark, fs, filter, st.getPath, spec ++ Map(partitionNames.head -> value), partitionNames.drop(1), threshold, resolver, evalTaskSupport) } else { - logWarning( - log"expected partition column ${MDC(EXPECTED_PARTITION_COLUMN, partitionNames.head)}," + - log" but got ${MDC(ACTUAL_PARTITION_COLUMN, ps(0))}, ignoring it") + logWarning(log"expected partition column " + + log"${MDC(LogKeys.EXPECTED_PARTITION_COLUMN, partitionNames.head)}," + + log" but got ${MDC(LogKeys.ACTUAL_PARTITION_COLUMN, ps(0))}, ignoring it") Seq.empty } } else { - logWarning(log"ignore ${MDC(PATH, new Path(path, name))}") + logWarning(log"ignore ${MDC(LogKeys.PATH, new Path(path, name))}") Seq.empty } } @@ -813,7 +816,8 @@ case class RepairTableCommand( Math.min(spark.sparkContext.defaultParallelism, 10000)) // gather the fast stats for all the partitions otherwise Hive metastore will list all the // files for all the new partitions in sequential way, which is super slow. 
- logInfo(s"Gather the fast stats in parallel using $numParallelism tasks.") + logInfo(log"Gather the fast stats in parallel using ${MDC(LogKeys.COUNT, numParallelism)} " + + log"tasks.") spark.sparkContext.parallelize(locations, numParallelism) .mapPartitions { locationsEachPartition => val pathFilter = getPathFilter(serializableConfiguration.value) @@ -1030,7 +1034,7 @@ object DDLUtils extends Logging { DataSource.lookupDataSource(provider, SQLConf.get).getConstructor().newInstance() } catch { case e: Throwable => - logError(log"Failed to find data source: ${MDC(DATA_SOURCE_PROVIDER, provider)} " + + logError(log"Failed to find data source: ${MDC(LogKeys.DATA_SOURCE, provider)} " + log"when check data column names.", e) return } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala index 2f097f78443b..1858a8421359 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala @@ -26,7 +26,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.{SparkContext, TaskContext} -import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.internal.LogKeys.{ACTUAL_NUM_FILES, EXPECTED_NUM_FILES} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.SQLExecution @@ -120,9 +120,9 @@ class BasicWriteTaskStatsTracker( } catch { case e: NumberFormatException => // warn but don't dump the whole stack - logInfo(s"Failed to parse" + - s" ${BasicWriteJobStatsTracker.FILE_LENGTH_XATTR}:$e;" + - s" bytes written may be under-reported"); + logInfo(log"Failed to parse " + + log"${MDC(LogKeys.FILE_LENGTH_XATTR, BasicWriteJobStatsTracker.FILE_LENGTH_XATTR)}:" + + log"${MDC(LogKeys.ERROR, e)}; bytes written may be under-reported"); case e: UnsupportedOperationException => // this is not unusual; ignore logDebug(s"XAttr not supported on path $path", e); diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 4b337798df37..d88b5ee8877d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -28,7 +28,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.SparkException import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKeys.{CLASS_NAME, DATA_SOURCE_PROVIDER, DATA_SOURCES, PATHS} +import org.apache.spark.internal.LogKeys.{CLASS_NAME, DATA_SOURCE, DATA_SOURCES, PATHS} import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogUtils} @@ -696,7 +696,7 @@ object DataSource extends Logging { throw QueryCompilationErrors .foundMultipleXMLDataSourceError(provider1, sourceNames, externalSource.getName) } else if (internalSources.size == 1) { - logWarning(log"Multiple sources found for ${MDC(DATA_SOURCE_PROVIDER, provider1)} " + + logWarning(log"Multiple sources found for 
${MDC(DATA_SOURCE, provider1)} " + log"(${MDC(DATA_SOURCES, sourceNames.mkString(", "))}), defaulting to the " + log"internal datasource (${MDC(CLASS_NAME, internalSources.head.getClass.getName)}).") internalSources.head.getClass diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 845d969df088..22b60caf2669 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -24,7 +24,8 @@ import scala.collection.mutable import org.apache.hadoop.fs.Path -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKeys.PREDICATES import org.apache.spark.rdd.RDD import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, QualifiedTableName, SQLConfHelper} @@ -494,7 +495,7 @@ object DataSourceStrategy val partitionSet = AttributeSet(partitionColumns) val predicates = ExpressionSet(normalizedFilters .flatMap(extractPredicatesWithinOutputSet(_, partitionSet))) - logInfo(s"Pruning directories with: ${predicates.mkString(",")}") + logInfo(log"Pruning directories with: ${MDC(PREDICATES, predicates.mkString(","))}") predicates } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala index 1dbb6ce26f69..7d071124b0b3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala @@ -22,7 +22,8 @@ import org.apache.hadoop.fs.{FileAlreadyExistsException, Path} import org.apache.hadoop.mapreduce.TaskAttemptContext import org.apache.spark.TaskOutputFileAlreadyExistException -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKeys.{CONFIG, NUM_CONCURRENT_WRITER} import org.apache.spark.internal.io.{FileCommitProtocol, FileNameSpec} import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage import org.apache.spark.shuffle.FetchFailedException @@ -558,9 +559,11 @@ class DynamicPartitionDataConcurrentWriter( new WriterStatus(currentWriter, recordsInFile, fileCounter)) if (concurrentWriters.size >= concurrentOutputWriterSpec.maxWriters && !sorted) { // Fall back to sort-based sequential writer mode. - logInfo(s"Number of concurrent writers ${concurrentWriters.size} reaches the threshold. " + - "Fall back from concurrent writers to sort-based sequential writer. You may change " + - s"threshold with configuration ${SQLConf.MAX_CONCURRENT_OUTPUT_FILE_WRITERS.key}") + logInfo(log"Number of concurrent writers " + + log"${MDC(NUM_CONCURRENT_WRITER, concurrentWriters.size)} reaches the threshold. " + + log"Fall back from concurrent writers to sort-based sequential writer. 
You may change " + + log"threshold with configuration " + + log"${MDC(CONFIG, SQLConf.MAX_CONCURRENT_OUTPUT_FILE_WRITERS.key)}") sorted = true } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala index d842e23cdf40..91749ddd794f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala @@ -26,7 +26,7 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl import org.apache.spark._ -import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.internal.LogKeys._ import org.apache.spark.internal.io.{FileCommitProtocol, SparkHadoopWriterUtils} import org.apache.spark.sql.SparkSession @@ -270,14 +270,15 @@ object FileFormatWriter extends Logging { val ret = f val commitMsgs = ret.map(_.commitMsg) - logInfo(s"Start to commit write Job ${description.uuid}.") + logInfo(log"Start to commit write Job ${MDC(LogKeys.UUID, description.uuid)}.") val (_, duration) = Utils .timeTakenMs { committer.commitJob(job, commitMsgs.toImmutableArraySeq) } - logInfo(s"Write Job ${description.uuid} committed. Elapsed time: $duration ms.") + logInfo(log"Write Job ${MDC(LogKeys.UUID, description.uuid)} committed. " + + log"Elapsed time: ${MDC(LogKeys.ELAPSED_TIME, duration)} ms.") processStats( description.statsTrackers, ret.map(_.summary.stats).toImmutableArraySeq, duration) - logInfo(s"Finished processing stats for write job ${description.uuid}.") + logInfo(log"Finished processing stats for write job ${MDC(LogKeys.UUID, description.uuid)}.") // return a set of all the partition paths that were updated during this job ret.map(_.summary.updatedPartitions).reduceOption(_ ++ _).getOrElse(Set.empty) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FilePartition.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FilePartition.scala index 10cae0e4966c..8a47a28de845 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FilePartition.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FilePartition.scala @@ -22,7 +22,7 @@ import scala.math.BigDecimal.RoundingMode import org.apache.spark.Partition import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKeys.{CONFIG, DESIRED_PARTITIONS_SIZE, MAX_PARTITIONS_SIZE, PARTITIONS_SIZE} +import org.apache.spark.internal.LogKeys.{CONFIG, DESIRED_NUM_PARTITIONS, MAX_NUM_PARTITIONS, NUM_PARTITIONS} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.execution.ScanFileListing @@ -99,10 +99,10 @@ object FilePartition extends Logging { val desiredSplitBytes = (totalSizeInBytes / BigDecimal(maxPartNum.get)).setScale(0, RoundingMode.UP).longValue val desiredPartitions = getFilePartitions(partitionedFiles, desiredSplitBytes, openCostBytes) - logWarning(log"The number of partitions is ${MDC(PARTITIONS_SIZE, partitions.size)}, " + + logWarning(log"The number of partitions is ${MDC(NUM_PARTITIONS, partitions.size)}, " + log"which exceeds the maximum number configured: " + - log"${MDC(MAX_PARTITIONS_SIZE, maxPartNum.get)}. 
Spark rescales it to " + - log"${MDC(DESIRED_PARTITIONS_SIZE, desiredPartitions.size)} by ignoring the " + + log"${MDC(MAX_NUM_PARTITIONS, maxPartNum.get)}. Spark rescales it to " + + log"${MDC(DESIRED_NUM_PARTITIONS, desiredPartitions.size)} by ignoring the " + log"configuration of ${MDC(CONFIG, SQLConf.FILES_MAX_PARTITION_BYTES.key)}.") desiredPartitions } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala index 7c1c4fe2f137..9bcdbadf7c5c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala @@ -24,7 +24,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.{Partition => RDDPartition, TaskContext} import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.internal.LogKeys.PATH +import org.apache.spark.internal.LogKeys.{CURRENT_FILE, PATH} import org.apache.spark.internal.MDC import org.apache.spark.paths.SparkPath import org.apache.spark.rdd.{InputFileBlockHolder, RDD} @@ -233,7 +233,7 @@ class FileScanRDD( if (files.hasNext) { currentFile = files.next() updateMetadataRow() - logInfo(s"Reading File $currentFile") + logInfo(log"Reading File ${MDC(CURRENT_FILE, currentFile)}") // Sets InputFileBlockHolder for the file block's information InputFileBlockHolder .set(currentFile.urlEncodedPath, currentFile.start, currentFile.length) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala index f2dcbe26104f..8333c276cdd8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala @@ -21,7 +21,8 @@ import java.util.Locale import scala.collection.mutable -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKeys.{NUM_PRUNED, POST_SCAN_FILTERS, PUSHED_FILTERS, TOTAL} import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.expressions @@ -137,9 +138,8 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging { val numBucketsSelected = matchedBuckets.cardinality() - logInfo { - s"Pruned ${numBuckets - numBucketsSelected} out of $numBuckets buckets." - } + logInfo(log"Pruned ${MDC(NUM_PRUNED, numBuckets - numBucketsSelected)} " + + log"out of ${MDC(TOTAL, numBuckets)} buckets.") // None means all the buckets need to be scanned if (numBucketsSelected == numBuckets) { @@ -206,11 +206,11 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging { DataSourceUtils.supportNestedPredicatePushdown(fsRelation) val pushedFilters = dataFilters .flatMap(DataSourceStrategy.translateFilter(_, supportNestedPredicatePushdown)) - logInfo(s"Pushed Filters: ${pushedFilters.mkString(",")}") + logInfo(log"Pushed Filters: ${MDC(PUSHED_FILTERS, pushedFilters.mkString(","))}") // Predicates with both partition keys and attributes need to be evaluated after the scan. 
val afterScanFilters = filterSet -- partitionKeyFilters.filter(_.references.nonEmpty) - logInfo(s"Post-Scan Filters: ${afterScanFilters.mkString(",")}") + logInfo(log"Post-Scan Filters: ${MDC(POST_SCAN_FILTERS, afterScanFilters.mkString(","))}") val filterAttributes = AttributeSet(afterScanFilters ++ stayUpFilters) val requiredExpressions: Seq[NamedExpression] = filterAttributes.toSeq ++ projects diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala index 0f66aa816d96..3b8a20c7cf74 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala @@ -24,7 +24,8 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs._ import org.apache.hadoop.mapred.{FileInputFormat, JobConf} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKeys.{COUNT, ELAPSED_TIME} import org.apache.spark.metrics.source.HiveCatalogMetrics import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.FileSourceOptions @@ -136,8 +137,8 @@ class InMemoryFileIndex( fileStatusCache.putLeafFiles(path, leafFiles.toArray) output ++= leafFiles } - logInfo(s"It took ${(System.nanoTime() - startTime) / (1000 * 1000)} ms to list leaf files" + - s" for ${paths.length} paths.") + logInfo(log"It took ${MDC(ELAPSED_TIME, (System.nanoTime() - startTime) / (1000 * 1000))} ms" + + log" to list leaf files for ${MDC(COUNT, paths.length)} paths.") output } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala index 3efe614bcef9..cc9f0d23bcb6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala @@ -24,7 +24,8 @@ import scala.collection.mutable import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs._ -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKeys.{COUNT, PERCENT, TOTAL} import org.apache.spark.paths.SparkPath import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.{expressions, InternalRow} @@ -190,8 +191,8 @@ abstract class PartitioningAwareFileIndex( val total = partitions.length val selectedSize = selected.length val percentPruned = (1 - selectedSize.toDouble / total.toDouble) * 100 - s"Selected $selectedSize partitions out of $total, " + - s"pruned ${if (total == 0) "0" else s"$percentPruned%"} partitions." + log"Selected ${MDC(COUNT, selectedSize)} partitions out of ${MDC(TOTAL, total)}, " + + log"pruned ${MDC(PERCENT, if (total == 0) "0" else percentPruned)} partitions." 
} selected diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala index 144be2316f09..03e988eb0bd2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala @@ -21,7 +21,8 @@ import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.{OutputCommitter, TaskAttemptContext} import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKeys.CLASS_NAME import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol import org.apache.spark.sql.internal.SQLConf @@ -44,7 +45,8 @@ class SQLHadoopMapReduceCommitProtocol( configuration.getClass(SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter]) if (clazz != null) { - logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}") + logInfo(log"Using user defined output committer class " + + log"${MDC(CLASS_NAME, clazz.getCanonicalName)}") // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat // has an associated output committer. To override this output committer, @@ -64,7 +66,8 @@ class SQLHadoopMapReduceCommitProtocol( committer = ctor.newInstance() } } - logInfo(s"Using output committer class ${committer.getClass.getCanonicalName}") + logInfo(log"Using output committer class " + + log"${MDC(CLASS_NAME, committer.getClass.getCanonicalName)}") committer } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala index 8c430e231e39..1b71dc9221f7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala @@ -23,7 +23,8 @@ import scala.util.Using import scala.util.control.NonFatal import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, TaskContext} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKeys.SQL_TEXT import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.connector.expressions.filter.Predicate @@ -266,7 +267,7 @@ class JDBCRDD( options.sessionInitStatement match { case Some(sql) => val statement = conn.prepareStatement(sql) - logInfo(s"Executing sessionInitStatement: $sql") + logInfo(log"Executing sessionInitStatement: ${MDC(SQL_TEXT, sql)}") try { statement.setQueryTimeout(options.queryTimeout) statement.execute() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala index 599cc369d51c..2c4158dfe153 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala @@ -22,7 +22,7 @@ import scala.math.BigDecimal.RoundingMode import org.apache.spark.Partition import org.apache.spark.internal.{Logging, MDC} -import 
org.apache.spark.internal.LogKeys.{LOWER_BOUND, NEW_VALUE, OLD_VALUE, UPPER_BOUND} +import org.apache.spark.internal.LogKeys.{CLAUSES, LOWER_BOUND, NEW_VALUE, NUM_PARTITIONS, OLD_VALUE, UPPER_BOUND} import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession, SQLContext} import org.apache.spark.sql.catalyst.analysis._ @@ -164,8 +164,9 @@ private[sql] object JDBCRelation extends Logging { i = i + 1 } val partitions = ans.toArray - logInfo(s"Number of partitions: $numPartitions, WHERE clauses of these partitions: " + - partitions.map(_.asInstanceOf[JDBCPartition].whereClause).mkString(", ")) + val clauses = partitions.map(_.asInstanceOf[JDBCPartition].whereClause).mkString(", ") + logInfo(log"Number of partitions: ${MDC(NUM_PARTITIONS, numPartitions)}, " + + log"WHERE clauses of these partitions: ${MDC(CLAUSES, clauses)}") partitions } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala index 0549d99acc68..3e111252bc6f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala @@ -434,10 +434,11 @@ object ParquetUtils extends Logging { classOf[OutputCommitter]) if (conf.get(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key) == null) { - logInfo("Using default output committer for Parquet: " + - classOf[ParquetOutputCommitter].getCanonicalName) + logInfo(log"Using default output committer for Parquet: " + + log"${MDC(CLASS_NAME, classOf[ParquetOutputCommitter].getCanonicalName)}") } else { - logInfo("Using user defined output committer for Parquet: " + committerClass.getCanonicalName) + logInfo(log"Using user defined output committer for Parquet: " + + log"${MDC(CLASS_NAME, committerClass.getCanonicalName)}") } conf.setClass( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileBatchWrite.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileBatchWrite.scala index 2f443a0bb1fa..b9f058b55ed0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileBatchWrite.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileBatchWrite.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution.datasources.v2 import org.apache.hadoop.mapreduce.Job -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.internal.io.FileCommitProtocol import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory, PhysicalWriteInfo, WriterCommitMessage} import org.apache.spark.sql.execution.datasources.{WriteJobDescription, WriteTaskResult} @@ -33,14 +33,15 @@ class FileBatchWrite( extends BatchWrite with Logging { override def commit(messages: Array[WriterCommitMessage]): Unit = { val results = messages.map(_.asInstanceOf[WriteTaskResult]) - logInfo(s"Start to commit write Job ${description.uuid}.") + logInfo(log"Start to commit write Job ${MDC(LogKeys.UUID, description.uuid)}.") val (_, duration) = Utils .timeTakenMs { committer.commitJob(job, results.map(_.commitMsg).toImmutableArraySeq) } - logInfo(s"Write Job ${description.uuid} committed. Elapsed time: $duration ms.") + logInfo(log"Write Job ${MDC(LogKeys.UUID, description.uuid)} committed. 
" + + log"Elapsed time: ${MDC(LogKeys.ELAPSED_TIME, duration)} ms.") processStats( description.statsTrackers, results.map(_.summary.stats).toImmutableArraySeq, duration) - logInfo(s"Finished processing stats for write job ${description.uuid}.") + logInfo(log"Finished processing stats for write job ${MDC(LogKeys.UUID, description.uuid)}.") } override def useCommitCoordinator(): Boolean = false diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala index b71b077f845e..2679f1414456 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2 import java.io.{FileNotFoundException, IOException} import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKeys.PARTITIONED_FILE_READER +import org.apache.spark.internal.LogKeys.{CURRENT_FILE, PARTITIONED_FILE_READER} import org.apache.spark.rdd.InputFileBlockHolder import org.apache.spark.sql.catalyst.FileSourceOptions import org.apache.spark.sql.connector.read.PartitionReader @@ -39,7 +39,7 @@ class FilePartitionReader[T]( if (currentReader == null) { if (files.hasNext) { val file = files.next() - logInfo(s"Reading file $file") + logInfo(log"Reading file ${MDC(CURRENT_FILE, file)}") // Sets InputFileBlockHolder for the file block's information InputFileBlockHolder.set(file.urlEncodedPath, file.start, file.length) try { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupBasedRowLevelOperationScanPlanning.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupBasedRowLevelOperationScanPlanning.scala index 87f70eb696b6..8b8cdc06d398 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupBasedRowLevelOperationScanPlanning.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupBasedRowLevelOperationScanPlanning.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.datasources.v2 +import org.apache.spark.internal.{LogKeys, MDC} import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, ExpressionSet, PredicateHelper, SubqueryExpression} import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral import org.apache.spark.sql.catalyst.planning.{GroupBasedRowLevelOperation, PhysicalOperation} @@ -62,14 +63,16 @@ object GroupBasedRowLevelOperationScanPlanning extends Rule[LogicalPlan] with Pr val (scan, output) = PushDownUtils.pruneColumns(scanBuilder, relation, relation.output, Nil) + // scalastyle:off line.size.limit logInfo( - s""" - |Pushing operators to ${relation.name} - |Pushed filters: $pushedFiltersStr - |Filters evaluated on data source side: ${evaluatedFilters.mkString(", ")} - |Filters evaluated on Spark side: ${postScanFilters.mkString(", ")} - |Output: ${output.mkString(", ")} - """.stripMargin) + log""" + |Pushing operators to ${MDC(LogKeys.RELATION_NAME, relation.name)} + |Pushed filters: ${MDC(LogKeys.PUSHED_FILTERS, pushedFiltersStr)} + |Filters evaluated on data source side: ${MDC(LogKeys.EVALUATED_FILTERS, evaluatedFilters.mkString(", "))} + |Filters evaluated on Spark side: ${MDC(LogKeys.POST_SCAN_FILTERS, postScanFilters.mkString(", "))}} + |Output: 
${MDC(LogKeys.RELATION_OUTPUT, output.mkString(", "))} + """.stripMargin) + // scalastyle:on line.size.limit rd transformDown { // simplify the join condition in MERGE operations by discarding already evaluated filters diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala index 8c262cf56e8b..2b6fcd9d547f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources.v2 import scala.collection.mutable +import org.apache.spark.internal.LogKeys.{AGGREGATE_FUNCTIONS, GROUP_BY_EXPRS, POST_SCAN_FILTERS, PUSHED_FILTERS, RELATION_NAME, RELATION_OUTPUT} +import org.apache.spark.internal.MDC import org.apache.spark.sql.catalyst.expressions.{aggregate, Alias, And, Attribute, AttributeMap, AttributeReference, AttributeSet, Cast, Expression, IntegerLiteral, Literal, NamedExpression, PredicateHelper, ProjectionOverSchema, SortOrder, SubqueryExpression} import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.optimizer.CollapseProject @@ -86,11 +88,11 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper { val postScanFilters = postScanFiltersWithoutSubquery ++ normalizedFiltersWithSubquery logInfo( - s""" - |Pushing operators to ${sHolder.relation.name} - |Pushed Filters: $pushedFiltersStr - |Post-Scan Filters: ${postScanFilters.mkString(",")} - """.stripMargin) + log""" + |Pushing operators to ${MDC(RELATION_NAME, sHolder.relation.name)} + |Pushed Filters: ${MDC(PUSHED_FILTERS, pushedFiltersStr)} + |Post-Scan Filters: ${MDC(POST_SCAN_FILTERS, postScanFilters.mkString(","))} + """.stripMargin) val filterCondition = postScanFilters.reduceLeftOption(And) filterCondition.map(Filter(_, sHolder)).getOrElse(sHolder) @@ -214,13 +216,13 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper { holder.pushedAggOutputMap = AttributeMap(groupOutputMap ++ aggOutputMap) holder.output = newOutput logInfo( - s""" - |Pushing operators to ${holder.relation.name} - |Pushed Aggregate Functions: - | ${translatedAgg.aggregateExpressions().mkString(", ")} - |Pushed Group by: - | ${translatedAgg.groupByExpressions.mkString(", ")} - """.stripMargin) + log""" + |Pushing operators to ${MDC(RELATION_NAME, holder.relation.name)} + |Pushed Aggregate Functions: + | ${MDC(AGGREGATE_FUNCTIONS, translatedAgg.aggregateExpressions().mkString(", "))} + |Pushed Group by: + | ${MDC(GROUP_BY_EXPRS, translatedAgg.groupByExpressions.mkString(", "))} + """.stripMargin) if (canCompletePushDown) { val projectExpressions = finalResultExprs.map { expr => @@ -361,9 +363,9 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper { sHolder.builder, sHolder.relation, normalizedProjects, normalizedFilters) logInfo( - s""" - |Output: ${output.mkString(", ")} - """.stripMargin) + log""" + |Output: ${MDC(RELATION_OUTPUT, output.mkString(", "))} + """.stripMargin) val wrappedScan = getWrappedScan(scan, sHolder) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index 
07ea9ce2bcd6..5632595de7cf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -20,8 +20,7 @@ package org.apache.spark.sql.execution.datasources.v2 import scala.jdk.CollectionConverters._ import org.apache.spark.{SparkEnv, SparkException, TaskContext} -import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKeys._ +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.NoSuchTableException @@ -377,8 +376,9 @@ trait V2TableWriteExec extends V2CommandExec with UnaryExecNode { val messages = new Array[WriterCommitMessage](rdd.partitions.length) val totalNumRowsAccumulator = new LongAccumulator() - logInfo(s"Start processing data source write support: $batchWrite. " + - s"The input RDD has ${messages.length} partitions.") + logInfo(log"Start processing data source write support: " + + log"${MDC(LogKeys.BATCH_WRITE, batchWrite)}. The input RDD has " + + log"${MDC(LogKeys.COUNT, messages.length)}} partitions.") // Avoid object not serializable issue. val writeMetrics: Map[String, SQLMetric] = customMetrics @@ -397,23 +397,24 @@ trait V2TableWriteExec extends V2CommandExec with UnaryExecNode { } ) - logInfo(s"Data source write support $batchWrite is committing.") + logInfo(log"Data source write support ${MDC(LogKeys.BATCH_WRITE, batchWrite)} is committing.") batchWrite.commit(messages) - logInfo(s"Data source write support $batchWrite committed.") + logInfo(log"Data source write support ${MDC(LogKeys.BATCH_WRITE, batchWrite)} committed.") commitProgress = Some(StreamWriterCommitProgress(totalNumRowsAccumulator.value)) } catch { case cause: Throwable => - logError(log"Data source write support ${MDC(BATCH_WRITE, batchWrite)} is aborting.") + logError( + log"Data source write support ${MDC(LogKeys.BATCH_WRITE, batchWrite)} is aborting.") try { batchWrite.abort(messages) } catch { case t: Throwable => - logError(log"Data source write support ${MDC(BATCH_WRITE, batchWrite)} " + + logError(log"Data source write support ${MDC(LogKeys.BATCH_WRITE, batchWrite)} " + log"failed to abort.") cause.addSuppressed(t) throw QueryExecutionErrors.writingJobFailedError(cause) } - logError(log"Data source write support ${MDC(BATCH_WRITE, batchWrite)} aborted.") + logError(log"Data source write support ${MDC(LogKeys.BATCH_WRITE, batchWrite)} aborted.") throw cause } @@ -451,36 +452,45 @@ trait WritingSparkTask[W <: DataWriter[InternalRow]] extends Logging with Serial val coordinator = SparkEnv.get.outputCommitCoordinator val commitAuthorized = coordinator.canCommit(stageId, stageAttempt, partId, attemptId) if (commitAuthorized) { - logInfo(s"Commit authorized for partition $partId (task $taskId, attempt $attemptId, " + - s"stage $stageId.$stageAttempt)") + logInfo(log"Commit authorized for partition ${MDC(LogKeys.PARTITION_ID, partId)} " + + log"(task ${MDC(LogKeys.TASK_ID, taskId)}, " + + log"attempt ${MDC(LogKeys.TASK_ATTEMPT_ID, attemptId)}, " + + log"stage ${MDC(LogKeys.STAGE_ID, stageId)}." 
+ + log"${MDC(LogKeys.STAGE_ATTEMPT, stageAttempt)})") + dataWriter.commit() } else { val commitDeniedException = QueryExecutionErrors.commitDeniedError( partId, taskId, attemptId, stageId, stageAttempt) - logInfo(commitDeniedException.getMessage) + logInfo(log"${MDC(LogKeys.ERROR, commitDeniedException.getMessage)}") // throwing CommitDeniedException will trigger the catch block for abort throw commitDeniedException } } else { - logInfo(s"Writer for partition ${context.partitionId()} is committing.") + logInfo(log"Writer for partition ${MDC(LogKeys.PARTITION_ID, context.partitionId())} " + + log"is committing.") dataWriter.commit() } - logInfo(s"Committed partition $partId (task $taskId, attempt $attemptId, " + - s"stage $stageId.$stageAttempt)") + logInfo(log"Committed partition ${MDC(LogKeys.PARTITION_ID, partId)} " + + log"(task ${MDC(LogKeys.TASK_ID, taskId)}, " + + log"attempt ${MDC(LogKeys.TASK_ATTEMPT_ID, attemptId)}, " + + log"stage ${MDC(LogKeys.STAGE_ID, stageId)}.${MDC(LogKeys.STAGE_ATTEMPT, stageAttempt)})") DataWritingSparkTaskResult(iterWithMetrics.count, msg) })(catchBlock = { // If there is an error, abort this writer - logError(log"Aborting commit for partition ${MDC(PARTITION_ID, partId)} " + - log"(task ${MDC(TASK_ID, taskId)}, attempt ${MDC(TASK_ATTEMPT_ID, attemptId)}, " + - log"stage ${MDC(STAGE_ID, stageId)}.${MDC(STAGE_ATTEMPT, stageAttempt)})") + logError(log"Aborting commit for partition ${MDC(LogKeys.PARTITION_ID, partId)} " + + log"(task ${MDC(LogKeys.TASK_ID, taskId)}, " + + log"attempt ${MDC(LogKeys.TASK_ATTEMPT_ID, attemptId)}, " + + log"stage ${MDC(LogKeys.STAGE_ID, stageId)}.${MDC(LogKeys.STAGE_ATTEMPT, stageAttempt)})") dataWriter.abort() - logError(log"Aborted commit for partition ${MDC(PARTITION_ID, partId)} " + - log"(task ${MDC(TASK_ID, taskId)}, attempt ${MDC(TASK_ATTEMPT_ID, attemptId)}, " + - log"stage ${MDC(STAGE_ID, stageId)}.${MDC(STAGE_ATTEMPT, stageAttempt)})") + logError(log"Aborted commit for partition ${MDC(LogKeys.PARTITION_ID, partId)} " + + log"(task ${MDC(LogKeys.TASK_ID, taskId)}, " + + log"attempt ${MDC(LogKeys.TASK_ATTEMPT_ID, attemptId)}, " + + log"stage ${MDC(LogKeys.STAGE_ID, stageId)}.${MDC(LogKeys.STAGE_ATTEMPT, stageAttempt)})") }, finallyBlock = { dataWriter.close() }) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonStreamingSinkCommitRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonStreamingSinkCommitRunner.scala index a444fdfff7d9..4178aa6b6058 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonStreamingSinkCommitRunner.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonStreamingSinkCommitRunner.scala @@ -23,7 +23,8 @@ import scala.jdk.CollectionConverters._ import org.apache.spark.SparkEnv import org.apache.spark.api.python.{PythonFunction, PythonWorker, PythonWorkerFactory, PythonWorkerUtils, SpecialLengths} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKeys.PYTHON_EXEC import org.apache.spark.internal.config.BUFFER_SIZE import org.apache.spark.internal.config.Python.PYTHON_AUTH_SOCKET_TIMEOUT import org.apache.spark.sql.connector.write.WriterCommitMessage @@ -60,7 +61,7 @@ class PythonStreamingSinkCommitRunner( * Initializes the Python worker for running the streaming sink committer. 
*/ def init(): Unit = { - logInfo(s"Initializing Python runner pythonExec: $pythonExec") + logInfo(log"Initializing Python runner pythonExec: ${MDC(PYTHON_EXEC, pythonExec)}") val env = SparkEnv.get val localdir = env.blockManager.diskBlockManager.localDirs.map(f => f.getPath()).mkString(",") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala index a0f74ef6c3d0..67d879bdd8bf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.exchange import scala.collection.mutable import scala.collection.mutable.ArrayBuffer +import org.apache.spark.internal.{LogKeys, MDC} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans._ @@ -407,11 +408,13 @@ case class EnsureRequirements( val leftPartValues = leftSpec.partitioning.partitionValues val rightPartValues = rightSpec.partitioning.partitionValues + val numLeftPartValues = MDC(LogKeys.NUM_LEFT_PARTITION_VALUES, leftPartValues.size) + val numRightPartValues = MDC(LogKeys.NUM_RIGHT_PARTITION_VALUES, rightPartValues.size) logInfo( - s""" - |Left side # of partitions: ${leftPartValues.size} - |Right side # of partitions: ${rightPartValues.size} - |""".stripMargin) + log""" + |Left side # of partitions: $numLeftPartValues + |Right side # of partitions: $numRightPartValues + |""".stripMargin) // As partition keys are compatible, we can pick either left or right as partition // expressions @@ -421,7 +424,8 @@ case class EnsureRequirements( .mergePartitions(leftSpec.partitioning, rightSpec.partitioning, partitionExprs) .map(v => (v, 1)) - logInfo(s"After merging, there are ${mergedPartValues.size} partitions") + logInfo(log"After merging, there are " + + log"${MDC(LogKeys.NUM_PARTITIONS, mergedPartValues.size)} partitions") var replicateLeftSide = false var replicateRightSide = false @@ -445,8 +449,8 @@ case class EnsureRequirements( val canReplicateRight = canReplicateRightSide(joinType) if (!canReplicateLeft && !canReplicateRight) { - logInfo("Skipping partially clustered distribution as it cannot be applied for " + - s"join type '$joinType'") + logInfo(log"Skipping partially clustered distribution as it cannot be applied for " + + log"join type '${MDC(LogKeys.JOIN_TYPE, joinType)}'") } else { val leftLink = left.logicalLink val rightLink = right.logicalLink @@ -455,12 +459,16 @@ case class EnsureRequirements( leftLink.isDefined && rightLink.isDefined && leftLink.get.stats.sizeInBytes > 1 && rightLink.get.stats.sizeInBytes > 1) { + val leftLinkStatsSizeInBytes = MDC(LogKeys.LEFT_LOGICAL_PLAN_STATS_SIZE_IN_BYTES, + leftLink.get.stats.sizeInBytes) + val rightLinkStatsSizeInBytes = MDC(LogKeys.RIGHT_LOGICAL_PLAN_STATS_SIZE_IN_BYTES, + rightLink.get.stats.sizeInBytes) logInfo( - s""" + log""" |Using plan statistics to determine which side of join to fully |cluster partition values: - |Left side size (in bytes): ${leftLink.get.stats.sizeInBytes} - |Right side size (in bytes): ${rightLink.get.stats.sizeInBytes} + |Left side size (in bytes): $leftLinkStatsSizeInBytes + |Right side size (in bytes): $rightLinkStatsSizeInBytes |""".stripMargin) leftLink.get.stats.sizeInBytes < rightLink.get.stats.sizeInBytes } else { @@ -477,12 +485,14 @@ case 
class EnsureRequirements( // of partitions can be applied. For instance, replication should not be allowed for // the left-hand side of a right outer join. if (replicateLeftSide && !canReplicateLeft) { - logInfo("Left-hand side is picked but cannot be applied to join type " + - s"'$joinType'. Skipping partially clustered distribution.") + logInfo(log"Left-hand side is picked but cannot be applied to join type " + + log"'${MDC(LogKeys.JOIN_TYPE, joinType)}'. Skipping partially clustered " + + log"distribution.") replicateLeftSide = false } else if (replicateRightSide && !canReplicateRight) { - logInfo("Right-hand side is picked but cannot be applied to join type " + - s"'$joinType'. Skipping partially clustered distribution.") + logInfo(log"Right-hand side is picked but cannot be applied to join type " + + log"'${MDC(LogKeys.JOIN_TYPE, joinType)}'. Skipping partially clustered " + + log"distribution.") replicateRightSide = false } else { // In partially clustered distribution, we should use un-grouped partition values @@ -499,8 +509,8 @@ case class EnsureRequirements( InternalRowComparableWrapper(partVal, partitionExprs), numParts)) } - logInfo("After applying partially clustered distribution, there are " + - s"${mergedPartValues.map(_._2).sum} partitions.") + logInfo(log"After applying partially clustered distribution, there are " + + log"${MDC(LogKeys.NUM_PARTITIONS, mergedPartValues.map(_._2).sum)} partitions.") applyPartialClustering = true } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala index dd9c5a25e8a7..2ef046f668b6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala @@ -25,7 +25,8 @@ import scala.jdk.CollectionConverters._ import org.apache.spark.SparkEnv import org.apache.spark.api.python.{PythonFunction, PythonWorker, PythonWorkerFactory, PythonWorkerUtils, SpecialLengths} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKeys.PYTHON_EXEC import org.apache.spark.internal.config.BUFFER_SIZE import org.apache.spark.internal.config.Python.PYTHON_AUTH_SOCKET_TIMEOUT import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} @@ -71,7 +72,7 @@ class PythonStreamingSourceRunner( * Initializes the Python worker for running the streaming source. 
*/ def init(): Unit = { - logInfo(s"Initializing Python runner pythonExec: $pythonExec") + logInfo(log"Initializing Python runner pythonExec: ${MDC(PYTHON_EXEC, pythonExec)}") val env = SparkEnv.get val localdir = env.blockManager.diskBlockManager.localDirs.map(f => f.getPath()).mkString(",") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/r/ArrowRRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/r/ArrowRRunner.scala index 819fd1bd297f..45ecf8700950 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/r/ArrowRRunner.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/r/ArrowRRunner.scala @@ -30,6 +30,7 @@ import org.apache.spark.TaskContext import org.apache.spark.api.r._ import org.apache.spark.api.r.SpecialLengths import org.apache.spark.broadcast.Broadcast +import org.apache.spark.internal.{LogKeys, MDC} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.arrow.ArrowWriter import org.apache.spark.sql.types.StructType @@ -138,6 +139,10 @@ class ArrowRRunner( private var batchLoaded = true + private def format(v: Double): String = { + "%.3f".format(v) + } + protected override def read(): ColumnarBatch = try { if (reader != null && batchLoaded) { batchLoaded = reader.loadNextBatch() @@ -161,17 +166,14 @@ class ArrowRRunner( val input = dataStream.readDouble val compute = dataStream.readDouble val output = dataStream.readDouble - logInfo( - ("Times: boot = %.3f s, init = %.3f s, broadcast = %.3f s, " + - "read-input = %.3f s, compute = %.3f s, write-output = %.3f s, " + - "total = %.3f s").format( - boot, - init, - broadcast, - input, - compute, - output, - boot + init + broadcast + input + compute + output)) + logInfo(log"Times: boot = ${MDC(LogKeys.BOOT, format(boot))} s, " + + log"init = ${MDC(LogKeys.INIT, format(init))} s, " + + log"broadcast = ${MDC(LogKeys.BROADCAST, format(broadcast))} s, " + + log"read-input = ${MDC(LogKeys.INPUT, format(input))} s, " + + log"compute = ${MDC(LogKeys.COMPUTE, format(compute))} s, " + + log"write-output = ${MDC(LogKeys.OUTPUT, format(output))} s, " + + log"total = ${MDC(LogKeys.TOTAL, + format(boot + init + broadcast + input + compute + output))} s") read() case length if length > 0 => // Likewise, there looks no way to send each batch in streaming format via socket diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala index 6b9a13ed3eb0..42ce32e1bc67 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala @@ -49,7 +49,7 @@ case class WriteToContinuousDataSourceExec(write: StreamingWrite, query: SparkPl val rdd = new ContinuousWriteRDD(queryRdd, writerFactory, metrics) logInfo(log"Start processing data source write support: ${MDC(STREAMING_WRITE, write)}. 
" + - log"The input RDD has ${MDC(NUM_PARTITION, rdd.partitions.length)} partitions.") + log"The input RDD has ${MDC(NUM_PARTITIONS, rdd.partitions.length)} partitions.") EpochCoordinatorRef.get( sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY), sparkContext.env) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala index 853a79d2b6d2..64a9eaad8805 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala @@ -446,13 +446,17 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with } if (earliestLoadedVersion.isDefined) { - logInfo(s"Trying to add version=$newVersion to state cache map with " + - s"current_size=$loadedEntries and earliest_loaded_version=${earliestLoadedVersion.get} " + - s"and max_versions_to_retain_in_memory=$numberOfVersionsToRetainInMemory") + logInfo(log"Trying to add version=${MDC(LogKeys.STATE_STORE_VERSION, newVersion)} to state " + + log"cache map with current_size=${MDC(LogKeys.NUM_LOADED_ENTRIES, loadedEntries)} and " + + log"earliest_loaded_version=" + + log"${MDC(LogKeys.EARLIEST_LOADED_VERSION, earliestLoadedVersion.get)}} " + + log"and max_versions_to_retain_in_memory=" + + log"${MDC(LogKeys.NUM_VERSIONS_RETAIN, numberOfVersionsToRetainInMemory)}") } else { - logInfo(s"Trying to add version=$newVersion to state cache map with " + - s"current_size=$loadedEntries and " + - s"max_versions_to_retain_in_memory=$numberOfVersionsToRetainInMemory") + logInfo(log"Trying to add version=${MDC(LogKeys.STATE_STORE_VERSION, newVersion)} to state " + + log"cache map with current_size=${MDC(LogKeys.NUM_LOADED_ENTRIES, loadedEntries)} and " + + log"max_versions_to_retain_in_memory=" + + log"${MDC(LogKeys.NUM_VERSIONS_RETAIN, numberOfVersionsToRetainInMemory)}") } if (numberOfVersionsToRetainInMemory <= 0) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index cb93ff25da0a..88fd72a3cb7a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -30,7 +30,7 @@ import org.apache.hadoop.fs.{FsUrlStreamHandlerFactory, Path} import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKeys.{CONFIG, CONFIG2} +import org.apache.spark.internal.LogKeys.{CONFIG, CONFIG2, PATH, VALUE} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.CacheManager @@ -270,8 +270,10 @@ object SharedState extends Logging { if (hiveWarehouseDir != null && sparkWarehouseOption.isEmpty) { // If hive.metastore.warehouse.dir is set and spark.sql.warehouse.dir is not set, // we will respect the value of hive.metastore.warehouse.dir. - logInfo(s"${WAREHOUSE_PATH.key} is not set, but $HIVE_WAREHOUSE_CONF_NAME is set. " + - s"Setting ${WAREHOUSE_PATH.key} to the value of $HIVE_WAREHOUSE_CONF_NAME.") + logInfo(log"${MDC(CONFIG, WAREHOUSE_PATH.key)} is not set, but " + + log"${MDC(CONFIG2, HIVE_WAREHOUSE_CONF_NAME)} is set. 
" + + log"Setting ${MDC(CONFIG, WAREHOUSE_PATH.key)} to " + + log"the value of ${MDC(CONFIG2, HIVE_WAREHOUSE_CONF_NAME)}.") hiveWarehouseDir } else { // If spark.sql.warehouse.dir is set, we will override hive.metastore.warehouse.dir using @@ -279,8 +281,9 @@ object SharedState extends Logging { // When neither spark.sql.warehouse.dir nor hive.metastore.warehouse.dir is set // we will set hive.metastore.warehouse.dir to the default value of spark.sql.warehouse.dir. val sparkWarehouseDir = sparkWarehouseOption.getOrElse(WAREHOUSE_PATH.defaultValueString) - logInfo(s"Setting $HIVE_WAREHOUSE_CONF_NAME ('$hiveWarehouseDir') to the value of " + - s"${WAREHOUSE_PATH.key}.") + logInfo(log"Setting ${MDC(CONFIG, HIVE_WAREHOUSE_CONF_NAME)} " + + log"('${MDC(VALUE, hiveWarehouseDir)}') to the value of " + + log"${MDC(CONFIG2, WAREHOUSE_PATH.key)}.") sparkWarehouseDir } } @@ -288,7 +291,7 @@ object SharedState extends Logging { def qualifyWarehousePath(hadoopConf: Configuration, warehousePath: String): String = { val tempPath = new Path(warehousePath) val qualified = tempPath.getFileSystem(hadoopConf).makeQualified(tempPath).toString - logInfo(s"Warehouse path is '$qualified'.") + logInfo(log"Warehouse path is '${MDC(PATH, qualified)}'.") qualified } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala index 12397468f8ed..55d2e639a56b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala @@ -26,7 +26,7 @@ import scala.jdk.CollectionConverters._ import org.apache.spark.annotation.Evolving import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKeys.{QUERY_ID, RUN_ID} +import org.apache.spark.internal.LogKeys.{CLASS_NAME, QUERY_ID, RUN_ID} import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.streaming.{WriteToStream, WriteToStreamStatement} @@ -78,7 +78,7 @@ class StreamingQueryManager private[sql] ( Utils.loadExtensions(classOf[StreamingQueryListener], classNames, sparkSession.sparkContext.conf).foreach { listener => addListener(listener) - logInfo(s"Registered listener ${listener.getClass.getName}") + logInfo(log"Registered listener ${MDC(CLASS_NAME, listener.getClass.getName)}") } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org