This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new d540786d9cea [SPARK-47597][STREAMING] Streaming: Migrate logInfo with variables to structured logging framework
d540786d9cea is described below

commit d540786d9ceacd7426803ad615f7ab32ec6faf67
Author: Daniel Tenedorio <daniel.tenedo...@databricks.com>
AuthorDate: Thu Apr 25 13:23:21 2024 -0700

    [SPARK-47597][STREAMING] Streaming: Migrate logInfo with variables to structured logging framework

    ### What changes were proposed in this pull request?

    Migrate the `logInfo` calls with variables in the streaming module to the structured logging framework. This transforms `logInfo` entries from the following API

    ```
    def logInfo(msg: => String): Unit
    ```

    to

    ```
    def logInfo(entry: LogEntry): Unit
    ```

    ### Why are the changes needed?

    To enhance Apache Spark's logging system by implementing structured logging.

    ### Does this PR introduce _any_ user-facing change?

    Yes, Spark streaming logs will contain additional MDC entries.

    ### How was this patch tested?

    Compiler and Scala style checks, as well as code review.

    ### Was this patch authored or co-authored using generative AI tooling?

    No

    Closes #46192 from dtenedor/streaming-log-info.

    Authored-by: Daniel Tenedorio <daniel.tenedo...@databricks.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../scala/org/apache/spark/internal/LogKey.scala   | 47 ++++++++++++-
 .../execution/adaptive/AdaptiveSparkPlanExec.scala |  2 +-
 .../AsyncProgressTrackingMicroBatchExecution.scala |  5 +-
 .../streaming/CheckpointFileManager.scala          |  8 ++-
 .../streaming/CompactibleFileStreamLog.scala       | 11 ++--
 .../sql/execution/streaming/FileStreamSink.scala   |  4 +-
 .../execution/streaming/FileStreamSinkLog.scala    |  4 +-
 .../sql/execution/streaming/FileStreamSource.scala | 13 ++--
 .../sql/execution/streaming/HDFSMetadataLog.scala  |  7 +-
 .../execution/streaming/IncrementalExecution.scala |  4 +-
 .../streaming/ManifestFileCommitProtocol.scala     |  4 +-
 .../execution/streaming/MetadataLogFileIndex.scala |  4 +-
 .../execution/streaming/MicroBatchExecution.scala  | 34 ++++++----
 .../sql/execution/streaming/ProgressReporter.scala |  8 +--
 .../execution/streaming/ResolveWriteToStream.scala |  5 +-
 .../sql/execution/streaming/StreamExecution.scala  |  7 +-
 .../sql/execution/streaming/WatermarkTracker.scala |  7 +-
 .../streaming/continuous/ContinuousExecution.scala | 13 ++--
 .../continuous/ContinuousQueuedDataReader.scala    |  6 +-
 .../streaming/continuous/ContinuousWriteRDD.scala  | 10 +--
 .../WriteToContinuousDataSourceExec.scala          |  7 +-
 .../sources/RateStreamMicroBatchStream.scala       |  5 +-
 .../state/HDFSBackedStateStoreProvider.scala       | 34 ++++++----
 .../sql/execution/streaming/state/RocksDB.scala    | 54 ++++++++-------
 .../streaming/state/RocksDBFileManager.scala       | 77 +++++++++++++---------
 .../streaming/state/RocksDBMemoryManager.scala     |  7 +-
 .../state/RocksDBStateStoreProvider.scala          | 12 ++--
 .../sql/execution/streaming/state/StateStore.scala | 23 ++++---
 .../streaming/state/StateStoreChangelog.scala      |  8 ++-
 .../state/StreamingSessionWindowStateManager.scala |  5 +-
 .../state/SymmetricHashJoinStateManager.scala      |  6 +-
 31 files changed, 286 insertions(+), 155 deletions(-)
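To make the mechanics of the migration concrete, the pattern applied at each call site in the patch below looks roughly like this. It is a minimal editorial sketch rather than code from the commit: `MyStreamingComponent` and its parameters are hypothetical, while `Logging`, `MDC`, the `log` interpolator, and the `LogKey` values are the APIs the patch itself uses.

```scala
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.{BATCH_ID, NUM_FILES}

// Hypothetical component, for illustration only.
class MyStreamingComponent extends Logging {
  def commitBatch(batchId: Long, fileCount: Int): Unit = {
    // Before: plain string interpolation; the variable values are baked
    // into unstructured message text.
    //   logInfo(s"Committed batch $batchId with $fileCount files")

    // After: the log interpolator builds a LogEntry, and each MDC(...)
    // tags its value with a LogKey. Concatenated log"..." fragments
    // merge into a single entry, as in the call sites below.
    logInfo(log"Committed batch ${MDC(BATCH_ID, batchId)} " +
      log"with ${MDC(NUM_FILES, fileCount)} files")
  }
}
```

The rendered message text stays the same for human readers; per the PR description, the `MDC`-tagged values additionally become machine-queryable context when structured logging is enabled.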
diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
index fab5e80dd0e6..6df7cb5a5867 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -34,6 +34,7 @@ object LogKey extends Enumeration {
   val ARGS = Value
   val BACKUP_FILE = Value
   val BATCH_ID = Value
+  val BATCH_TIMESTAMP = Value
   val BATCH_WRITE = Value
   val BLOCK_ID = Value
   val BLOCK_MANAGER_ID = Value
@@ -48,8 +49,12 @@ object LogKey extends Enumeration {
   val CATALOG_NAME = Value
   val CATEGORICAL_FEATURES = Value
   val CHECKPOINT_FILE = Value
+  val CHECKPOINT_LOCATION = Value
+  val CHECKPOINT_PATH = Value
+  val CHECKPOINT_ROOT = Value
   val CHECKPOINT_TIME = Value
   val CHECKSUM_FILE_NUM = Value
+  val CHOSEN_WATERMARK = Value
   val CLASS_LOADER = Value
   val CLASS_NAME = Value
   val CLUSTER_CENTROIDS = Value
@@ -66,6 +71,8 @@ object LogKey extends Enumeration {
   val COLUMN_NAME = Value
   val COMMAND = Value
   val COMMAND_OUTPUT = Value
+  val COMMITTED_VERSION = Value
+  val COMPACT_INTERVAL = Value
   val COMPONENT = Value
   val CONFIG = Value
   val CONFIG2 = Value
@@ -86,6 +93,7 @@ object LogKey extends Enumeration {
   val CSV_SCHEMA_FIELD_NAME = Value
   val CSV_SCHEMA_FIELD_NAMES = Value
   val CSV_SOURCE = Value
+  val CURRENT_BATCH_ID = Value
   val CURRENT_PATH = Value
   val DATA = Value
   val DATABASE_NAME = Value
@@ -95,6 +103,7 @@ object LogKey extends Enumeration {
   val DATA_SOURCE = Value
   val DATA_SOURCES = Value
   val DATA_SOURCE_PROVIDER = Value
+  val DEFAULT_COMPACT_INTERVAL = Value
   val DEFAULT_ISOLATION_LEVEL = Value
   val DEFAULT_VALUE = Value
   val DELAY = Value
@@ -102,6 +111,7 @@ object LogKey extends Enumeration {
   val DELTA = Value
   val DESCRIPTION = Value
   val DESIRED_PARTITIONS_SIZE = Value
+  val DFS_FILE = Value
   val DIFF_DELTA = Value
   val DIVISIBLE_CLUSTER_INDICES_SIZE = Value
   val DRIVER_ID = Value
@@ -112,12 +122,13 @@ object LogKey extends Enumeration {
   val ENCODING = Value
   val END_INDEX = Value
   val END_POINT = Value
+  val END_VERSION = Value
   val ENGINE = Value
+  val EPOCH = Value
   val ERROR = Value
   val ESTIMATOR_PARAMETER_MAP = Value
   val EVENT_LOOP = Value
   val EVENT_QUEUE = Value
-  val EXCEPTION = Value
   val EXECUTE_INFO = Value
   val EXECUTE_KEY = Value
   val EXECUTION_PLAN_LEAVES = Value
@@ -131,6 +142,7 @@ object LogKey extends Enumeration {
   val EXECUTOR_RESOURCES = Value
   val EXECUTOR_STATE = Value
   val EXECUTOR_TARGET_COUNT = Value
+  val EXISTING_FILE = Value
   val EXIT_CODE = Value
   val EXPECTED_NUM_FILES = Value
   val EXPECTED_PARTITION_COLUMN = Value
@@ -144,15 +156,20 @@ object LogKey extends Enumeration {
   val FEATURE_DIMENSION = Value
   val FIELD_NAME = Value
   val FILE_ABSOLUTE_PATH = Value
+  val FILE_END_OFFSET = Value
   val FILE_FORMAT = Value
   val FILE_FORMAT2 = Value
+  val FILE_MODIFICATION_TIME = Value
   val FILE_NAME = Value
+  val FILE_START_OFFSET = Value
   val FILE_VERSION = Value
+  val FINAL_PATH = Value
   val FINISH_TRIGGER_DURATION = Value
   val FROM_OFFSET = Value
   val FROM_TIME = Value
   val FUNCTION_NAME = Value
   val FUNCTION_PARAMETER = Value
+  val GLOBAL_WATERMARK = Value
   val GROUP_ID = Value
   val HADOOP_VERSION = Value
   val HASH_JOIN_KEYS = Value
@@ -194,10 +211,13 @@ object LogKey extends Enumeration {
   val LINE = Value
   val LINE_NUM = Value
   val LISTENER = Value
+  val LOADED_VERSION = Value
   val LOAD_FACTOR = Value
   val LOAD_TIME = Value
   val LOGICAL_PLAN_COLUMNS = Value
   val LOGICAL_PLAN_LEAVES = Value
+  val LOG_ID = Value
+  val LOG_OFFSET = Value
   val LOG_TYPE = Value
   val LOWER_BOUND = Value
   val MALFORMATTED_STIRNG = Value
@@ -216,10 +236,15 @@ object LogKey extends Enumeration {
   val MEMORY_SIZE = Value
   val MERGE_DIR_NAME = Value
   val MESSAGE = Value
+  val METADATA_DIRECTORY = Value
+  val METADATA_JSON = Value
   val METHOD_NAME = Value
+  val METRICS_JSON = Value
+  val MIN_COMPACTION_BATCH_ID = Value
   val MIN_FREQUENT_PATTERN_COUNT = Value
   val MIN_POINT_PER_CLUSTER = Value
   val MIN_SIZE = Value
+  val MIN_VERSION_NUMBER = Value
   val MODEL_WEIGHTS = Value
   val NAMESPACE = Value
   val NEW_FEATURE_COLUMN_NAME = Value
@@ -230,11 +255,15 @@ object LogKey extends Enumeration {
   val NODE_LOCATION = Value
   val NORM = Value
   val NUM_BIN = Value
+  val NUM_BYTES = Value
   val NUM_CLASSES = Value
   val NUM_COLUMNS = Value
   val NUM_EXAMPLES = Value
   val NUM_FEATURES = Value
   val NUM_FILES = Value
+  val NUM_FILES_COPIED = Value
+  val NUM_FILES_FAILED_TO_DELETE = Value
+  val NUM_FILES_REUSED = Value
   val NUM_FREQUENT_ITEMS = Value
   val NUM_ITERATIONS = Value
   val NUM_LOCAL_FREQUENT_PATTERN = Value
@@ -245,6 +274,7 @@ object LogKey extends Enumeration {
   val OBJECT_ID = Value
   val OFFSET = Value
   val OFFSETS = Value
+  val OFFSET_SEQUENCE_METADATA = Value
   val OLD_BLOCK_MANAGER_ID = Value
   val OLD_VALUE = Value
   val OPTIMIZED_PLAN_COLUMNS = Value
@@ -272,6 +302,7 @@ object LogKey extends Enumeration {
   val POD_TARGET_COUNT = Value
   val POLICY = Value
   val PORT = Value
+  val PRETTY_ID_STRING = Value
   val PRINCIPAL = Value
   val PROCESSING_TIME = Value
   val PRODUCER_ID = Value
@@ -309,6 +340,8 @@ object LogKey extends Enumeration {
   val RETRY_INTERVAL = Value
   val RIGHT_EXPR = Value
   val RMSE = Value
+  val ROCKS_DB_LOG_LEVEL = Value
+  val ROCKS_DB_LOG_MESSAGE = Value
   val RPC_ENDPOINT_REF = Value
   val RULE_BATCH_NAME = Value
   val RULE_NAME = Value
@@ -329,6 +362,7 @@ object LogKey extends Enumeration {
   val SLEEP_TIME = Value
   val SLIDE_DURATION = Value
   val SMALLEST_CLUSTER_INDEX = Value
+  val SNAPSHOT_VERSION = Value
   val SPARK_DATA_STREAM = Value
   val SPARK_PLAN_ID = Value
   val SQL_TEXT = Value
@@ -337,13 +371,23 @@ object LogKey extends Enumeration {
   val STAGE_ID = Value
   val START_INDEX = Value
   val STATEMENT_ID = Value
+  val STATE_STORE_ID = Value
   val STATE_STORE_PROVIDER = Value
+  val STATE_STORE_VERSION = Value
   val STATUS = Value
   val STDERR = Value
   val STORAGE_LEVEL = Value
   val STORAGE_LEVEL_DESERIALIZED = Value
   val STORAGE_LEVEL_REPLICATION = Value
   val STORE_ID = Value
+  val STREAMING_DATA_SOURCE_DESCRIPTION = Value
+  val STREAMING_DATA_SOURCE_NAME = Value
+  val STREAMING_OFFSETS_END = Value
+  val STREAMING_OFFSETS_START = Value
+  val STREAMING_QUERY_PROGRESS = Value
+  val STREAMING_SOURCE = Value
+  val STREAMING_TABLE = Value
+  val STREAMING_WRITE = Value
   val STREAM_ID = Value
   val STREAM_NAME = Value
   val SUBMISSION_ID = Value
@@ -398,6 +442,7 @@ object LogKey extends Enumeration {
   val USER_ID = Value
   val USER_NAME = Value
   val VALUE = Value
+  val VERSION_NUMBER = Value
   val VIRTUAL_CORES = Value
   val VOCAB_SIZE = Value
   val WAIT_RESULT_TIME = Value
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 12ca0652f1b7..e5bfb01d51f5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -745,7 +745,7 @@ case class AdaptiveSparkPlanExec(
       Some((finalPlan, optimized))
     } catch {
       case e: InvalidAQEPlanException[_] =>
-        logOnLevel(log"Re-optimize - ${MDC(EXCEPTION, e.getMessage())}:\n" +
+        logOnLevel(log"Re-optimize - ${MDC(ERROR, e.getMessage())}:\n" +
           log"${MDC(QUERY_PLAN, e.plan)}")
         None
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala
index ec24ec0fd335..75c7f9dbe861 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql.execution.streaming
 import java.util.concurrent._
 import java.util.concurrent.atomic.AtomicLong
 
+import org.apache.spark.internal.LogKey.PRETTY_ID_STRING
+import org.apache.spark.internal.MDC
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.streaming.WriteToStream
 import org.apache.spark.sql.errors.QueryExecutionErrors
@@ -221,7 +223,8 @@ class AsyncProgressTrackingMicroBatchExecution(
     super.cleanup()
 
     ThreadUtils.shutdown(asyncWritesExecutorService)
-    logInfo(s"Async progress tracking executor pool for query ${prettyIdString} has been shutdown")
+    logInfo(log"Async progress tracking executor pool for query " +
+      log"${MDC(PRETTY_ID_STRING, prettyIdString)} has been shutdown")
   }
 
   // used for testing
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
index 1c530fcabe0a..e9307ec971ac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.fs.local.{LocalFs, RawLocalFs}
 import org.apache.hadoop.fs.permission.FsPermission
 
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKey.{PATH, TEMP_PATH}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.streaming.CheckpointFileManager.RenameHelperMethods
 import org.apache.spark.sql.internal.SQLConf
@@ -144,7 +144,8 @@ object CheckpointFileManager extends Logging {
       this(fm, path, generateTempPath(path), overwrite)
     }
 
-    logInfo(s"Writing atomically to $finalPath using temp file $tempPath")
+    logInfo(log"Writing atomically to ${MDC(FINAL_PATH, finalPath)} using temp file " +
+      log"${MDC(TEMP_PATH, tempPath)}")
     @volatile private var terminated = false
 
     override def close(): Unit = synchronized {
@@ -166,7 +167,8 @@ object CheckpointFileManager extends Logging {
             s"But $finalPath does not exist.")
         }
 
-        logInfo(s"Renamed temp file $tempPath to $finalPath")
+        logInfo(log"Renamed temp file ${MDC(TEMP_PATH, tempPath)} to " +
+          log"${MDC(FINAL_PATH, finalPath)}")
       } finally {
         terminated = true
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
index 884de070bf32..908a9c662669 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.fs.Path
 import org.json4s.{Formats, NoTypeHints}
 import org.json4s.jackson.Serialization
 
-import org.apache.spark.internal.LogKey.{BATCH_ID, ELAPSED_TIME}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.internal.MDC
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.errors.QueryExecutionErrors
@@ -105,8 +105,8 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
         defaultCompactInterval, compactibleBatchIds(0).toInt)
     }
     assert(interval > 0, s"intervalValue = $interval not positive value.")
-    logInfo(s"Set the compact interval to $interval " +
-      s"[defaultCompactInterval: $defaultCompactInterval]")
+    logInfo(log"Set the compact interval to ${MDC(COMPACT_INTERVAL, interval)} " +
+      log"[defaultCompactInterval: ${MDC(DEFAULT_COMPACT_INTERVAL, defaultCompactInterval)}]")
     interval
   }
 
@@ -309,8 +309,9 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
     assert(isCompactionBatch(minCompactionBatchId, compactInterval),
       s"$minCompactionBatchId is not a compaction batch")
 
-    logInfo(s"Current compact batch id = $currentBatchId " +
-      s"min compaction batch id to delete = $minCompactionBatchId")
+    logInfo(log"Current compact batch id = ${MDC(CURRENT_BATCH_ID, currentBatchId)} " +
+      log"min compaction batch id to delete = " +
+      log"${MDC(MIN_COMPACTION_BATCH_ID, minCompactionBatchId)}")
 
     val expiredTime = System.currentTimeMillis() - fileCleanupDelayMs
     fileManager.list(metadataPath, (path: Path) => {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index 18895e525156..806caebeaf4b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -24,7 +24,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.SparkException
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKey.{ERROR, PATH}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.expressions._
@@ -146,7 +146,7 @@ class FileStreamSink(
 
   override def addBatch(batchId: Long, data: DataFrame): Unit = {
     if (batchId <= fileLog.getLatestBatchId().getOrElse(-1L)) {
-      logInfo(s"Skipping already committed batch $batchId")
+      logInfo(log"Skipping already committed batch ${MDC(BATCH_ID, batchId)}")
     } else {
       val committer = FileCommitProtocol.instantiate(
         className = sparkSession.sessionState.conf.streamingFileCommitProtocolClass,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
index d8aa31be4797..5059bdecca66 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.streaming
 
 import org.apache.hadoop.fs.FileStatus
 
+import org.apache.spark.internal.LogKey._
+import org.apache.spark.internal.MDC
 import org.apache.spark.paths.SparkPath
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.internal.SQLConf
@@ -101,7 +103,7 @@ class FileStreamSinkLog(
 
   val retentionMs: Long = _retentionMs match {
     case Some(retention) =>
-      logInfo(s"Retention is set to $retention ms")
+      logInfo(log"Retention is set to ${MDC(TIME_UNITS, retention)} ms")
       retention
     case _ =>
       Long.MaxValue
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 6aa8e94db94f..b8151a07398a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem, GlobFilter, Path}
 
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKey.{CURRENT_PATH, ELAPSED_TIME, NEW_PATH, NUM_FILES}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.paths.SparkPath
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
@@ -126,8 +126,9 @@ class FileStreamSource(
   }
 
   seenFiles.purge()
 
-  logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, " +
-    s"maxBytesPerBatch = $maxBytesPerBatch, maxFileAgeMs = $maxFileAgeMs")
+  logInfo(log"maxFilesPerBatch = ${MDC(NUM_FILES, maxFilesPerBatch)}, " +
+    log"maxBytesPerBatch = ${MDC(NUM_BYTES, maxBytesPerBatch)}, " +
+    log"maxFileAgeMs = ${MDC(TIME_UNITS, maxFileAgeMs)}")
 
   private var unreadFiles: Seq[NewFileEntry] = _
 
@@ -251,7 +252,8 @@ class FileStreamSource(
       FileEntry(path = p.urlEncoded, timestamp = timestamp, batchId = metadataLogCurrentOffset)
     }.toArray
     if (metadataLog.add(metadataLogCurrentOffset, fileEntries)) {
-      logInfo(s"Log offset set to $metadataLogCurrentOffset with ${batchFiles.size} new files")
+      logInfo(log"Log offset set to ${MDC(LOG_OFFSET, metadataLogCurrentOffset)} " +
+        log"with ${MDC(NUM_FILES, batchFiles.size)} new files")
     } else {
       throw new IllegalStateException("Concurrent update to the log. Multiple streaming jobs " +
         s"detected for $metadataLogCurrentOffset")
@@ -291,7 +293,8 @@ class FileStreamSource(
     assert(startOffset <= endOffset)
     val files = metadataLog.get(Some(startOffset + 1), Some(endOffset)).flatMap(_._2)
-    logInfo(s"Processing ${files.length} files from ${startOffset + 1}:$endOffset")
+    logInfo(log"Processing ${MDC(NUM_FILES, files.length)} " +
+      log"files from ${MDC(FILE_START_OFFSET, startOffset + 1)}:${MDC(FILE_END_OFFSET, endOffset)}")
     logTrace(s"Files are:\n\t" + files.mkString("\n\t"))
     val newDataSource =
       DataSource(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index b3eedbf93f04..febe4dc8efc0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -29,7 +29,8 @@ import org.apache.hadoop.fs._
 import org.json4s.{Formats, NoTypeHints}
 import org.json4s.jackson.Serialization
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.internal.SQLConf
@@ -265,7 +266,7 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
 
   override def getLatest(): Option[(Long, T)] = {
     listBatches.sorted.lastOption.map { batchId =>
-      logInfo(s"Getting latest batch $batchId")
+      logInfo(log"Getting latest batch ${MDC(BATCH_ID, batchId)}")
       (batchId, getExistingBatch(batchId))
     }
   }
@@ -335,7 +336,7 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
       batchCache.synchronized {
         batchCache.keySet.asScala.toArray
       }
-    logInfo("BatchIds found from listing: " + batchIds.sorted.mkString(", "))
+    logInfo(log"BatchIds found from listing: ${MDC(BATCH_ID, batchIds.sorted.mkString(", "))}")
 
     if (batchIds.isEmpty) {
       Array.empty
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index 62999d711ce8..c3cb7949bbd6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicInteger
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKey.ERROR
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.sql.{SparkSession, Strategy}
 import org.apache.spark.sql.catalyst.QueryPlanningTracker
 import org.apache.spark.sql.catalyst.expressions.{CurrentBatchTimestamp, ExpressionWithRandomSeed}
@@ -102,7 +102,7 @@ class IncrementalExecution(
       tracker).transformAllExpressionsWithPruning(
       _.containsAnyPattern(CURRENT_LIKE, EXPRESSION_WITH_RANDOM_SEED)) {
       case ts @ CurrentBatchTimestamp(timestamp, _, _) =>
-        logInfo(s"Current batch timestamp = $timestamp")
+        logInfo(log"Current batch timestamp = ${MDC(BATCH_TIMESTAMP, timestamp)}")
         ts.toLiteral
       case e: ExpressionWithRandomSeed => e.withNewSeed(Utils.random.nextLong())
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
index f444c46f1419..230b8f5dc4ef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
 
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKey.PATH
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
 import org.apache.spark.sql.errors.QueryExecutionErrors
@@ -75,7 +75,7 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
     pendingCommitFiles.clear()
 
     if (fileLog.add(batchId, fileStatuses)) {
-      logInfo(s"Committed batch $batchId")
+      logInfo(log"Committed batch ${MDC(BATCH_ID, batchId)}")
     } else {
       throw new IllegalStateException(s"Race while writing batch $batchId")
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala
index 6eaccfb6d934..bd1aa91e43ff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala
@@ -21,6 +21,8 @@ import scala.collection.mutable
 
 import org.apache.hadoop.fs.{FileStatus, Path}
 
+import org.apache.spark.internal.LogKey._
+import org.apache.spark.internal.MDC
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.types.StructType
@@ -47,7 +49,7 @@ class MetadataLogFileIndex(
     metadataDir
   }
 
-  logInfo(s"Reading streaming file log from $metadataDirectory")
+  logInfo(log"Reading streaming file log from ${MDC(METADATA_DIRECTORY, metadataDirectory)}")
   private val metadataLog =
     new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, metadataDirectory.toString)
   private val allFilesFromLog = metadataLog.allFiles().map(_.toFileStatus).filterNot(_.isDirectory)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index e5d275d7c242..ec7a259609d3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.streaming
 import scala.collection.mutable.{Map => MutableMap}
 import scala.collection.mutable
 
-import org.apache.spark.internal.LogKey.{LATEST_BATCH_ID, LATEST_COMMITTED_BATCH_ID, READ_LIMIT, SPARK_DATA_STREAM}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.internal.MDC
 import org.apache.spark.sql.{Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
@@ -103,8 +103,8 @@ class MicroBatchExecution(
       // See SPARK-45178 for more details.
       if (sparkSession.sessionState.conf.getConf(
           SQLConf.STREAMING_TRIGGER_AVAILABLE_NOW_WRAPPER_ENABLED)) {
-        logInfo("Configured to use the wrapper of Trigger.AvailableNow for query " +
-          s"$prettyIdString.")
+        logInfo(log"Configured to use the wrapper of Trigger.AvailableNow for query " +
+          log"${MDC(PRETTY_ID_STRING, prettyIdString)}.")
         MultiBatchExecutor()
       } else {
         val supportsTriggerAvailableNow = sources.distinct.forall { src =>
@@ -158,7 +158,9 @@ class MicroBatchExecution(
           val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
           val source = dataSourceV1.createSource(metadataPath)
           nextSourceId += 1
-          logInfo(s"Using Source [$source] from DataSourceV1 named '$sourceName' [$dataSourceV1]")
+          logInfo(log"Using Source [${MDC(STREAMING_SOURCE, source)}] " +
+            log"from DataSourceV1 named '${MDC(STREAMING_DATA_SOURCE_NAME, sourceName)}' " +
+            log"[${MDC(STREAMING_DATA_SOURCE_DESCRIPTION, dataSourceV1)}]")
           StreamingExecutionRelation(source, output, dataSourceV1.catalogTable)(sparkSession)
         })
@@ -171,7 +173,9 @@ class MicroBatchExecution(
           // Materialize source to avoid creating it in every batch
           val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
           nextSourceId += 1
-          logInfo(s"Reading table [$table] from DataSourceV2 named '$srcName' $dsStr")
+          logInfo(log"Reading table [${MDC(STREAMING_TABLE, table)}] " +
+            log"from DataSourceV2 named '${MDC(STREAMING_DATA_SOURCE_NAME, srcName)}' " +
+            log"${MDC(STREAMING_DATA_SOURCE_DESCRIPTION, dsStr)}")
           // TODO: operator pushdown.
           val scan = table.newScanBuilder(options).build()
           val stream = scan.toMicroBatchStream(metadataPath)
@@ -189,7 +193,9 @@ class MicroBatchExecution(
             val source =
               v1.get.asInstanceOf[StreamingRelation].dataSource.createSource(metadataPath)
             nextSourceId += 1
-            logInfo(s"Using Source [$source] from DataSourceV2 named '$srcName' $dsStr")
+            logInfo(log"Using Source [${MDC(STREAMING_SOURCE, source)}] from DataSourceV2 " +
+              log"named '${MDC(STREAMING_DATA_SOURCE_NAME, srcName)}' " +
+              log"${MDC(STREAMING_DATA_SOURCE_DESCRIPTION, dsStr)}")
             // We don't have a catalog table but may have a table identifier. Given this is about
             // v1 fallback path, we just give up and set the catalog table as None.
             StreamingExecutionRelation(source, output, None)(sparkSession)
@@ -280,7 +286,7 @@ class MicroBatchExecution(
       // microBatchThread may spawn new jobs, so we need to cancel again to prevent a leak
       sparkSession.sparkContext.cancelJobGroup(runId.toString)
     }
-    logInfo(s"Query $prettyIdString was stopped")
+    logInfo(log"Query ${MDC(PRETTY_ID_STRING, prettyIdString)} was stopped")
   }
 
   private val watermarkPropagator = WatermarkPropagator(sparkSession.sessionState.conf)
@@ -290,7 +296,8 @@ class MicroBatchExecution(
 
     // shutdown and cleanup required for async log purge mechanism
     asyncLogPurgeShutdown()
-    logInfo(s"Async log purge executor pool for query ${prettyIdString} has been shutdown")
+    logInfo(log"Async log purge executor pool for query " +
+      log"${MDC(PRETTY_ID_STRING, prettyIdString)} has been shutdown")
   }
 
   private def initializeExecution(
@@ -306,7 +313,7 @@ class MicroBatchExecution(
     setLatestExecutionContext(execCtx)
 
     populateStartOffsets(execCtx, sparkSessionForStream)
-    logInfo(s"Stream started from ${execCtx.startOffsets}")
+    logInfo(log"Stream started from ${MDC(STREAMING_OFFSETS_START, execCtx.startOffsets)}")
     execCtx
   }
 
  /**
@@ -520,8 +527,9 @@ class MicroBatchExecution(
         }
         // initialize committed offsets to start offsets of the most recent committed batch
         committedOffsets = execCtx.startOffsets
-        logInfo(s"Resuming at batch ${execCtx.batchId} with committed offsets " +
-          s"${execCtx.startOffsets} and available offsets ${execCtx.endOffsets}")
+        logInfo(log"Resuming at batch ${MDC(BATCH_ID, execCtx.batchId)} with committed offsets " +
+          log"${MDC(STREAMING_OFFSETS_START, execCtx.startOffsets)} and available offsets " +
+          log"${MDC(STREAMING_OFFSETS_END, execCtx.endOffsets)}")
 
       case None => // We are starting this stream for the first time.
         logInfo(s"Starting new streaming query.")
         execCtx.batchId = 0
@@ -875,8 +883,8 @@ class MicroBatchExecution(
       throw QueryExecutionErrors.concurrentStreamLogUpdate(execCtx.batchId)
     }
 
-    logInfo(s"Committed offsets for batch ${execCtx.batchId}. " +
-      s"Metadata ${execCtx.offsetSeqMetadata.toString}")
+    logInfo(log"Committed offsets for batch ${MDC(BATCH_ID, execCtx.batchId)}. " +
+      log"Metadata ${MDC(OFFSET_SEQUENCE_METADATA, execCtx.offsetSeqMetadata.toString)}")
   }
 
  /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 138e4abefce2..2e633009dd67 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable
 import scala.jdk.CollectionConverters._
 
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKey.{EXECUTION_PLAN_LEAVES, FINISH_TRIGGER_DURATION, LOGICAL_PLAN_LEAVES, PROCESSING_TIME}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.optimizer.InlineCTE
 import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan, WithCTE}
@@ -82,7 +82,7 @@ class ProgressReporter(
 
     addNewProgress(newProgress)
     postEvent(new QueryProgressEvent(newProgress))
-    logInfo(s"Streaming query made progress: $newProgress")
+    logInfo(log"Streaming query made progress: ${MDC(STREAMING_QUERY_PROGRESS, newProgress)}")
   }
 
   private def addNewProgress(newProgress: StreamingQueryProgress): Unit = {
@@ -104,8 +104,8 @@ class ProgressReporter(
       addNewProgress(newProgress)
       if (lastNoExecutionProgressEventTime > Long.MinValue) {
         postEvent(new QueryIdleEvent(id, runId, formatTimestamp(currentTriggerStartTimestamp)))
-        logInfo(s"Streaming query has been idle and waiting for new data more than " +
-          s"${noDataProgressEventInterval} ms.")
+        logInfo(log"Streaming query has been idle and waiting for new data more than " +
+          log"${MDC(TIME_UNITS, noDataProgressEventInterval)} ms.")
       }
 
       lastNoExecutionProgressEventTime = now
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ResolveWriteToStream.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ResolveWriteToStream.scala
index e80860b82c32..60052687b2d7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ResolveWriteToStream.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ResolveWriteToStream.scala
@@ -23,7 +23,7 @@ import scala.util.control.NonFatal
 
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.internal.LogKey.{CONFIG, PATH}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.internal.MDC
 import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
@@ -134,7 +134,8 @@ object ResolveWriteToStream extends Rule[LogicalPlan] with SQLConfHelper {
         val checkpointDir = fileManager.createCheckpointDirectory()
         checkpointDir.toString
     }
-    logInfo(s"Checkpoint root $checkpointLocation resolved to $resolvedCheckpointRoot.")
+    logInfo(log"Checkpoint root ${MDC(CHECKPOINT_LOCATION, checkpointLocation)} " +
+      log"resolved to ${MDC(CHECKPOINT_ROOT, resolvedCheckpointRoot)}.")
     (resolvedCheckpointRoot, deleteCheckpointOnStop)
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index ce2bbba550bc..b4316bad9a80 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -33,7 +33,7 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.{JobArtifactSet, SparkContext, SparkException, SparkThrowable}
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKey.{PATH, SPARK_DATA_STREAM}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
@@ -261,7 +261,8 @@ abstract class StreamExecution(
    * has been posted to all the listeners.
    */
   def start(): Unit = {
-    logInfo(s"Starting $prettyIdString. Use $resolvedCheckpointRoot to store the query checkpoint.")
+    logInfo(log"Starting ${MDC(PRETTY_ID_STRING, prettyIdString)}. " +
+      log"Use ${MDC(CHECKPOINT_ROOT, resolvedCheckpointRoot)} to store the query checkpoint.")
     queryExecutionThread.setDaemon(true)
     queryExecutionThread.start()
     startLatch.await()  // Wait until thread started and QueryStart event has been posted
@@ -406,7 +407,7 @@ abstract class StreamExecution(
         .getConf(SQLConf.FORCE_DELETE_TEMP_CHECKPOINT_LOCATION) || exception.isEmpty)) {
       val checkpointPath = new Path(resolvedCheckpointRoot)
       try {
-        logInfo(s"Deleting checkpoint $checkpointPath.")
+        logInfo(log"Deleting checkpoint ${MDC(CHECKPOINT_PATH, checkpointPath)}.")
         fileManager.delete(checkpointPath)
       } catch {
         case NonFatal(e) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala
index b0f8cf9cd184..83da9af739da 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala
@@ -21,7 +21,8 @@ import java.util.Locale
 
 import scala.collection.mutable
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.sql.RuntimeConfig
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.internal.SQLConf
@@ -113,7 +114,9 @@ case class WatermarkTracker(policy: MultipleWatermarkPolicy) extends Logging {
     // `org.apache.spark.sql.execution.streaming.MultipleWatermarkPolicy` implementations.
     val chosenGlobalWatermark = policy.chooseGlobalWatermark(operatorToWatermarkMap.values.toSeq)
     if (chosenGlobalWatermark > globalWatermarkMs) {
-      logInfo(s"Updating event-time watermark from $globalWatermarkMs to $chosenGlobalWatermark ms")
+      logInfo(log"Updating event-time watermark from " +
+        log"${MDC(GLOBAL_WATERMARK, globalWatermarkMs)} " +
+        log"to ${MDC(CHOSEN_WATERMARK, chosenGlobalWatermark)} ms")
       globalWatermarkMs = chosenGlobalWatermark
     } else {
       logDebug(s"Event time watermark didn't move: $chosenGlobalWatermark < $globalWatermarkMs")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index 920a7c68314b..0756449e1e53 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -25,6 +25,8 @@ import java.util.function.UnaryOperator
 import scala.collection.mutable.{Map => MutableMap}
 
 import org.apache.spark.SparkEnv
+import org.apache.spark.internal.LogKey._
+import org.apache.spark.internal.MDC
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.{CurrentDate, CurrentTimestampLike, LocalTimestamp}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -83,7 +85,9 @@ class ContinuousExecution(
         v2ToRelationMap.getOrElseUpdate(s, {
           val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
           nextSourceId += 1
-          logInfo(s"Reading table [$table] from DataSourceV2 named '$sourceName' $dsStr")
+          logInfo(log"Reading table [${MDC(STREAMING_TABLE, table)}] " +
+            log"from DataSourceV2 named '${MDC(STREAMING_DATA_SOURCE_NAME, sourceName)}' " +
+            log"${MDC(STREAMING_DATA_SOURCE_DESCRIPTION, dsStr)}")
           // TODO: operator pushdown.
           val scan = table.newScanBuilder(options).build()
           val stream = scan.toContinuousStream(metadataPath)
@@ -276,7 +280,7 @@ class ContinuousExecution(
       false
     } else if (isActive) {
       execCtx.batchId = epochEndpoint.askSync[Long](IncrementAndGetEpoch)
-      logInfo(s"New epoch ${execCtx.batchId} is starting.")
+      logInfo(log"New epoch ${MDC(BATCH_ID, execCtx.batchId)} is starting.")
       true
     } else {
       false
@@ -307,7 +311,8 @@ class ContinuousExecution(
     } catch {
       case t: Throwable if StreamExecution.isInterruptionException(t, sparkSession.sparkContext) &&
           state.get() == RECONFIGURING =>
-        logInfo(s"Query $id ignoring exception from reconfiguring: $t")
+        logInfo(log"Query ${MDC(QUERY_ID, id)} ignoring exception from reconfiguring: " +
+          log"${MDC(ERROR, t)}")
         // interrupted by reconfiguration - swallow exception so we can restart the query
     } finally {
       // The above execution may finish before getting interrupted, for example, a Spark job having
@@ -476,7 +481,7 @@ class ContinuousExecution(
       // We just need to interrupt the long running job.
       interruptAndAwaitExecutionThreadTermination()
     }
-    logInfo(s"Query $prettyIdString was stopped")
+    logInfo(log"Query ${MDC(PRETTY_ID_STRING, prettyIdString)} was stopped")
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala
index 8e5548ca2aca..2a508f27a10e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala
@@ -23,7 +23,8 @@ import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
 import scala.util.control.NonFatal
 
 import org.apache.spark.{SparkEnv, TaskContext}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
 import org.apache.spark.sql.connector.read.PartitionReader
@@ -159,7 +160,8 @@ class ContinuousQueuedDataReader(
       } catch {
         case _: InterruptedException =>
           // Continuous shutdown always involves an interrupt; do nothing and shut down quietly.
-          logInfo(s"shutting down interrupted data reader thread $getName")
+          logInfo(log"shutting down interrupted data reader thread " +
+            log"${MDC(THREAD_NAME, getName)}")
         case NonFatal(t) =>
           failureReason = t
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala
index 1d6ba87145d4..3af7b2a76f93 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala
@@ -18,6 +18,8 @@
 package org.apache.spark.sql.execution.streaming.continuous
 
 import org.apache.spark.{Partition, SparkEnv, TaskContext}
+import org.apache.spark.internal.LogKey._
+import org.apache.spark.internal.MDC
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.connector.write.DataWriter
@@ -68,8 +70,8 @@ class ContinuousWriteRDD(var prev: RDD[InternalRow], writerFactory: StreamingDat
         }
         CustomMetrics.updateMetrics(
           dataWriter.currentMetricsValues.toImmutableArraySeq, customMetrics)
-        logInfo(s"Writer for partition ${context.partitionId()} " +
-          s"in epoch ${EpochTracker.getCurrentEpoch.get} is committing.")
+        logInfo(log"Writer for partition ${MDC(PARTITION_ID, context.partitionId())} " +
+          log"in epoch ${MDC(EPOCH, EpochTracker.getCurrentEpoch.get)} is committing.")
         val msg = dataWriter.commit()
         epochCoordinator.send(
           CommitPartitionEpoch(
@@ -77,8 +79,8 @@ class ContinuousWriteRDD(var prev: RDD[InternalRow], writerFactory: StreamingDat
             EpochTracker.getCurrentEpoch.get,
             msg)
         )
-        logInfo(s"Writer for partition ${context.partitionId()} " +
-          s"in epoch ${EpochTracker.getCurrentEpoch.get} committed.")
+        logInfo(log"Writer for partition ${MDC(PARTITION_ID, context.partitionId())} " +
+          log"in epoch ${MDC(EPOCH, EpochTracker.getCurrentEpoch.get)} committed.")
         EpochTracker.incrementCurrentEpoch()
       } catch {
         case _: InterruptedException =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala
index 2b7d68f9b98b..d2b6ef0eb796 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.sql.execution.streaming.continuous
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
@@ -47,8 +48,8 @@ case class WriteToContinuousDataSourceExec(write: StreamingWrite, query: SparkPl
       PhysicalWriteInfoImpl(queryRdd.getNumPartitions))
     val rdd = new ContinuousWriteRDD(queryRdd, writerFactory, metrics)
 
-    logInfo(s"Start processing data source write support: $write. " +
-      s"The input RDD has ${rdd.partitions.length} partitions.")
+    logInfo(log"Start processing data source write support: ${MDC(STREAMING_WRITE, write)}. " +
+      log"The input RDD has ${MDC(NUM_PARTITION, rdd.partitions.length)} partitions.")
     EpochCoordinatorRef.get(
       sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
       sparkContext.env)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchStream.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchStream.scala
index 5640c7d3ca76..520e7fc23f4d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchStream.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchStream.scala
@@ -23,7 +23,8 @@ import java.util.concurrent.TimeUnit
 
 import org.apache.commons.io.IOUtils
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
@@ -92,7 +93,7 @@ class RateStreamMicroBatchStream(
     metadataLog.get(0).getOrElse {
       val offset = LongOffset(clock.getTimeMillis())
       metadataLog.add(0, offset)
-      logInfo(s"Start time: $offset")
+      logInfo(log"Start time: ${MDC(TIME_UNITS, offset)}")
       offset
     }.offset
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index eaa4e92af929..3350cd900d4d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -33,7 +33,7 @@ import org.apache.hadoop.fs._
 
 import org.apache.spark.{SparkConf, SparkEnv}
 import org.apache.spark.internal.{Logging, MDC, MessageWithContext}
-import org.apache.spark.internal.LogKey.{FILE_VERSION, OP_ID, PARTITION_ID, PATH}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.errors.QueryExecutionErrors
@@ -170,7 +170,8 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
       verify(state == UPDATING, "Cannot commit after already committed or aborted")
       commitUpdates(newVersion, mapToUpdate, compressedStream)
       state = COMMITTED
-      logInfo(s"Committed version $newVersion for $this to file $finalDeltaFile")
+      logInfo(log"Committed version ${MDC(COMMITTED_VERSION, newVersion)} " +
+        log"for ${MDC(STATE_STORE_PROVIDER, this)} to file ${MDC(FILE_NAME, finalDeltaFile)}")
       newVersion
     } catch {
       case e: Throwable =>
@@ -188,7 +189,8 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
       } else {
         state = ABORTED
       }
-      logInfo(s"Aborted version $newVersion for $this")
+      logInfo(log"Aborted version ${MDC(STATE_STORE_VERSION, newVersion)} " +
+        log"for ${MDC(STATE_STORE_PROVIDER, this)}")
     }
 
     /**
@@ -254,14 +256,16 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
   /** Get the state store for making updates to create a new `version` of the store. */
   override def getStore(version: Long): StateStore = {
     val newMap = getLoadedMapForStore(version)
-    logInfo(s"Retrieved version $version of ${HDFSBackedStateStoreProvider.this} for update")
+    logInfo(log"Retrieved version ${MDC(STATE_STORE_VERSION, version)} " +
+      log"of ${MDC(STATE_STORE_PROVIDER, HDFSBackedStateStoreProvider.this)} for update")
     new HDFSBackedStateStore(version, newMap)
   }
 
   /** Get the state store for reading to specific `version` of the store. */
   override def getReadStore(version: Long): ReadStateStore = {
     val newMap = getLoadedMapForStore(version)
-    logInfo(s"Retrieved version $version of ${HDFSBackedStateStoreProvider.this} for readonly")
+    logInfo(log"Retrieved version ${MDC(STATE_STORE_VERSION, version)} of " +
+      log"${MDC(STATE_STORE_PROVIDER, HDFSBackedStateStoreProvider.this)} for readonly")
     new HDFSBackedReadStateStore(version, newMap)
   }
 
@@ -590,7 +594,8 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
     } finally {
       if (input != null) input.close()
     }
-    logInfo(s"Read delta file for version $version of $this from $fileToRead")
+    logInfo(log"Read delta file for version ${MDC(FILE_VERSION, version)} " +
+      log"of ${MDC(STATE_STORE_PROVIDER, this)} from ${MDC(FILE_NAME, fileToRead)}")
   }
 
   private def writeSnapshotFile(version: Long, map: HDFSBackedStateStoreMap): Unit = {
@@ -617,7 +622,8 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
       cancelDeltaFile(compressedStream = output, rawStream = rawOutput)
       throw e
     }
-    logInfo(s"Written snapshot file for version $version of $this at $targetFile")
+    logInfo(log"Written snapshot file for version ${MDC(FILE_VERSION, version)} of " +
+      log"${MDC(STATE_STORE_PROVIDER, this)} at ${MDC(FILE_NAME, targetFile)}")
   }
 
  /**
@@ -642,8 +648,9 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
         // SPARK-42668 - Catch and log any other exception thrown while trying to cancel
         // raw stream or close compressed stream.
         case NonFatal(ex) =>
-          logInfo(s"Failed to cancel delta file for provider=$stateStoreId " +
-            s"with exception=$ex")
+          logInfo(log"Failed to cancel delta file for " +
+            log"provider=${MDC(STATE_STORE_ID, stateStoreId)} " +
+            log"with exception=${MDC(ERROR, ex)}")
     }
   }
 
@@ -692,7 +699,8 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
           }
         }
       }
-      logInfo(s"Read snapshot file for version $version of $this from $fileToRead")
+      logInfo(log"Read snapshot file for version ${MDC(SNAPSHOT_VERSION, version)} " +
+        log"of ${MDC(STATE_STORE_PROVIDER, this)} from ${MDC(FILE_NAME, fileToRead)}")
       Some(map)
     } catch {
       case _: FileNotFoundException =>
@@ -750,8 +758,10 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
           }
         }
         logDebug(s"deleting files took $e2 ms.")
-        logInfo(s"Deleted files older than ${earliestFileToRetain.version} for $this: " +
-          filesToDelete.mkString(", "))
+        logInfo(log"Deleted files older than " +
+          log"${MDC(FILE_VERSION, earliestFileToRetain.version)} for " +
+          log"${MDC(STATE_STORE_PROVIDER, this)}: " +
+          log"${MDC(FILE_NAME, filesToDelete.mkString(", "))}")
       }
     }
   } catch {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index b1005e589073..eb74bd40a285 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -36,8 +36,8 @@ import org.rocksdb.CompressionType._
 import org.rocksdb.TickerType._
 
 import org.apache.spark.TaskContext
-import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKey.{ERROR, PATH}
+import org.apache.spark.internal.{LogEntry, Logging, MDC}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.util.{NextIterator, Utils}
@@ -178,7 +178,7 @@ class RocksDB(
     assert(version >= 0)
     acquire(LoadStore)
     recordedMetrics = None
-    logInfo(s"Loading $version")
+    logInfo(log"Loading ${MDC(VERSION_NUMBER, version)}")
     try {
       if (loadedVersion != version) {
         closeDB()
@@ -213,7 +213,7 @@ class RocksDB(
       if (conf.resetStatsOnLoad) {
         nativeStats.reset
       }
-      logInfo(s"Loaded $version")
+      logInfo(log"Loaded ${MDC(VERSION_NUMBER, version)}")
     } catch {
       case t: Throwable =>
         loadedVersion = -1  // invalidate loaded data
@@ -232,7 +232,8 @@ class RocksDB(
    */
   private def replayChangelog(endVersion: Long): Unit = {
     for (v <- loadedVersion + 1 to endVersion) {
-      logInfo(s"replaying changelog from version $loadedVersion -> $endVersion")
+      logInfo(log"replaying changelog from version ${MDC(LOADED_VERSION, loadedVersion)} -> " +
+        log"${MDC(END_VERSION, endVersion)}")
       var changelogReader: StateStoreChangelogReader = null
       try {
         changelogReader = fileManager.getChangelogReader(v, useColumnFamilies)
@@ -462,7 +463,7 @@ class RocksDB(
     verifyColFamilyOperations("iterator", colFamilyName)
 
     val iter = db.newIterator(colFamilyNameToHandleMap(colFamilyName))
-    logInfo(s"Getting iterator from version $loadedVersion")
+    logInfo(log"Getting iterator from version ${MDC(LOADED_VERSION, loadedVersion)}")
     iter.seekToFirst()
 
     // Attempt to close this iterator if there is a task failure, or a task interruption.
@@ -492,7 +493,8 @@ class RocksDB(
     val iter = db.newIterator(colFamilyNameToHandleMap(colFamilyName))
 
     try {
-      logInfo(s"Counting keys - getting iterator from version $loadedVersion")
+      logInfo(log"Counting keys - getting iterator from version " +
+        log"${MDC(LOADED_VERSION, loadedVersion)}")
 
       iter.seekToFirst()
 
@@ -546,7 +548,7 @@ class RocksDB(
     val newVersion = loadedVersion + 1
     try {
 
-      logInfo(s"Flushing updates for $newVersion")
+      logInfo(log"Flushing updates for ${MDC(VERSION_NUMBER, newVersion)}")
 
       var compactTimeMs = 0L
       var flushTimeMs = 0L
@@ -554,7 +556,7 @@ class RocksDB(
       if (shouldCreateSnapshot()) {
         // Need to flush the change to disk before creating a checkpoint
         // because rocksdb wal is disabled.
-        logInfo(s"Flushing updates for $newVersion")
+        logInfo(log"Flushing updates for ${MDC(VERSION_NUMBER, newVersion)}")
         flushTimeMs = timeTakenMs {
           // Flush updates to all available column families
          assert(!colFamilyNameToHandleMap.isEmpty)
@@ -572,7 +574,8 @@ class RocksDB(
         checkpointTimeMs = timeTakenMs {
           val checkpointDir = createTempDir("checkpoint")
-          logInfo(s"Creating checkpoint for $newVersion in $checkpointDir")
+          logInfo(log"Creating checkpoint for ${MDC(VERSION_NUMBER, newVersion)} " +
+            log"in ${MDC(PATH, checkpointDir)}")
           // Make sure the directory does not exist. Native RocksDB fails if the directory to
           // checkpoint exists.
           Utils.deleteRecursively(checkpointDir)
@@ -592,7 +595,7 @@ class RocksDB(
         }
       }
 
-      logInfo(s"Syncing checkpoint for $newVersion to DFS")
+      logInfo(log"Syncing checkpoint for ${MDC(VERSION_NUMBER, newVersion)} to DFS")
       val fileSyncTimeMs = timeTakenMs {
         if (enableChangelogCheckpointing) {
           try {
@@ -616,7 +619,8 @@ class RocksDB(
         "fileSync" -> fileSyncTimeMs
       )
       recordedMetrics = Some(metrics)
-      logInfo(s"Committed $newVersion, stats = ${recordedMetrics.get.json}")
+      logInfo(log"Committed ${MDC(VERSION_NUMBER, newVersion)}, " +
+        log"stats = ${MDC(METRICS_JSON, recordedMetrics.get.json)}")
       loadedVersion
     } catch {
       case t: Throwable =>
@@ -651,8 +655,9 @@ class RocksDB(
         fileManager.saveCheckpointToDfs(localDir, version, numKeys)
         fileManagerMetrics = fileManager.latestSaveCheckpointMetrics
       }
-      logInfo(s"$loggingId: Upload snapshot of version $version," +
-        s" time taken: $uploadTime ms")
+      logInfo(log"${MDC(LOG_ID, loggingId)}: Upload snapshot of version " +
+        log"${MDC(VERSION_NUMBER, version)}," +
+        log" time taken: ${MDC(TIME_UNITS, uploadTime)} ms")
     } finally {
       localCheckpoint.foreach(_.close())
     }
@@ -671,7 +676,7 @@ class RocksDB(
     // Make sure changelogWriter gets recreated next time.
     changelogWriter = None
     release(RollbackStore)
-    logInfo(s"Rolled back to $loadedVersion")
+    logInfo(log"Rolled back to ${MDC(VERSION_NUMBER, loadedVersion)}")
   }
 
   def doMaintenance(): Unit = {
@@ -694,7 +699,7 @@ class RocksDB(
       val cleanupTime = timeTakenMs {
         fileManager.deleteOldVersions(conf.minVersionsToRetain)
       }
-      logInfo(s"Cleaned old data, time taken: $cleanupTime ms")
+      logInfo(log"Cleaned old data, time taken: ${MDC(TIME_UNITS, cleanupTime)} ms")
   }
 
   /** Release all resources */
@@ -801,7 +806,7 @@ class RocksDB(
       rocksDBMetricsOpt = recordedMetrics
     } catch {
       case ex: Exception =>
-        logInfo(s"Failed to acquire metrics with exception=$ex")
+        logInfo(log"Failed to acquire metrics with exception=${MDC(ERROR, ex)}")
     } finally {
       release(ReportStoreMetrics)
     }
@@ -839,7 +844,8 @@ class RocksDB(
       Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit] { _ =>
         this.release(StoreTaskCompletionListener)
       })
-      logInfo(s"RocksDB instance was acquired by $acquiredThreadInfo for opType=${opType.toString}")
+      logInfo(log"RocksDB instance was acquired by ${MDC(THREAD, acquiredThreadInfo)} " +
+        log"for opType=${MDC(OP_TYPE, opType.toString)}")
     }
   }
 
@@ -850,7 +856,8 @@ class RocksDB(
    * @param opType - operation type releasing the lock
    */
   private def release(opType: RocksDBOpType): Unit = acquireLock.synchronized {
-    logInfo(s"RocksDB instance was released by $acquiredThreadInfo for opType=${opType.toString}")
+    logInfo(log"RocksDB instance was released by ${MDC(THREAD, acquiredThreadInfo)} " +
+      log"for opType=${MDC(OP_TYPE, opType.toString)}")
     acquiredThreadInfo = null
     acquireLock.notifyAll()
   }
@@ -888,7 +895,7 @@ class RocksDB(
     colFamilyHandles.asScala.toList.foreach { handle =>
       colFamilyNameToHandleMap(handle.getName.map(_.toChar).mkString) = handle
     }
-    logInfo(s"Opened DB with conf ${conf}")
+    logInfo(log"Opened DB with conf ${MDC(CONFIG, conf)}")
   }
 
   private def closeDB(): Unit = {
@@ -912,13 +919,14 @@ class RocksDB(
       // Map DB log level to log4j levels
      // Warn is mapped to info because RocksDB warn is too verbose
      // (e.g. dumps non-warning stuff like stats)
-      val loggingFunc: ( => String) => Unit = infoLogLevel match {
+      val loggingFunc: ( => LogEntry) => Unit = infoLogLevel match {
         case InfoLogLevel.FATAL_LEVEL | InfoLogLevel.ERROR_LEVEL => logError(_)
         case InfoLogLevel.WARN_LEVEL | InfoLogLevel.INFO_LEVEL => logInfo(_)
         case InfoLogLevel.DEBUG_LEVEL => logDebug(_)
         case _ => logTrace(_)
       }
-      loggingFunc(s"[NativeRocksDB-${infoLogLevel.getValue}] $logMsg")
+      loggingFunc(log"[NativeRocksDB-${MDC(ROCKS_DB_LOG_LEVEL, infoLogLevel.getValue)}] " +
+        log"${MDC(ROCKS_DB_LOG_MESSAGE, logMsg)}")
     }
   }
 
@@ -931,7 +939,7 @@ class RocksDB(
     // customized logger. We still set it as it might show up in RocksDB config file or logging.
     dbOptions.setInfoLogLevel(dbLogLevel)
     dbOptions.setLogger(dbLogger)
-    logInfo(s"Set RocksDB native logging level to $dbLogLevel")
+    logInfo(log"Set RocksDB native logging level to ${MDC(ROCKS_DB_LOG_LEVEL, dbLogLevel)}")
     dbLogger
   }
@@ -256,13 +258,15 @@ class RocksDBFileManager( // Copy the necessary immutable files val metadataFile = localMetadataFile(localDir) val metadata = RocksDBCheckpointMetadata.readFromFile(metadataFile) - logInfo(s"Read metadata for version $version:\n${metadata.prettyJson}") + logInfo(log"Read metadata for version ${MDC(VERSION_NUMBER, version)}:\n" + + log"${MDC(METADATA_JSON, metadata.prettyJson)}") loadImmutableFilesFromDfs(metadata.immutableFiles, localDir) versionToRocksDBFiles.put(version, metadata.immutableFiles) metadataFile.delete() metadata } - logFilesInDir(localDir, s"Loaded checkpoint files for version $version") + logFilesInDir(localDir, log"Loaded checkpoint files " + + log"for version ${MDC(VERSION_NUMBER, version)}") metadata } @@ -328,8 +332,9 @@ class RocksDBFileManager( val orphanFiles = fileModificationTimes .filter(_._2 < oldestTrackedFileModificationTime).keys.toSeq if (orphanFiles.nonEmpty) { - logInfo(s"Found ${orphanFiles.size} orphan files: ${orphanFiles.take(20).mkString(", ")}" + - "... (display at most 20 filenames) that should be deleted.") + logInfo(log"Found ${MDC(NUM_FILES, orphanFiles.size)} " + + log"orphan files: ${MDC(FILE_MODIFICATION_TIME, orphanFiles.take(20).mkString(", "))}" + + log"... (display at most 20 filenames) that should be deleted.") } orphanFiles } else { @@ -341,7 +346,7 @@ class RocksDBFileManager( versionsToDelete.foreach { version => try { fm.delete(dfsChangelogFile(version)) - logInfo(s"Deleted changelog file $version") + logInfo(log"Deleted changelog file ${MDC(VERSION_NUMBER, version)}") } catch { case e: Exception => logWarning( @@ -433,7 +438,8 @@ class RocksDBFileManager( val allLogFiles = if (fm.exists(logDir)) fm.list(logDir).toImmutableArraySeq else Seq.empty filesToDelete ++= findOrphanFiles(fileToMaxUsedVersion.keys.toSeq, allSstFiles ++ allLogFiles) .map(_ -> -1L) - logInfo(s"Deleting ${filesToDelete.size} files not used in versions >= $minVersionToRetain") + logInfo(log"Deleting ${MDC(NUM_FILES, filesToDelete.size)} " + + log"files not used in versions >= ${MDC(VERSION_NUMBER, minVersionToRetain)}") var failedToDelete = 0 filesToDelete.foreach { case (dfsFileName, maxUsedVersion) => try { @@ -469,9 +475,10 @@ class RocksDBFileManager( log"version ${MDC(FILE_VERSION, version)}", e) } } - logInfo(s"Deleted ${filesToDelete.size - failedToDelete} files (failed to delete" + - s"$failedToDelete files) not used in versions >= $minVersionToRetain") - + logInfo(log"Deleted ${MDC(NUM_FILES, filesToDelete.size - failedToDelete)} files " + + log"(failed to delete" + + log"${MDC(NUM_FILES_FAILED_TO_DELETE, failedToDelete)} files) " + + log"not used in versions >= ${MDC(MIN_VERSION_NUMBER, minVersionToRetain)}") val changelogVersionsToDelete = changelogFiles .map(_.getName.stripSuffix(".changelog")).map(_.toLong) .filter(_ < minVersionToRetain) @@ -484,7 +491,7 @@ class RocksDBFileManager( localFiles: Seq[File]): Seq[RocksDBImmutableFile] = { // Get the immutable files used in previous versions, as some of those uploaded files can be // reused for this version - logInfo(s"Saving RocksDB files to DFS for $version") + logInfo(log"Saving RocksDB files to DFS for ${MDC(VERSION_NUMBER, version)}") var bytesCopied = 0L var filesCopied = 0L @@ -495,7 +502,7 @@ class RocksDBFileManager( if (existingDfsFile.isDefined && existingDfsFile.get.sizeBytes == localFile.length()) { val dfsFile = existingDfsFile.get filesReused += 1 - logInfo(s"reusing file $dfsFile for $localFile") + logInfo(log"reusing file ${MDC(DFS_FILE, dfsFile)} for 
${MDC(FILE_NAME, localFile)}") RocksDBImmutableFile(localFile.getName, dfsFile.dfsFileName, dfsFile.sizeBytes) } else { val localFileName = localFile.getName @@ -508,7 +515,8 @@ class RocksDBFileManager( fs.copyFromLocalFile( new Path(localFile.getAbsoluteFile.toURI), dfsFile) val localFileSize = localFile.length() - logInfo(s"Copied $localFile to $dfsFile - $localFileSize bytes") + logInfo(log"Copied ${MDC(FILE_NAME, localFile)} to ${MDC(DFS_FILE, dfsFile)} - " + + log"${MDC(NUM_BYTES, localFileSize)} bytes") filesCopied += 1 bytesCopied += localFileSize @@ -518,8 +526,10 @@ class RocksDBFileManager( immutableDfsFile } } - logInfo(s"Copied $filesCopied files ($bytesCopied bytes) from local to" + - s" DFS for version $version. $filesReused files reused without copying.") + logInfo(log"Copied ${MDC(NUM_FILES_COPIED, filesCopied)} files " + + log"(${MDC(NUM_BYTES, bytesCopied)} bytes) from local to" + + log" DFS for version ${MDC(VERSION_NUMBER, version)}. " + + log"${MDC(NUM_FILES_REUSED, filesReused)} files reused without copying.") versionToRocksDBFiles.put(version, immutableFiles) // Cleanup locally deleted files from the localFilesToDfsFiles map @@ -571,10 +581,13 @@ class RocksDBFileManager( if (!isSameFile) { existingFile.delete() localFilesToDfsFiles.remove(existingFile.getName) - logInfo(s"Deleted local file $existingFile with size $existingFileSize mapped" + - s" to previous dfsFile ${prevDfsFile.getOrElse("null")}") + logInfo(log"Deleted local file ${MDC(FILE_NAME, existingFile)} " + + log"with size ${MDC(NUM_BYTES, existingFileSize)} mapped" + + log" to previous dfsFile ${MDC(DFS_FILE, prevDfsFile.getOrElse("null"))}") } else { - logInfo(s"reusing $prevDfsFile present at $existingFile for $requiredFile") + logInfo(log"reusing ${MDC(DFS_FILE, prevDfsFile)} present at " + + log"${MDC(EXISTING_FILE, existingFile)} " + + log"for ${MDC(FILE_NAME, requiredFile)}") } } @@ -600,13 +613,15 @@ class RocksDBFileManager( filesCopied += 1 bytesCopied += localFileSize localFilesToDfsFiles.put(localFileName, file) - logInfo(s"Copied $dfsFile to $localFile - $localFileSize bytes") + logInfo(log"Copied ${MDC(DFS_FILE, dfsFile)} to ${MDC(FILE_NAME, localFile)} - " + + log"${MDC(NUM_BYTES, localFileSize)} bytes") } else { filesReused += 1 } } - logInfo(s"Copied $filesCopied files ($bytesCopied bytes) from DFS to local with " + - s"$filesReused files reused.") + logInfo(log"Copied ${MDC(NUM_FILES_COPIED, filesCopied)} files " + + log"(${MDC(NUM_BYTES, bytesCopied)} bytes) from DFS to local with " + + log"${MDC(NUM_FILES_REUSED, filesReused)} files reused.") loadCheckpointMetrics = RocksDBFileManagerMetrics( bytesCopied = bytesCopied, @@ -622,7 +637,7 @@ class RocksDBFileManager( .filterNot(currentLocalFiles.contains) mappingsToClean.foreach { f => - logInfo(s"cleaning $f from the localFilesToDfsFiles map") + logInfo(log"cleaning ${MDC(FILE_NAME, f)} from the localFilesToDfsFiles map") localFilesToDfsFiles.remove(f) } } @@ -657,7 +672,8 @@ class RocksDBFileManager( totalBytes += bytes } zout.close() // so that any error in closing also cancels the output stream - logInfo(s"Zipped $totalBytes bytes (before compression) to $filesStr") + logInfo(log"Zipped ${MDC(NUM_BYTES, totalBytes)} bytes (before compression) to " + + log"${MDC(FILE_NAME, filesStr)}") // The other fields saveCheckpointMetrics should have been filled saveCheckpointMetrics = saveCheckpointMetrics.copy(zipFileBytesUncompressed = Some(totalBytes)) @@ -675,11 +691,12 @@ class RocksDBFileManager( } /** Log the files present in a directory. 
This is useful for debugging. */ - private def logFilesInDir(dir: File, msg: String): Unit = { + private def logFilesInDir(dir: File, msg: MessageWithContext): Unit = { lazy val files = Option(Utils.recursiveList(dir)).getOrElse(Array.empty).map { f => s"${f.getAbsolutePath} - ${f.length()} bytes" } - logInfo(s"$msg - ${files.length} files\n\t${files.mkString("\n\t")}") + logInfo(msg + log" - ${MDC(NUM_FILES, files.length)} files\n\t" + + log"${MDC(FILE_NAME, files.mkString("\n\t"))}") } private def newDFSFileName(localFileName: String): String = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala index 38b9dc56838e..66451234abc4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala @@ -19,7 +19,8 @@ package org.apache.spark.sql.execution.streaming.state import org.rocksdb._ -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey._ /** * Singleton responsible for managing cache and write buffer manager associated with all RocksDB @@ -47,8 +48,8 @@ object RocksDBMemoryManager extends Logging { } val totalMemoryUsageInBytes: Long = conf.totalMemoryUsageMB * 1024 * 1024 - logInfo(s"Creating RocksDB state store LRU cache with " + - s"total_size=$totalMemoryUsageInBytes") + logInfo(log"Creating RocksDB state store LRU cache with " + + log"total_size=${MDC(NUM_BYTES, totalMemoryUsageInBytes)}") // SPARK-44878 - avoid using strict limit to prevent insertion exception on cache full. 
// Please refer to RocksDB issue here - https://github.com/facebook/rocksdb/issues/8670 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala index e05f9c24f719..154f4ea5fc3e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala @@ -24,7 +24,8 @@ import scala.util.control.NonFatal import org.apache.hadoop.conf.Configuration import org.apache.spark.{SparkConf, SparkEnv} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey._ import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.types.StructType @@ -164,7 +165,8 @@ private[sql] class RocksDBStateStoreProvider verify(state == UPDATING, "Cannot commit after already committed or aborted") val newVersion = rocksDB.commit() state = COMMITTED - logInfo(s"Committed $newVersion for $id") + logInfo(log"Committed ${MDC(VERSION_NUMBER, newVersion)} " + + log"for ${MDC(STATE_STORE_ID, id)}") newVersion } catch { case e: Throwable => @@ -174,7 +176,8 @@ private[sql] class RocksDBStateStoreProvider override def abort(): Unit = { verify(state == UPDATING || state == ABORTED, "Cannot abort after already committed") - logInfo(s"Aborting ${version + 1} for $id") + logInfo(log"Aborting ${MDC(VERSION_NUMBER, version + 1)} " + + log"for ${MDC(STATE_STORE_ID, id)}") rocksDB.rollback() state = ABORTED } @@ -238,7 +241,8 @@ private[sql] class RocksDBStateStoreProvider rocksDBMetrics.totalMemUsageBytes, stateStoreCustomMetrics) } else { - logInfo(s"Failed to collect metrics for store_id=$id and version=$version") + logInfo(log"Failed to collect metrics for store_id=${MDC(STATE_STORE_ID, id)} " + + log"and version=${MDC(VERSION_NUMBER, version)}") StateStoreMetrics(0, 0, Map.empty) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala index 699ce75a88de..9ada54e20543 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala @@ -31,7 +31,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.{SparkContext, SparkEnv, SparkUnsupportedOperationException} import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKey.{LOAD_TIME, QUERY_RUN_ID, STATE_STORE_PROVIDER, STORE_ID} +import org.apache.spark.internal.LogKey._ import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.catalyst.util.UnsafeRowUtils import org.apache.spark.sql.errors.QueryExecutionErrors @@ -821,7 +821,7 @@ object StateStore extends Logging { provider.doMaintenance() if (!verifyIfStoreInstanceActive(id)) { unload(id) - logInfo(s"Unloaded $provider") + logInfo(log"Unloaded ${MDC(STATE_STORE_PROVIDER, provider)}") } } catch { case NonFatal(e) => @@ -830,8 +830,9 @@ object StateStore extends Logging { threadPoolException.set(e) } finally { val duration = System.currentTimeMillis() - startTime - val logMsg = s"Finished maintenance task for provider=$id" + - s" in 
elapsed_time=$duration\n" + val logMsg = + log"Finished maintenance task for provider=${MDC(STATE_STORE_PROVIDER, id)}" + + log" in elapsed_time=${MDC(TIME_UNITS, duration)}\n" if (duration > 5000) { logInfo(logMsg) } else { @@ -843,8 +844,9 @@ object StateStore extends Logging { } }) } else { - logInfo(s"Not processing partition ${id} for maintenance because it is currently " + - s"being processed") + logInfo(log"Not processing partition ${MDC(PARTITION_ID, id)} " + + log"for maintenance because it is currently " + + log"being processed") } } } @@ -858,8 +860,10 @@ object StateStore extends Logging { val providerIdsToUnload = coordinatorRef .map(_.reportActiveInstance(storeProviderId, host, executorId, otherProviderIds)) .getOrElse(Seq.empty[StateStoreProviderId]) - logInfo(s"Reported that the loaded instance $storeProviderId is active") - logDebug(s"The loaded instances are going to unload: ${providerIdsToUnload.mkString(", ")}") + logInfo(log"Reported that the loaded instance " + + log"${MDC(STATE_STORE_PROVIDER, storeProviderId)} is active") + logDebug(log"The loaded instances are going to unload: " + + log"${MDC(STATE_STORE_PROVIDER, providerIdsToUnload.mkString(", "))}") providerIdsToUnload } else { Seq.empty[StateStoreProviderId] @@ -890,7 +894,8 @@ object StateStore extends Logging { logDebug("Getting StateStoreCoordinatorRef") _coordRef = StateStoreCoordinatorRef.forExecutor(env) } - logInfo(s"Retrieved reference to StateStoreCoordinator: ${_coordRef}") + logInfo(log"Retrieved reference to StateStoreCoordinator: " + + log"${MDC(STATE_STORE_PROVIDER, _coordRef)}") Some(_coordRef) } else { _coordRef = null diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala index 30cf49d8e56d..a52d2bb2c610 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala @@ -25,7 +25,8 @@ import com.google.common.io.ByteStreams import org.apache.commons.io.IOUtils import org.apache.hadoop.fs.{FSError, Path} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey._ import org.apache.spark.io.CompressionCodec import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.streaming.CheckpointFileManager @@ -108,8 +109,9 @@ abstract class StateStoreChangelogWriter( // IOException into FSError. 
case e: FSError if e.getCause.isInstanceOf[IOException] => case NonFatal(ex) => - logInfo(s"Failed to cancel changelog file $file for state store provider " + - s"with exception=$ex") + logInfo(log"Failed to cancel changelog file ${MDC(FILE_NAME, file)} " + + log"for state store provider " + + log"with exception=${MDC(ERROR, ex)}") } finally { backingFileStream = null compressedStream = null diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StreamingSessionWindowStateManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StreamingSessionWindowStateManager.scala index 5130933f52ef..cbb7c1e56907 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StreamingSessionWindowStateManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StreamingSessionWindowStateManager.scala @@ -17,7 +17,8 @@ package org.apache.spark.sql.execution.streaming.state -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, Literal, UnsafeProjection, UnsafeRow} import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection @@ -261,7 +262,7 @@ class StreamingSessionWindowStateManagerImplV1( override def abortIfNeeded(store: StateStore): Unit = { if (!store.hasCommitted) { - logInfo(s"Aborted store ${store.id}") + logInfo(log"Aborted store ${MDC(STATE_STORE_ID, store.id)}") store.abort() } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala index 1303668b8f86..b541f59a2165 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala @@ -25,7 +25,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.spark.TaskContext import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKey.{END_INDEX, START_INDEX} +import org.apache.spark.internal.LogKey._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, JoinedRow, Literal, SafeProjection, SpecificInternalRow, UnsafeProjection, UnsafeRow} import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes @@ -463,7 +463,7 @@ class SymmetricHashJoinStateManager( def abortIfNeeded(): Unit = { if (!stateStore.hasCommitted) { - logInfo(s"Aborted store ${stateStore.id}") + logInfo(log"Aborted store ${MDC(STATE_STORE_ID, stateStore.id)}") stateStore.abort() } // If this class manages a state store provider by itself, it should take care of closing @@ -491,7 +491,7 @@ class SymmetricHashJoinStateManager( useMultipleValuesPerKey = false) stateStoreProvider.getStore(stateInfo.get.storeVersion) } - logInfo(s"Loaded store ${store.id}") + logInfo(log"Loaded store ${MDC(STATE_STORE_ID, store.id)}") store } }
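For readers unfamiliar with the structured logging API this diff targets, below is a minimal, self-contained sketch of the pattern being applied. It mocks the relevant pieces of org.apache.spark.internal (LogKey, MDC, the log string interpolator, and MessageWithContext) with simplified stand-ins; the names mirror the real API, but the implementations here are illustrative assumptions for demonstration only, not Spark's actual code.

```
object StructuredLoggingSketch {

  // Simplified stand-in for org.apache.spark.internal.LogKey.
  object LogKey extends Enumeration {
    val VERSION_NUMBER, PATH = Value
  }

  // Simplified stand-in for MDC: a value tagged with the key under which
  // it should appear in the structured log context.
  case class MDC(key: LogKey.Value, value: Any)

  // Simplified stand-in for MessageWithContext: rendered message text plus
  // the accumulated key -> value map. Concatenation with `+` merges both
  // the text and the context, which is why the diff can build one log
  // entry out of several `log"..."` fragments.
  case class MessageWithContext(message: String, context: Map[String, String]) {
    def +(other: MessageWithContext): MessageWithContext =
      MessageWithContext(message + other.message, context ++ other.context)
  }

  // A `log"..."` interpolator: renders each MDC argument into the message
  // text and also records it in the context map.
  implicit class LogStringContext(val sc: StringContext) {
    def log(args: MDC*): MessageWithContext = {
      val text = sc.s(args.map(_.value): _*)
      val ctx = args.map(a => a.key.toString -> a.value.toString).toMap
      MessageWithContext(text, ctx)
    }
  }

  // Stand-in for the LogEntry-based logInfo overload the commit migrates to.
  def logInfo(entry: MessageWithContext): Unit =
    println(s"INFO ${entry.message} | context=${entry.context}")

  def main(args: Array[String]): Unit = {
    import LogKey._
    val newVersion = 42L
    val checkpointDir = "/tmp/checkpoint"
    // Before: logInfo(s"Creating checkpoint for $newVersion in $checkpointDir")
    // After: the same text, with VERSION_NUMBER and PATH carried as MDC fields.
    logInfo(log"Creating checkpoint for ${MDC(VERSION_NUMBER, newVersion)} " +
      log"in ${MDC(PATH, checkpointDir)}")
  }
}
```

The design point visible throughout the diff is that `log"..."` fragments compose with `+`, so multi-part messages (for example the `msg + log" - ..."` concatenation in logFilesInDir, or the multi-fragment `logMsg` in StateStore's maintenance task) accumulate their MDC key/value pairs along with their text.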