This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5643cfb71d34 [SPARK-48303][CORE] Reorganize `LogKeys`
5643cfb71d34 is described below

commit 5643cfb71d343133a185aa257f137074f41abfb3
Author: panbingkun <panbing...@baidu.com>
AuthorDate: Thu May 16 23:20:23 2024 -0700

    [SPARK-48303][CORE] Reorganize `LogKeys`
    
    ### What changes were proposed in this pull request?
    This PR aims to `reorganize` `LogKeys`, which includes:
    - remove some unused `LogKeys`
      ACTUAL_BROADCAST_OUTPUT_STATUS_SIZE
      DEFAULT_COMPACTION_INTERVAL
      DRIVER_LIBRARY_PATH_KEY
      EXISTING_JARS
      EXPECTED_ANSWER
      FILTERS
      HAS_R_PACKAGE
      JAR_ENTRY
      LOG_KEY_FILE
      NUM_ADDED_MASTERS
      NUM_ADDED_WORKERS
      NUM_PARTITION_VALUES
      OUTPUT_LINE
      OUTPUT_LINE_NUMBER
      PARTITIONS_SIZE
      RULE_BATCH_NAME
      SERIALIZE_OUTPUT_LENGTH
      SHELL_COMMAND
      STREAM_SOURCE
    
    - merge `PARAMETER` into `PARAM` (some keys were spelled out in `full` while others used `abbreviations`, which was inconsistent)
      ESTIMATOR_PARAMETER_MAP -> ESTIMATOR_PARAM_MAP
      FUNCTION_PARAMETER -> FUNCTION_PARAM
      METHOD_PARAMETER_TYPES -> METHOD_PARAM_TYPES
    
    - merge `NUMBER` into `NUM` (abbreviations)
      MIN_VERSION_NUMBER -> MIN_VERSION_NUM
      RULE_NUMBER_OF_RUNS -> NUM_RULE_OF_RUNS
      VERSION_NUMBER -> VERSION_NUM
    
    - merge `TOTAL` into `NUM`
      TOTAL_RECORDS_READ -> NUM_RECORDS_READ
      TRAIN_WORD_COUNT -> NUM_TRAIN_WORD
    
    - `NUM` as prefix
      CHECKSUM_FILE_NUM -> NUM_CHECKSUM_FILE
      DATA_FILE_NUM -> NUM_DATA_FILE
      INDEX_FILE_NUM -> NUM_INDEX_FILE
    
    - `COUNT` -> `NUM`
      EXECUTOR_DESIRED_COUNT -> NUM_EXECUTOR_DESIRED
      EXECUTOR_LAUNCH_COUNT -> NUM_EXECUTOR_LAUNCH
      EXECUTOR_TARGET_COUNT -> NUM_EXECUTOR_TARGET
      KAFKA_PULLS_COUNT -> NUM_KAFKA_PULLS
      KAFKA_RECORDS_PULLED_COUNT -> NUM_KAFKA_RECORDS_PULLED
      MIN_FREQUENT_PATTERN_COUNT -> MIN_NUM_FREQUENT_PATTERN
      POD_COUNT -> NUM_POD
      POD_SHARED_SLOT_COUNT -> NUM_POD_SHARED_SLOT
      POD_TARGET_COUNT -> NUM_POD_TARGET
      RETRY_COUNT -> NUM_RETRY
    
    - fix a `typo`
      MALFORMATTED_STIRNG -> MALFORMATTED_STRING
    
    - other
      MAX_LOG_NUM_POLICY -> MAX_NUM_LOG_POLICY
      WEIGHTED_NUM -> NUM_WEIGHTED_EXAMPLES
    
    The remaining changes in other files are follow-ups required by the adjustments above.
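
    For illustration, a `LogKey` is just a case object used as an MDC key in Spark's structured logging, so a rename only changes which key a call site references; the rendered log text stays the same. Below is a minimal sketch modeled on the call sites touched in this diff (the class name `RetryLoggingExample` is hypothetical):

    ```scala
    import org.apache.spark.internal.{Logging, MDC}
    import org.apache.spark.internal.LogKeys.NUM_RETRY

    // Hypothetical call site: code that previously referenced RETRY_COUNT now
    // references NUM_RETRY; the logged message itself is unchanged.
    class RetryLoggingExample extends Logging {
      def warnRetry(attempt: Int): Unit = {
        logWarning(log"Error in attempt ${MDC(NUM_RETRY, attempt)} getting Kafka offsets")
      }
    }
    ```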
    
    ### Why are the changes needed?
    Let's make `LogKeys` easier to understand and more consistent.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Pass GA.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #46612 from panbingkun/reorganize_logkey.
    
    Authored-by: panbingkun <panbing...@baidu.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../network/shuffle/RetryingBlockTransferor.java   |  6 +-
 .../scala/org/apache/spark/internal/LogKey.scala   | 68 ++++++++--------------
 .../sql/connect/client/GrpcRetryHandler.scala      |  8 +--
 .../sql/kafka010/KafkaOffsetReaderAdmin.scala      |  4 +-
 .../sql/kafka010/KafkaOffsetReaderConsumer.scala   |  4 +-
 .../sql/kafka010/consumer/KafkaDataConsumer.scala  |  6 +-
 .../streaming/kinesis/KinesisBackedBlockRDD.scala  |  4 +-
 .../org/apache/spark/api/r/RBackendHandler.scala   |  4 +-
 .../spark/deploy/history/FsHistoryProvider.scala   |  2 +-
 .../org/apache/spark/deploy/master/Master.scala    |  2 +-
 .../apache/spark/ml/tree/impl/RandomForest.scala   |  4 +-
 .../apache/spark/ml/tuning/CrossValidator.scala    |  4 +-
 .../spark/ml/tuning/TrainValidationSplit.scala     |  4 +-
 .../org/apache/spark/mllib/feature/Word2Vec.scala  |  4 +-
 .../org/apache/spark/mllib/fpm/PrefixSpan.scala    |  4 +-
 .../apache/spark/mllib/linalg/VectorsSuite.scala   |  4 +-
 .../cluster/k8s/ExecutorPodsAllocator.scala        |  6 +-
 ...ernetesLocalDiskShuffleExecutorComponents.scala |  6 +-
 .../apache/spark/deploy/yarn/YarnAllocator.scala   |  6 +-
 .../catalyst/expressions/V2ExpressionUtils.scala   |  4 +-
 .../spark/sql/catalyst/rules/RuleExecutor.scala    |  6 +-
 .../sql/execution/streaming/state/RocksDB.scala    | 18 +++---
 .../streaming/state/RocksDBFileManager.scala       | 22 +++----
 .../state/RocksDBStateStoreProvider.scala          |  6 +-
 .../apache/hive/service/server/HiveServer2.java    |  2 +-
 .../spark/sql/hive/client/HiveClientImpl.scala     |  2 +-
 .../org/apache/spark/streaming/Checkpoint.scala    |  4 +-
 .../streaming/util/FileBasedWriteAheadLog.scala    |  4 +-
 28 files changed, 101 insertions(+), 117 deletions(-)

diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java
index ca2073af87c1..83be2db5d0b7 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java
@@ -184,7 +184,7 @@ public class RetryingBlockTransferor {
         logger.error("Exception while beginning {} of {} outstanding blocks 
(after {} retries)", e,
           MDC.of(LogKeys.TRANSFER_TYPE$.MODULE$, listener.getTransferType()),
           MDC.of(LogKeys.NUM_BLOCK_IDS$.MODULE$, blockIdsToTransfer.length),
-          MDC.of(LogKeys.RETRY_COUNT$.MODULE$, numRetries));
+          MDC.of(LogKeys.NUM_RETRY$.MODULE$, numRetries));
       } else {
         logger.error("Exception while beginning {} of {} outstanding blocks", 
e,
           MDC.of(LogKeys.TRANSFER_TYPE$.MODULE$, listener.getTransferType()),
@@ -217,7 +217,7 @@ public class RetryingBlockTransferor {
 
     logger.info("Retrying {} ({}/{}) for {} outstanding blocks after {} ms",
       MDC.of(LogKeys.TRANSFER_TYPE$.MODULE$, listener.getTransferType()),
-      MDC.of(LogKeys.RETRY_COUNT$.MODULE$, retryCount),
+      MDC.of(LogKeys.NUM_RETRY$.MODULE$, retryCount),
       MDC.of(LogKeys.MAX_ATTEMPTS$.MODULE$, maxRetries),
       MDC.of(LogKeys.NUM_BLOCK_IDS$.MODULE$, outstandingBlocksIds.size()),
       MDC.of(LogKeys.RETRY_WAIT_TIME$.MODULE$, retryWaitTime));
@@ -313,7 +313,7 @@ public class RetryingBlockTransferor {
               logger.error("Failed to {} block {}, and will not retry ({} 
retries)", exception,
                 MDC.of(LogKeys.TRANSFER_TYPE$.MODULE$, listener.getTransferType()),
                 MDC.of(LogKeys.BLOCK_ID$.MODULE$, blockId),
-                MDC.of(LogKeys.RETRY_COUNT$.MODULE$,retryCount));
+                MDC.of(LogKeys.NUM_RETRY$.MODULE$,retryCount));
             } else {
               logger.debug(
                 String.format("Failed to %s block %s, and will not retry (%s 
retries)",
diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
index e03987933306..1f67a211c01f 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -31,7 +31,6 @@ trait LogKey {
  */
 object LogKeys {
   case object ACCUMULATOR_ID extends LogKey
-  case object ACTUAL_BROADCAST_OUTPUT_STATUS_SIZE extends LogKey
   case object ACTUAL_NUM_FILES extends LogKey
   case object ACTUAL_PARTITION_COLUMN extends LogKey
   case object ADDED_JARS extends LogKey
@@ -80,7 +79,6 @@ object LogKeys {
   case object CHECKPOINT_PATH extends LogKey
   case object CHECKPOINT_ROOT extends LogKey
   case object CHECKPOINT_TIME extends LogKey
-  case object CHECKSUM_FILE_NUM extends LogKey
   case object CHOSEN_WATERMARK extends LogKey
   case object CLASSIFIER extends LogKey
   case object CLASS_LOADER extends LogKey
@@ -143,10 +141,8 @@ object LogKeys {
   case object DATAFRAME_CACHE_ENTRY extends LogKey
   case object DATAFRAME_ID extends LogKey
   case object DATA_FILE extends LogKey
-  case object DATA_FILE_NUM extends LogKey
   case object DATA_SOURCE extends LogKey
   case object DATA_SOURCES extends LogKey
-  case object DEFAULT_COMPACTION_INTERVAL extends LogKey
   case object DEFAULT_COMPACT_INTERVAL extends LogKey
   case object DEFAULT_ISOLATION_LEVEL extends LogKey
   case object DEFAULT_NAME extends LogKey
@@ -164,7 +160,6 @@ object LogKeys {
   case object DIFF_DELTA extends LogKey
   case object DIVISIBLE_CLUSTER_INDICES_SIZE extends LogKey
   case object DRIVER_ID extends LogKey
-  case object DRIVER_LIBRARY_PATH_KEY extends LogKey
   case object DRIVER_MEMORY_SIZE extends LogKey
   case object DRIVER_STATE extends LogKey
   case object DROPPED_PARTITIONS extends LogKey
@@ -179,7 +174,7 @@ object LogKeys {
   case object ENGINE extends LogKey
   case object EPOCH extends LogKey
   case object ERROR extends LogKey
-  case object ESTIMATOR_PARAMETER_MAP extends LogKey
+  case object ESTIMATOR_PARAM_MAP extends LogKey
   case object EVALUATED_FILTERS extends LogKey
   case object EVENT extends LogKey
   case object EVENT_LOG_DESTINATION extends LogKey
@@ -192,25 +187,20 @@ object LogKeys {
   case object EXECUTION_MEMORY_SIZE extends LogKey
   case object EXECUTION_PLAN_LEAVES extends LogKey
   case object EXECUTOR_BACKEND extends LogKey
-  case object EXECUTOR_DESIRED_COUNT extends LogKey
   case object EXECUTOR_ENVS extends LogKey
   case object EXECUTOR_ENV_REGEX extends LogKey
   case object EXECUTOR_ID extends LogKey
   case object EXECUTOR_IDS extends LogKey
   case object EXECUTOR_LAUNCH_COMMANDS extends LogKey
-  case object EXECUTOR_LAUNCH_COUNT extends LogKey
   case object EXECUTOR_MEMORY_SIZE extends LogKey
   case object EXECUTOR_RESOURCES extends LogKey
   case object EXECUTOR_SHUFFLE_INFO extends LogKey
   case object EXECUTOR_STATE extends LogKey
-  case object EXECUTOR_TARGET_COUNT extends LogKey
   case object EXECUTOR_TIMEOUT extends LogKey
   case object EXEC_AMOUNT extends LogKey
   case object EXISTING_FILE extends LogKey
-  case object EXISTING_JARS extends LogKey
   case object EXISTING_PATH extends LogKey
   case object EXIT_CODE extends LogKey
-  case object EXPECTED_ANSWER extends LogKey
   case object EXPECTED_NUM_FILES extends LogKey
   case object EXPECTED_PARTITION_COLUMN extends LogKey
   case object EXPIRY_TIMESTAMP extends LogKey
@@ -237,7 +227,6 @@ object LogKeys {
   case object FILE_SYSTEM extends LogKey
   case object FILE_VERSION extends LogKey
   case object FILTER extends LogKey
-  case object FILTERS extends LogKey
   case object FINAL_CONTEXT extends LogKey
   case object FINAL_OUTPUT_PATH extends LogKey
   case object FINAL_PATH extends LogKey
@@ -245,7 +234,7 @@ object LogKeys {
   case object FROM_OFFSET extends LogKey
   case object FROM_TIME extends LogKey
   case object FUNCTION_NAME extends LogKey
-  case object FUNCTION_PARAMETER extends LogKey
+  case object FUNCTION_PARAM extends LogKey
   case object GLOBAL_INIT_FILE extends LogKey
   case object GLOBAL_WATERMARK extends LogKey
   case object GROUP_BY_EXPRS extends LogKey
@@ -253,7 +242,6 @@ object LogKeys {
   case object HADOOP_VERSION extends LogKey
   case object HASH_JOIN_KEYS extends LogKey
   case object HASH_MAP_SIZE extends LogKey
-  case object HAS_R_PACKAGE extends LogKey
   case object HEARTBEAT extends LogKey
   case object HEARTBEAT_INTERVAL extends LogKey
   case object HISTORY_DIR extends LogKey
@@ -270,7 +258,6 @@ object LogKeys {
   case object INCOMPATIBLE_TYPES extends LogKey
   case object INDEX extends LogKey
   case object INDEX_FILE extends LogKey
-  case object INDEX_FILE_NUM extends LogKey
   case object INDEX_NAME extends LogKey
   case object INFERENCE_MODE extends LogKey
   case object INIT extends LogKey
@@ -282,7 +269,6 @@ object LogKeys {
   case object ISOLATION_LEVEL extends LogKey
   case object ISSUE_DATE extends LogKey
   case object IS_NETWORK_REQUEST_DONE extends LogKey
-  case object JAR_ENTRY extends LogKey
   case object JAR_MESSAGE extends LogKey
   case object JAR_URL extends LogKey
   case object JAVA_VERSION extends LogKey
@@ -292,8 +278,6 @@ object LogKeys {
   case object JOIN_CONDITION_SUB_EXPR extends LogKey
   case object JOIN_TYPE extends LogKey
   case object K8S_CONTEXT extends LogKey
-  case object KAFKA_PULLS_COUNT extends LogKey
-  case object KAFKA_RECORDS_PULLED_COUNT extends LogKey
   case object KEY extends LogKey
   case object KEY2 extends LogKey
   case object KEYTAB extends LogKey
@@ -320,11 +304,10 @@ object LogKeys {
   case object LOGICAL_PLAN_COLUMNS extends LogKey
   case object LOGICAL_PLAN_LEAVES extends LogKey
   case object LOG_ID extends LogKey
-  case object LOG_KEY_FILE extends LogKey
   case object LOG_OFFSET extends LogKey
   case object LOG_TYPE extends LogKey
   case object LOWER_BOUND extends LogKey
-  case object MALFORMATTED_STIRNG extends LogKey
+  case object MALFORMATTED_STRING extends LogKey
   case object MAP_ID extends LogKey
   case object MASTER_URL extends LogKey
   case object MAX_ATTEMPTS extends LogKey
@@ -335,11 +318,11 @@ object LogKeys {
   case object MAX_EXECUTOR_FAILURES extends LogKey
   case object MAX_FILE_VERSION extends LogKey
   case object MAX_JVM_METHOD_PARAMS_LENGTH extends LogKey
-  case object MAX_LOG_NUM_POLICY extends LogKey
   case object MAX_MEMORY_SIZE extends LogKey
   case object MAX_METHOD_CODE_SIZE extends LogKey
   case object MAX_NUM_BINS extends LogKey
   case object MAX_NUM_CHUNKS extends LogKey
+  case object MAX_NUM_LOG_POLICY extends LogKey
   case object MAX_NUM_PARTITIONS extends LogKey
   case object MAX_NUM_POSSIBLE_BINS extends LogKey
   case object MAX_NUM_ROWS_IN_MEMORY_BUFFER extends LogKey
@@ -357,17 +340,17 @@ object LogKeys {
   case object METADATA_JSON extends LogKey
   case object META_FILE extends LogKey
   case object METHOD_NAME extends LogKey
-  case object METHOD_PARAMETER_TYPES extends LogKey
+  case object METHOD_PARAM_TYPES extends LogKey
   case object METRICS_JSON extends LogKey
   case object METRIC_NAME extends LogKey
   case object MINI_BATCH_FRACTION extends LogKey
   case object MIN_COMPACTION_BATCH_ID extends LogKey
-  case object MIN_FREQUENT_PATTERN_COUNT extends LogKey
+  case object MIN_NUM_FREQUENT_PATTERN extends LogKey
   case object MIN_POINT_PER_CLUSTER extends LogKey
   case object MIN_SHARE extends LogKey
   case object MIN_SIZE extends LogKey
   case object MIN_TIME extends LogKey
-  case object MIN_VERSION_NUMBER extends LogKey
+  case object MIN_VERSION_NUM extends LogKey
   case object MODEL_WEIGHTS extends LogKey
   case object MODULE_NAME extends LogKey
   case object NAMESPACE extends LogKey
@@ -383,9 +366,7 @@ object LogKeys {
   case object NODE_LOCATION extends LogKey
   case object NON_BUILT_IN_CONNECTORS extends LogKey
   case object NORM extends LogKey
-  case object NUM_ADDED_MASTERS extends LogKey
   case object NUM_ADDED_PARTITIONS extends LogKey
-  case object NUM_ADDED_WORKERS extends LogKey
   case object NUM_APPS extends LogKey
   case object NUM_BIN extends LogKey
   case object NUM_BLOCK_IDS extends LogKey
@@ -398,21 +379,27 @@ object LogKeys {
   case object NUM_BYTES_TO_WARN extends LogKey
   case object NUM_BYTES_USED extends LogKey
   case object NUM_CATEGORIES extends LogKey
+  case object NUM_CHECKSUM_FILE extends LogKey
   case object NUM_CHUNKS extends LogKey
   case object NUM_CLASSES extends LogKey
   case object NUM_COEFFICIENTS extends LogKey
   case object NUM_COLUMNS extends LogKey
   case object NUM_CONCURRENT_WRITER extends LogKey
   case object NUM_CORES extends LogKey
+  case object NUM_DATA_FILE extends LogKey
   case object NUM_DATA_FILES extends LogKey
   case object NUM_DRIVERS extends LogKey
   case object NUM_DROPPED_PARTITIONS extends LogKey
+  case object NUM_EFFECTIVE_RULE_OF_RUNS extends LogKey
   case object NUM_ELEMENTS_SPILL_THRESHOLD extends LogKey
   case object NUM_EVENTS extends LogKey
   case object NUM_EXAMPLES extends LogKey
   case object NUM_EXECUTOR_CORES extends LogKey
   case object NUM_EXECUTOR_CORES_REMAINING extends LogKey
   case object NUM_EXECUTOR_CORES_TOTAL extends LogKey
+  case object NUM_EXECUTOR_DESIRED extends LogKey
+  case object NUM_EXECUTOR_LAUNCH extends LogKey
+  case object NUM_EXECUTOR_TARGET extends LogKey
   case object NUM_FAILURES extends LogKey
   case object NUM_FEATURES extends LogKey
   case object NUM_FILES extends LogKey
@@ -420,8 +407,11 @@ object LogKeys {
   case object NUM_FILES_FAILED_TO_DELETE extends LogKey
   case object NUM_FILES_REUSED extends LogKey
   case object NUM_FREQUENT_ITEMS extends LogKey
+  case object NUM_INDEX_FILE extends LogKey
   case object NUM_INDEX_FILES extends LogKey
   case object NUM_ITERATIONS extends LogKey
+  case object NUM_KAFKA_PULLS extends LogKey
+  case object NUM_KAFKA_RECORDS_PULLED extends LogKey
   case object NUM_LEADING_SINGULAR_VALUES extends LogKey
   case object NUM_LEFT_PARTITION_VALUES extends LogKey
   case object NUM_LOADED_ENTRIES extends LogKey
@@ -432,23 +422,28 @@ object LogKeys {
   case object NUM_NODES extends LogKey
   case object NUM_PARTITIONS extends LogKey
   case object NUM_PARTITIONS2 extends LogKey
-  case object NUM_PARTITION_VALUES extends LogKey
   case object NUM_PATHS extends LogKey
   case object NUM_PEERS extends LogKey
   case object NUM_PEERS_REPLICATED_TO extends LogKey
   case object NUM_PEERS_TO_REPLICATE_TO extends LogKey
   case object NUM_PENDING_LAUNCH_TASKS extends LogKey
+  case object NUM_POD extends LogKey
+  case object NUM_POD_SHARED_SLOT extends LogKey
+  case object NUM_POD_TARGET extends LogKey
   case object NUM_POINT extends LogKey
   case object NUM_PREFIXES extends LogKey
   case object NUM_PRUNED extends LogKey
+  case object NUM_RECORDS_READ extends LogKey
   case object NUM_REMOVED_WORKERS extends LogKey
   case object NUM_REPLICAS extends LogKey
   case object NUM_REQUESTS extends LogKey
   case object NUM_REQUEST_SYNC_TASK extends LogKey
   case object NUM_RESOURCE_SLOTS extends LogKey
   case object NUM_RETRIES extends LogKey
+  case object NUM_RETRY extends LogKey
   case object NUM_RIGHT_PARTITION_VALUES extends LogKey
   case object NUM_ROWS extends LogKey
+  case object NUM_RULE_OF_RUNS extends LogKey
   case object NUM_SEQUENCES extends LogKey
   case object NUM_SLOTS extends LogKey
   case object NUM_SPILL_INFOS extends LogKey
@@ -456,7 +451,9 @@ object LogKeys {
   case object NUM_SUB_DIRS extends LogKey
   case object NUM_TASKS extends LogKey
   case object NUM_TASK_CPUS extends LogKey
+  case object NUM_TRAIN_WORD extends LogKey
   case object NUM_VERSIONS_RETAIN extends LogKey
+  case object NUM_WEIGHTED_EXAMPLES extends LogKey
   case object OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD extends LogKey
   case object OBJECT_ID extends LogKey
   case object OFFSET extends LogKey
@@ -477,14 +474,11 @@ object LogKeys {
   case object OS_NAME extends LogKey
   case object OS_VERSION extends LogKey
   case object OUTPUT extends LogKey
-  case object OUTPUT_LINE extends LogKey
-  case object OUTPUT_LINE_NUMBER extends LogKey
   case object OVERHEAD_MEMORY_SIZE extends LogKey
   case object PAGE_SIZE extends LogKey
   case object PARSE_MODE extends LogKey
   case object PARTITIONED_FILE_READER extends LogKey
   case object PARTITIONER extends LogKey
-  case object PARTITIONS_SIZE extends LogKey
   case object PARTITION_ID extends LogKey
   case object PARTITION_SPECIFICATION extends LogKey
   case object PARTITION_SPECS extends LogKey
@@ -494,14 +488,11 @@ object LogKeys {
   case object PERCENT extends LogKey
   case object PIPELINE_STAGE_UID extends LogKey
   case object PLUGIN_NAME extends LogKey
-  case object POD_COUNT extends LogKey
   case object POD_ID extends LogKey
   case object POD_NAME extends LogKey
   case object POD_NAMESPACE extends LogKey
   case object POD_PHASE extends LogKey
-  case object POD_SHARED_SLOT_COUNT extends LogKey
   case object POD_STATE extends LogKey
-  case object POD_TARGET_COUNT extends LogKey
   case object POINT_OF_CENTER extends LogKey
   case object POLICY extends LogKey
   case object POOL_NAME extends LogKey
@@ -569,7 +560,6 @@ object LogKeys {
   case object RESULT extends LogKey
   case object RESULT_SIZE_BYTES extends LogKey
   case object RESULT_SIZE_BYTES_MAX extends LogKey
-  case object RETRY_COUNT extends LogKey
   case object RETRY_INTERVAL extends LogKey
   case object RETRY_WAIT_TIME extends LogKey
   case object RIGHT_EXPR extends LogKey
@@ -580,15 +570,12 @@ object LogKeys {
   case object RPC_ADDRESS extends LogKey
   case object RPC_ENDPOINT_REF extends LogKey
   case object RPC_MESSAGE_CAPACITY extends LogKey
-  case object RULE_BATCH_NAME extends LogKey
   case object RULE_NAME extends LogKey
-  case object RULE_NUMBER_OF_RUNS extends LogKey
   case object RUN_ID extends LogKey
   case object SCALA_VERSION extends LogKey
   case object SCHEDULER_POOL_NAME extends LogKey
   case object SCHEMA extends LogKey
   case object SCHEMA2 extends LogKey
-  case object SERIALIZE_OUTPUT_LENGTH extends LogKey
   case object SERVER_NAME extends LogKey
   case object SERVICE_NAME extends LogKey
   case object SERVLET_CONTEXT_HANDLER_PATH extends LogKey
@@ -598,7 +585,6 @@ object LogKeys {
   case object SESSION_KEY extends LogKey
   case object SET_CLIENT_INFO_REQUEST extends LogKey
   case object SHARD_ID extends LogKey
-  case object SHELL_COMMAND extends LogKey
   case object SHELL_OPTIONS extends LogKey
   case object SHORT_USER_NAME extends LogKey
   case object SHUFFLE_BLOCK_INFO extends LogKey
@@ -656,7 +642,6 @@ object LogKeys {
   case object STREAM_CHUNK_ID extends LogKey
   case object STREAM_ID extends LogKey
   case object STREAM_NAME extends LogKey
-  case object STREAM_SOURCE extends LogKey
   case object SUBMISSION_ID extends LogKey
   case object SUBSAMPLING_RATE extends LogKey
   case object SUB_QUERY extends LogKey
@@ -701,14 +686,12 @@ object LogKeys {
   case object TOPIC_PARTITION_OFFSET_RANGE extends LogKey
   case object TOTAL extends LogKey
   case object TOTAL_EFFECTIVE_TIME extends LogKey
-  case object TOTAL_RECORDS_READ extends LogKey
   case object TOTAL_TIME extends LogKey
   case object TOTAL_TIME_READ extends LogKey
   case object TO_TIME extends LogKey
   case object TRAINING_SIZE extends LogKey
   case object TRAIN_VALIDATION_SPLIT_METRIC extends LogKey
   case object TRAIN_VALIDATION_SPLIT_METRICS extends LogKey
-  case object TRAIN_WORD_COUNT extends LogKey
   case object TRANSFER_TYPE extends LogKey
   case object TREE_NODE extends LogKey
   case object TRIGGER_INTERVAL extends LogKey
@@ -728,7 +711,7 @@ object LogKeys {
   case object USER_NAME extends LogKey
   case object UUID extends LogKey
   case object VALUE extends LogKey
-  case object VERSION_NUMBER extends LogKey
+  case object VERSION_NUM extends LogKey
   case object VIRTUAL_CORES extends LogKey
   case object VOCAB_SIZE extends LogKey
   case object WAIT_RESULT_TIME extends LogKey
@@ -736,7 +719,6 @@ object LogKeys {
   case object WATERMARK_CONSTRAINT extends LogKey
   case object WEB_URL extends LogKey
   case object WEIGHT extends LogKey
-  case object WEIGHTED_NUM extends LogKey
   case object WORKER extends LogKey
   case object WORKER_HOST extends LogKey
   case object WORKER_ID extends LogKey
diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala
index f5bd504aba5c..7e0a356b9e49 100644
--- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala
+++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala
@@ -22,7 +22,7 @@ import scala.util.control.NonFatal
 import io.grpc.stub.StreamObserver
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.LogKeys.{ERROR, POLICY, RETRY_COUNT, RETRY_WAIT_TIME}
+import org.apache.spark.internal.LogKeys.{ERROR, NUM_RETRY, POLICY, RETRY_WAIT_TIME}
 import org.apache.spark.internal.MDC
 
 private[sql] class GrpcRetryHandler(
@@ -190,7 +190,7 @@ private[sql] object GrpcRetryHandler extends Logging {
         // retry exception is considered immediately retriable without any policies.
         logWarning(
           log"Non-Fatal error during RPC execution: ${MDC(ERROR, lastException)}, " +
-            log"retrying (currentRetryNum=${MDC(RETRY_COUNT, currentRetryNum)})")
+            log"retrying (currentRetryNum=${MDC(NUM_RETRY, currentRetryNum)})")
         return
       }
 
@@ -201,7 +201,7 @@ private[sql] object GrpcRetryHandler extends Logging {
           logWarning(
             log"Non-Fatal error during RPC execution: ${MDC(ERROR, 
lastException)}, " +
               log"retrying (wait=${MDC(RETRY_WAIT_TIME, time.get.toMillis)} 
ms, " +
-              log"currentRetryNum=${MDC(RETRY_COUNT, currentRetryNum)}, " +
+              log"currentRetryNum=${MDC(NUM_RETRY, currentRetryNum)}, " +
               log"policy=${MDC(POLICY, policy.getName)}).")
           sleep(time.get.toMillis)
           return
@@ -210,7 +210,7 @@ private[sql] object GrpcRetryHandler extends Logging {
 
       logWarning(
         log"Non-Fatal error during RPC execution: ${MDC(ERROR, 
lastException)}, " +
-          log"exceeded retries (currentRetryNum=${MDC(RETRY_COUNT, 
currentRetryNum)})")
+          log"exceeded retries (currentRetryNum=${MDC(NUM_RETRY, 
currentRetryNum)})")
 
       val error = new RetriesExceeded()
       exceptionList.foreach(error.addSuppressed)
diff --git 
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala
 
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala
index 39c842216cf0..9ac06a41a068 100644
--- 
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala
+++ 
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala
@@ -31,7 +31,7 @@ import org.apache.kafka.common.requests.OffsetFetchResponse
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKeys.{OFFSETS, RETRY_COUNT, 
TOPIC_PARTITION_OFFSET}
+import org.apache.spark.internal.LogKeys.{NUM_RETRY, OFFSETS, 
TOPIC_PARTITION_OFFSET}
 import org.apache.spark.scheduler.ExecutorCacheTaskLocation
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import 
org.apache.spark.sql.kafka010.KafkaSourceProvider.StrategyOnNoMatchStartingOffset
@@ -536,7 +536,7 @@ private[kafka010] class KafkaOffsetReaderAdmin(
           case NonFatal(e) =>
             lastException = e
             logWarning(
-              log"Error in attempt ${MDC(RETRY_COUNT, attempt)} getting Kafka 
offsets: ", e)
+              log"Error in attempt ${MDC(NUM_RETRY, attempt)} getting Kafka 
offsets: ", e)
             attempt += 1
             Thread.sleep(offsetFetchAttemptIntervalMs)
             resetAdmin()
diff --git 
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala
 
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala
index 17db894f504b..eceedbee1541 100644
--- 
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala
+++ 
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala
@@ -28,7 +28,7 @@ import org.apache.kafka.common.TopicPartition
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKeys.{OFFSETS, RETRY_COUNT, 
TOPIC_PARTITION_OFFSET}
+import org.apache.spark.internal.LogKeys.{NUM_RETRY, OFFSETS, 
TOPIC_PARTITION_OFFSET}
 import org.apache.spark.scheduler.ExecutorCacheTaskLocation
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import 
org.apache.spark.sql.kafka010.KafkaSourceProvider.StrategyOnNoMatchStartingOffset
@@ -613,7 +613,7 @@ private[kafka010] class KafkaOffsetReaderConsumer(
                 case NonFatal(e) =>
                   lastException = e
                   logWarning(
-                    log"Error in attempt ${MDC(RETRY_COUNT, attempt)} getting 
Kafka offsets: ", e)
+                    log"Error in attempt ${MDC(NUM_RETRY, attempt)} getting 
Kafka offsets: ", e)
                   attempt += 1
                   Thread.sleep(offsetFetchAttemptIntervalMs)
                   resetConsumer()
diff --git 
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
 
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
index 836c0fb13d4e..ceb9d96660ae 100644
--- 
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
+++ 
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
@@ -393,9 +393,9 @@ private[kafka010] class KafkaDataConsumer(
     val walTime = System.nanoTime() - startTimestampNano
 
     logInfo(log"From Kafka ${MDC(CONSUMER, kafkaMeta)} read " +
-      log"${MDC(TOTAL_RECORDS_READ, totalRecordsRead)} records through " +
-      log"${MDC(KAFKA_PULLS_COUNT, numPolls)} polls " +
-      log"(polled out ${MDC(KAFKA_RECORDS_PULLED_COUNT, numRecordsPolled)} 
records), " +
+      log"${MDC(NUM_RECORDS_READ, totalRecordsRead)} records through " +
+      log"${MDC(NUM_KAFKA_PULLS, numPolls)} polls " +
+      log"(polled out ${MDC(NUM_KAFKA_RECORDS_PULLED, numRecordsPolled)} 
records), " +
       log"taking ${MDC(TOTAL_TIME_READ, totalTimeReadNanos / 
NANOS_PER_MILLIS.toDouble)} ms, " +
       log"during time span of ${MDC(TIME, walTime / 
NANOS_PER_MILLIS.toDouble)} ms."
     )
diff --git 
a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
 
b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
index 5a2f904f2c65..b391203b4b96 100644
--- 
a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
+++ 
b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
@@ -30,7 +30,7 @@ import com.amazonaws.services.kinesis.model._
 
 import org.apache.spark._
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKeys.{ERROR, RETRY_COUNT}
+import org.apache.spark.internal.LogKeys.{ERROR, NUM_RETRY}
 import org.apache.spark.rdd.{BlockRDD, BlockRDDPartition}
 import org.apache.spark.storage.BlockId
 import org.apache.spark.util.NextIterator
@@ -279,7 +279,7 @@ class KinesisSequenceRangeIterator(
            t match {
              case ptee: ProvisionedThroughputExceededException =>
                logWarning(log"Error while ${MDC(ERROR, message)} " +
-                 log"[attempt = ${MDC(RETRY_COUNT, retryCount + 1)}]", ptee)
+                 log"[attempt = ${MDC(NUM_RETRY, retryCount + 1)}]", ptee)
              case e: Throwable =>
                throw new SparkException(s"Error while $message", e)
            }
diff --git a/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala 
b/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala
index 45ac90626468..c3d01ec47458 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala
@@ -159,7 +159,7 @@ private[r] class RBackendHandler(server: RBackend)
             log"${MDC(CLASS_NAME, cls)}.${MDC(METHOD_NAME, methodName)}. 
Candidates are:")
           selectedMethods.foreach { method =>
             logWarning(log"${MDC(METHOD_NAME, methodName)}(" +
-              log"${MDC(METHOD_PARAMETER_TYPES, 
method.getParameterTypes.mkString(","))})")
+              log"${MDC(METHOD_PARAM_TYPES, 
method.getParameterTypes.mkString(","))})")
           }
           throw new Exception(s"No matched method found for $cls.$methodName")
         }
@@ -181,7 +181,7 @@ private[r] class RBackendHandler(server: RBackend)
             + log"Candidates are:")
           ctors.foreach { ctor =>
             logWarning(log"${MDC(CLASS_NAME, cls)}(" +
-              log"${MDC(METHOD_PARAMETER_TYPES, 
ctor.getParameterTypes.mkString(","))})")
+              log"${MDC(METHOD_PARAM_TYPES, 
ctor.getParameterTypes.mkString(","))})")
           }
           throw new Exception(s"No matched constructor found for $cls")
         }
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 53b239ddfd79..bfd96167b169 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -1015,7 +1015,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
       }
       if (count > 0) {
         logWarning(log"Fail to clean up according to MAX_LOG_NUM policy " +
-          log"(${MDC(MAX_LOG_NUM_POLICY, maxNum)}).")
+          log"(${MDC(MAX_NUM_LOG_POLICY, maxNum)}).")
       }
     }
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index d9f86d0d0262..84e67cba33a9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -575,7 +575,7 @@ private[deploy] class Master(
               if (!execs.exists(_.state == ExecutorState.RUNNING)) {
                 logError(log"Application ${MDC(LogKeys.APP_DESC, 
appInfo.desc.name)} " +
                   log"with ID ${MDC(LogKeys.APP_ID, appInfo.id)} " +
-                  log"failed ${MDC(LogKeys.RETRY_COUNT, appInfo.retryCount)} 
times; removing it")
+                  log"failed ${MDC(LogKeys.NUM_RETRY, appInfo.retryCount)} 
times; removing it")
                 removeApplication(appInfo, ApplicationState.FAILED)
               }
             }
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala 
b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala
index 3461c5218f39..452532df5a2b 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala
@@ -22,7 +22,7 @@ import scala.util.Random
 
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKeys.{MAX_MEMORY_SIZE, MEMORY_SIZE, 
NUM_CLASSES, NUM_EXAMPLES, NUM_FEATURES, NUM_NODES, TIMER, WEIGHTED_NUM}
+import org.apache.spark.internal.LogKeys.{MAX_MEMORY_SIZE, MEMORY_SIZE, 
NUM_CLASSES, NUM_EXAMPLES, NUM_FEATURES, NUM_NODES, NUM_WEIGHTED_EXAMPLES, 
TIMER}
 import org.apache.spark.ml.classification.DecisionTreeClassificationModel
 import org.apache.spark.ml.feature.Instance
 import org.apache.spark.ml.impl.Utils
@@ -136,7 +136,7 @@ private[spark] object RandomForest extends Logging with 
Serializable {
         logInfo(log"numClasses: ${MDC(NUM_CLASSES, metadata.numClasses)}")
         logInfo(log"numExamples: ${MDC(NUM_EXAMPLES, metadata.numExamples)}")
         logInfo(log"weightedNumExamples: " +
-          log"${MDC(WEIGHTED_NUM, metadata.weightedNumExamples)}")
+          log"${MDC(NUM_WEIGHTED_EXAMPLES, metadata.weightedNumExamples)}")
     }
 
     timer.start("init")
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala 
b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala
index 55b78d4bf3bb..867f35a5d2b8 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala
@@ -28,7 +28,7 @@ import org.json4s.DefaultFormats
 
 import org.apache.spark.annotation.Since
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKeys.{CROSS_VALIDATION_METRIC, 
CROSS_VALIDATION_METRICS, ESTIMATOR_PARAMETER_MAP}
+import org.apache.spark.internal.LogKeys.{CROSS_VALIDATION_METRIC, 
CROSS_VALIDATION_METRICS, ESTIMATOR_PARAM_MAP}
 import org.apache.spark.ml.{Estimator, Model}
 import org.apache.spark.ml.evaluation.Evaluator
 import org.apache.spark.ml.param.{IntParam, Param, ParamMap, ParamValidators}
@@ -198,7 +198,7 @@ class CrossValidator @Since("1.2.0") (@Since("1.4.0") 
override val uid: String)
     val (bestMetric, bestIndex) =
       if (eval.isLargerBetter) metrics.zipWithIndex.maxBy(_._1)
       else metrics.zipWithIndex.minBy(_._1)
-    instr.logInfo(log"Best set of parameters:\n${MDC(ESTIMATOR_PARAMETER_MAP, 
epm(bestIndex))}")
+    instr.logInfo(log"Best set of parameters:\n${MDC(ESTIMATOR_PARAM_MAP, 
epm(bestIndex))}")
     instr.logInfo(log"Best cross-validation metric: 
${MDC(CROSS_VALIDATION_METRIC, bestMetric)}.")
     val bestModel = est.fit(dataset, epm(bestIndex)).asInstanceOf[Model[_]]
     copyValues(new CrossValidatorModel(uid, bestModel, metrics)
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala 
b/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala
index fa4bcc32de95..8e33ae6aad28 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala
@@ -29,7 +29,7 @@ import org.json4s.DefaultFormats
 
 import org.apache.spark.annotation.Since
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKeys.{ESTIMATOR_PARAMETER_MAP, 
TRAIN_VALIDATION_SPLIT_METRIC, TRAIN_VALIDATION_SPLIT_METRICS}
+import org.apache.spark.internal.LogKeys.{ESTIMATOR_PARAM_MAP, 
TRAIN_VALIDATION_SPLIT_METRIC, TRAIN_VALIDATION_SPLIT_METRICS}
 import org.apache.spark.ml.{Estimator, Model}
 import org.apache.spark.ml.evaluation.Evaluator
 import org.apache.spark.ml.param.{DoubleParam, ParamMap, ParamValidators}
@@ -174,7 +174,7 @@ class TrainValidationSplit @Since("1.5.0") (@Since("1.5.0") 
override val uid: St
     val (bestMetric, bestIndex) =
       if (eval.isLargerBetter) metrics.zipWithIndex.maxBy(_._1)
       else metrics.zipWithIndex.minBy(_._1)
-    instr.logInfo(log"Best set of parameters:\n${MDC(ESTIMATOR_PARAMETER_MAP, 
epm(bestIndex))}")
+    instr.logInfo(log"Best set of parameters:\n${MDC(ESTIMATOR_PARAM_MAP, 
epm(bestIndex))}")
     instr.logInfo(log"Best train validation split metric: " +
       log"${MDC(TRAIN_VALIDATION_SPLIT_METRIC, bestMetric)}.")
     val bestModel = est.fit(dataset, epm(bestIndex)).asInstanceOf[Model[_]]
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
index d2fc78a7ff7f..499dc09b8621 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
@@ -32,7 +32,7 @@ import org.apache.spark.annotation.Since
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKeys.{ALPHA, COUNT, TRAIN_WORD_COUNT, 
VOCAB_SIZE}
+import org.apache.spark.internal.LogKeys.{ALPHA, COUNT, NUM_TRAIN_WORD, 
VOCAB_SIZE}
 import org.apache.spark.internal.config.Kryo.KRYO_SERIALIZER_MAX_BUFFER_SIZE
 import org.apache.spark.ml.linalg.BLAS
 import org.apache.spark.mllib.linalg.{Vector, Vectors}
@@ -210,7 +210,7 @@ class Word2Vec extends Serializable with Logging {
       a += 1
     }
     logInfo(log"vocabSize = ${MDC(VOCAB_SIZE, vocabSize)}," +
-      log" trainWordsCount = ${MDC(TRAIN_WORD_COUNT, trainWordsCount)}")
+      log" trainWordsCount = ${MDC(NUM_TRAIN_WORD, trainWordsCount)}")
   }
 
   private def createExpTable(): Array[Float] = {
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
index 539e93867bc2..3c648f34c610 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
@@ -34,7 +34,7 @@ import org.apache.spark.annotation.Since
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKeys.{MIN_FREQUENT_PATTERN_COUNT, 
NUM_FREQUENT_ITEMS, NUM_LOCAL_FREQUENT_PATTERN, NUM_PREFIXES, NUM_SEQUENCES}
+import org.apache.spark.internal.LogKeys.{MIN_NUM_FREQUENT_PATTERN, 
NUM_FREQUENT_ITEMS, NUM_LOCAL_FREQUENT_PATTERN, NUM_PREFIXES, NUM_SEQUENCES}
 import org.apache.spark.mllib.util.{Loader, Saveable}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Row, SparkSession}
@@ -142,7 +142,7 @@ class PrefixSpan private (
     val totalCount = data.count()
     logInfo(log"number of sequences: ${MDC(NUM_SEQUENCES, totalCount)}")
     val minCount = math.ceil(minSupport * totalCount).toLong
-    logInfo(log"minimum count for a frequent pattern: 
${MDC(MIN_FREQUENT_PATTERN_COUNT, minCount)}")
+    logInfo(log"minimum count for a frequent pattern: 
${MDC(MIN_NUM_FREQUENT_PATTERN, minCount)}")
 
     // Find frequent items.
     val freqItems = findFrequentItems(data, minCount)
diff --git 
a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala 
b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
index 6301638c9a46..135d7e26c6d8 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
@@ -25,7 +25,7 @@ import breeze.linalg.{DenseMatrix => BDM}
 import org.json4s.jackson.JsonMethods.{parse => parseJson}
 
 import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
-import org.apache.spark.internal.LogKeys.MALFORMATTED_STIRNG
+import org.apache.spark.internal.LogKeys.MALFORMATTED_STRING
 import org.apache.spark.internal.MDC
 import org.apache.spark.internal.config.Kryo._
 import org.apache.spark.ml.{linalg => newlinalg}
@@ -228,7 +228,7 @@ class VectorsSuite extends SparkFunSuite {
     malformatted.foreach { s =>
       intercept[SparkException] {
         Vectors.parse(s)
-        logInfo(log"Didn't detect malformatted string 
${MDC(MALFORMATTED_STIRNG, s)}.")
+        logInfo(log"Didn't detect malformatted string 
${MDC(MALFORMATTED_STRING, s)}.")
       }
     }
   }
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index 38303b314082..ef3547fd389f 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -400,9 +400,9 @@ class ExecutorPodsAllocator(
           math.min(math.min(numMissingPodsForRpId, podAllocationSize), 
sharedSlotFromPendingPods)
         logInfo(log"Going to request ${MDC(LogKeys.COUNT, 
numExecutorsToAllocate)} executors from" +
           log" Kubernetes for ResourceProfile Id: 
${MDC(LogKeys.RESOURCE_PROFILE_ID, rpId)}, " +
-          log"target: ${MDC(LogKeys.POD_TARGET_COUNT, targetNum)}, " +
-          log"known: ${MDC(LogKeys.POD_COUNT, podCountForRpId)}, 
sharedSlotFromPendingPods: " +
-          log"${MDC(LogKeys.POD_SHARED_SLOT_COUNT, 
sharedSlotFromPendingPods)}.")
+          log"target: ${MDC(LogKeys.NUM_POD_TARGET, targetNum)}, " +
+          log"known: ${MDC(LogKeys.NUM_POD, podCountForRpId)}, 
sharedSlotFromPendingPods: " +
+          log"${MDC(LogKeys.NUM_POD_SHARED_SLOT, sharedSlotFromPendingPods)}.")
         requestNewExecutors(numExecutorsToAllocate, applicationId, rpId, 
k8sKnownPVCNames)
       }
     }
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala
index 7a6fdef7411b..2728385874f6 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala
@@ -95,9 +95,9 @@ object KubernetesLocalDiskShuffleExecutorComponents extends 
Logging {
       .partition(_.getName.contains(".checksum"))
     val (indexFiles, dataFiles) = files.partition(_.getName.endsWith(".index"))
 
-    logInfo(log"Found ${MDC(LogKeys.DATA_FILE_NUM, dataFiles.length)} data 
files, " +
-      log"${MDC(LogKeys.INDEX_FILE_NUM, indexFiles.length)} index files, " +
-      log"and ${MDC(LogKeys.CHECKSUM_FILE_NUM, checksumFiles.length)} checksum 
files.")
+    logInfo(log"Found ${MDC(LogKeys.NUM_DATA_FILE, dataFiles.length)} data 
files, " +
+      log"${MDC(LogKeys.NUM_INDEX_FILE, indexFiles.length)} index files, " +
+      log"and ${MDC(LogKeys.NUM_CHECKSUM_FILE, checksumFiles.length)} checksum 
files.")
 
     // Build a hashmap with checksum file name as a key
     val checksumFileMap = new mutable.HashMap[String, File]()
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 29cf8ccc20f9..c86195d0ef31 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -599,7 +599,7 @@ private[yarn] class YarnAllocator(
         val numToCancel = math.min(numPendingAllocate, -missing)
         logInfo(log"Canceling requests for ${MDC(LogKeys.COUNT, numToCancel)} 
executor " +
           log"container(s) to have a new desired total " +
-          log"${MDC(LogKeys.EXECUTOR_DESIRED_COUNT,
+          log"${MDC(LogKeys.NUM_EXECUTOR_DESIRED,
             getOrUpdateTargetNumExecutorsForRPId(rpId))} executors.")
         // cancel pending allocate requests by taking locality preference into 
account
         val cancelRequests = (staleRequests ++ anyHostRequests ++ 
localRequests).take(numToCancel)
@@ -707,7 +707,7 @@ private[yarn] class YarnAllocator(
     runAllocatedContainers(containersToUse)
 
     logInfo(log"Received ${MDC(LogKeys.COUNT, allocatedContainers.size)} 
containers from YARN, " +
-      log"launching executors on ${MDC(LogKeys.EXECUTOR_LAUNCH_COUNT, 
containersToUse.size)} " +
+      log"launching executors on ${MDC(LogKeys.NUM_EXECUTOR_LAUNCH, 
containersToUse.size)} " +
       log"of them.")
   }
 
@@ -819,7 +819,7 @@ private[yarn] class YarnAllocator(
       } else {
         logInfo(log"Skip launching executorRunnable as running executors 
count: " +
           log"${MDC(LogKeys.COUNT, rpRunningExecs)} reached target executors 
count: " +
-          log"${MDC(LogKeys.EXECUTOR_TARGET_COUNT, 
getOrUpdateTargetNumExecutorsForRPId(rpId))}.")
+          log"${MDC(LogKeys.NUM_EXECUTOR_TARGET, 
getOrUpdateTargetNumExecutorsForRPId(rpId))}.")
       }
     }
   }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala
index ea43bebe8ecb..220920a5a319 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.expressions
 import java.lang.reflect.{Method, Modifier}
 
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKeys.{FUNCTION_NAME, FUNCTION_PARAMETER}
+import org.apache.spark.internal.LogKeys.{FUNCTION_NAME, FUNCTION_PARAM}
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
 import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException
@@ -136,7 +136,7 @@ object V2ExpressionUtils extends SQLConfHelper with Logging 
{
       case _: NoSuchFunctionException =>
         val parameterString = args.map(_.dataType.typeName).mkString("(", ", 
", ")")
         logWarning(log"V2 function ${MDC(FUNCTION_NAME, name)} " +
-          log"with parameter types ${MDC(FUNCTION_PARAMETER, parameterString)} 
is used in " +
+          log"with parameter types ${MDC(FUNCTION_PARAM, parameterString)} is 
used in " +
           log"partition transforms, but its definition couldn't be found in 
the function catalog " +
           log"provided")
         None
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
index c3c105995cd8..0aa01e4f5c51 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
@@ -90,14 +90,16 @@ class PlanChangeLogger[TreeType <: TreeNode[_]] extends 
Logging {
   def logMetrics(metrics: QueryExecutionMetrics): Unit = {
     val totalTime = metrics.time / NANOS_PER_MILLIS.toDouble
     val totalTimeEffective = metrics.timeEffective / NANOS_PER_MILLIS.toDouble
+    // scalastyle:off line.size.limit
     val message: MessageWithContext =
       log"""
          |=== Metrics of Executed Rules ===
-         |Total number of runs: ${MDC(RULE_NUMBER_OF_RUNS, metrics.numRuns)}
+         |Total number of runs: ${MDC(NUM_RULE_OF_RUNS, metrics.numRuns)}
          |Total time: ${MDC(TOTAL_TIME, totalTime)} ms
-         |Total number of effective runs: ${MDC(RULE_NUMBER_OF_RUNS, 
metrics.numEffectiveRuns)}
+         |Total number of effective runs: ${MDC(NUM_EFFECTIVE_RULE_OF_RUNS, 
metrics.numEffectiveRuns)}
          |Total time of effective runs: ${MDC(TOTAL_EFFECTIVE_TIME, 
totalTimeEffective)} ms
       """.stripMargin
+    // scalastyle:on line.size.limit
 
     logBasedOnLevel(message)
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 151695192281..088242b4246e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -177,7 +177,7 @@ class RocksDB(
     assert(version >= 0)
     acquire(LoadStore)
     recordedMetrics = None
-    logInfo(log"Loading ${MDC(LogKeys.VERSION_NUMBER, version)}")
+    logInfo(log"Loading ${MDC(LogKeys.VERSION_NUM, version)}")
     try {
       if (loadedVersion != version) {
         closeDB()
@@ -212,7 +212,7 @@ class RocksDB(
       if (conf.resetStatsOnLoad) {
         nativeStats.reset
       }
-      logInfo(log"Loaded ${MDC(LogKeys.VERSION_NUMBER, version)}")
+      logInfo(log"Loaded ${MDC(LogKeys.VERSION_NUM, version)}")
     } catch {
       case t: Throwable =>
         loadedVersion = -1  // invalidate loaded data
@@ -548,7 +548,7 @@ class RocksDB(
     val newVersion = loadedVersion + 1
     try {
 
-      logInfo(log"Flushing updates for ${MDC(LogKeys.VERSION_NUMBER, 
newVersion)}")
+      logInfo(log"Flushing updates for ${MDC(LogKeys.VERSION_NUM, 
newVersion)}")
 
       var compactTimeMs = 0L
       var flushTimeMs = 0L
@@ -556,7 +556,7 @@ class RocksDB(
       if (shouldCreateSnapshot()) {
         // Need to flush the change to disk before creating a checkpoint
         // because rocksdb wal is disabled.
-        logInfo(log"Flushing updates for ${MDC(LogKeys.VERSION_NUMBER, 
newVersion)}")
+        logInfo(log"Flushing updates for ${MDC(LogKeys.VERSION_NUM, 
newVersion)}")
         flushTimeMs = timeTakenMs {
           // Flush updates to all available column families
           assert(!colFamilyNameToHandleMap.isEmpty)
@@ -574,7 +574,7 @@ class RocksDB(
 
         checkpointTimeMs = timeTakenMs {
           val checkpointDir = createTempDir("checkpoint")
-          logInfo(log"Creating checkpoint for ${MDC(LogKeys.VERSION_NUMBER, 
newVersion)} " +
+          logInfo(log"Creating checkpoint for ${MDC(LogKeys.VERSION_NUM, 
newVersion)} " +
             log"in ${MDC(LogKeys.PATH, checkpointDir)}")
           // Make sure the directory does not exist. Native RocksDB fails if 
the directory to
           // checkpoint exists.
@@ -595,7 +595,7 @@ class RocksDB(
         }
       }
 
-      logInfo(log"Syncing checkpoint for ${MDC(LogKeys.VERSION_NUMBER, 
newVersion)} to DFS")
+      logInfo(log"Syncing checkpoint for ${MDC(LogKeys.VERSION_NUM, 
newVersion)} to DFS")
       val fileSyncTimeMs = timeTakenMs {
         if (enableChangelogCheckpointing) {
           try {
@@ -619,7 +619,7 @@ class RocksDB(
         "fileSync" -> fileSyncTimeMs
       )
       recordedMetrics = Some(metrics)
-      logInfo(log"Committed ${MDC(LogKeys.VERSION_NUMBER, newVersion)}, " +
+      logInfo(log"Committed ${MDC(LogKeys.VERSION_NUM, newVersion)}, " +
         log"stats = ${MDC(LogKeys.METRICS_JSON, recordedMetrics.get.json)}")
       loadedVersion
     } catch {
@@ -656,7 +656,7 @@ class RocksDB(
             fileManagerMetrics = fileManager.latestSaveCheckpointMetrics
           }
           logInfo(log"${MDC(LogKeys.LOG_ID, loggingId)}: Upload snapshot of 
version " +
-            log"${MDC(LogKeys.VERSION_NUMBER, version)}," +
+            log"${MDC(LogKeys.VERSION_NUM, version)}," +
             log" time taken: ${MDC(LogKeys.TIME_UNITS, uploadTime)} ms")
         } finally {
           localCheckpoint.foreach(_.close())
@@ -676,7 +676,7 @@ class RocksDB(
     // Make sure changelogWriter gets recreated next time.
     changelogWriter = None
     release(RollbackStore)
-    logInfo(log"Rolled back to ${MDC(LogKeys.VERSION_NUMBER, loadedVersion)}")
+    logInfo(log"Rolled back to ${MDC(LogKeys.VERSION_NUM, loadedVersion)}")
   }
 
   def doMaintenance(): Unit = {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index d8b099a56312..0a460ece2400 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -206,13 +206,13 @@ class RocksDBFileManager(
   /** Save all the files in given local checkpoint directory as a committed 
version in DFS */
   def saveCheckpointToDfs(checkpointDir: File, version: Long, numKeys: Long): 
Unit = {
     logFilesInDir(checkpointDir, log"Saving checkpoint files " +
-      log"for version ${MDC(LogKeys.VERSION_NUMBER, version)}")
+      log"for version ${MDC(LogKeys.VERSION_NUM, version)}")
     val (localImmutableFiles, localOtherFiles) = 
listRocksDBFiles(checkpointDir)
     val rocksDBFiles = saveImmutableFilesToDfs(version, localImmutableFiles)
     val metadata = RocksDBCheckpointMetadata(rocksDBFiles, numKeys)
     val metadataFile = localMetadataFile(checkpointDir)
     metadata.writeToFile(metadataFile)
-    logInfo(log"Written metadata for version ${MDC(LogKeys.VERSION_NUMBER, 
version)}:\n" +
+    logInfo(log"Written metadata for version ${MDC(LogKeys.VERSION_NUM, 
version)}:\n" +
       log"${MDC(LogKeys.METADATA_JSON, metadata.prettyJson)}")
 
     if (version <= 1 && numKeys <= 0) {
@@ -229,7 +229,7 @@ class RocksDBFileManager(
       }
     }
     zipToDfsFile(localOtherFiles :+ metadataFile, dfsBatchZipFile(version))
-    logInfo(log"Saved checkpoint file for version 
${MDC(LogKeys.VERSION_NUMBER, version)}")
+    logInfo(log"Saved checkpoint file for version ${MDC(LogKeys.VERSION_NUM, 
version)}")
   }
 
   /**
@@ -239,7 +239,7 @@ class RocksDBFileManager(
    * local directory.
    */
   def loadCheckpointFromDfs(version: Long, localDir: File): 
RocksDBCheckpointMetadata = {
-    logInfo(log"Loading checkpoint files for version 
${MDC(LogKeys.VERSION_NUMBER, version)}")
+    logInfo(log"Loading checkpoint files for version 
${MDC(LogKeys.VERSION_NUM, version)}")
     // The unique ids of SST files are checked when opening a rocksdb 
instance. The SST files
     // in larger versions can't be reused even if they have the same size and 
name because
     // they belong to another rocksdb instance.
@@ -257,7 +257,7 @@ class RocksDBFileManager(
       // Copy the necessary immutable files
       val metadataFile = localMetadataFile(localDir)
       val metadata = RocksDBCheckpointMetadata.readFromFile(metadataFile)
-      logInfo(log"Read metadata for version ${MDC(LogKeys.VERSION_NUMBER, 
version)}:\n" +
+      logInfo(log"Read metadata for version ${MDC(LogKeys.VERSION_NUM, 
version)}:\n" +
         log"${MDC(LogKeys.METADATA_JSON, metadata.prettyJson)}")
       loadImmutableFilesFromDfs(metadata.immutableFiles, localDir)
       versionToRocksDBFiles.put(version, metadata.immutableFiles)
@@ -265,7 +265,7 @@ class RocksDBFileManager(
       metadata
     }
     logFilesInDir(localDir, log"Loaded checkpoint files " +
-      log"for version ${MDC(LogKeys.VERSION_NUMBER, version)}")
+      log"for version ${MDC(LogKeys.VERSION_NUM, version)}")
     metadata
   }
 
@@ -345,7 +345,7 @@ class RocksDBFileManager(
     versionsToDelete.foreach { version =>
       try {
         fm.delete(dfsChangelogFile(version))
-        logInfo(log"Deleted changelog file ${MDC(LogKeys.VERSION_NUMBER, 
version)}")
+        logInfo(log"Deleted changelog file ${MDC(LogKeys.VERSION_NUM, 
version)}")
       } catch {
         case e: Exception =>
           logWarning(
@@ -438,7 +438,7 @@ class RocksDBFileManager(
     filesToDelete ++= findOrphanFiles(fileToMaxUsedVersion.keys.toSeq, allSstFiles ++ allLogFiles)
       .map(_ -> -1L)
     logInfo(log"Deleting ${MDC(LogKeys.NUM_FILES, filesToDelete.size)} " +
-      log"files not used in versions >= ${MDC(LogKeys.VERSION_NUMBER, 
minVersionToRetain)}")
+      log"files not used in versions >= ${MDC(LogKeys.VERSION_NUM, 
minVersionToRetain)}")
     var failedToDelete = 0
     filesToDelete.foreach { case (dfsFileName, maxUsedVersion) =>
       try {
@@ -477,7 +477,7 @@ class RocksDBFileManager(
     logInfo(log"Deleted ${MDC(LogKeys.NUM_FILES, filesToDelete.size - 
failedToDelete)} files " +
       log"(failed to delete" +
       log"${MDC(LogKeys.NUM_FILES_FAILED_TO_DELETE, failedToDelete)} files) " +
-      log"not used in versions >= ${MDC(LogKeys.MIN_VERSION_NUMBER, 
minVersionToRetain)}")
+      log"not used in versions >= ${MDC(LogKeys.MIN_VERSION_NUM, 
minVersionToRetain)}")
     val changelogVersionsToDelete = changelogFiles
       .map(_.getName.stripSuffix(".changelog")).map(_.toLong)
       .filter(_ < minVersionToRetain)
@@ -490,7 +490,7 @@ class RocksDBFileManager(
       localFiles: Seq[File]): Seq[RocksDBImmutableFile] = {
     // Get the immutable files used in previous versions, as some of those uploaded files can be
     // reused for this version
-    logInfo(log"Saving RocksDB files to DFS for ${MDC(LogKeys.VERSION_NUMBER, 
version)}")
+    logInfo(log"Saving RocksDB files to DFS for ${MDC(LogKeys.VERSION_NUM, 
version)}")
 
     var bytesCopied = 0L
     var filesCopied = 0L
@@ -528,7 +528,7 @@ class RocksDBFileManager(
     }
     logInfo(log"Copied ${MDC(LogKeys.NUM_FILES_COPIED, filesCopied)} files " +
       log"(${MDC(LogKeys.NUM_BYTES, bytesCopied)} bytes) from local to" +
-      log" DFS for version ${MDC(LogKeys.VERSION_NUMBER, version)}. " +
+      log" DFS for version ${MDC(LogKeys.VERSION_NUM, version)}. " +
       log"${MDC(LogKeys.NUM_FILES_REUSED, filesReused)} files reused without 
copying.")
     versionToRocksDBFiles.put(version, immutableFiles)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
index 9f568d075cd9..e7fc9f56dd9e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
@@ -165,7 +165,7 @@ private[sql] class RocksDBStateStoreProvider
         verify(state == UPDATING, "Cannot commit after already committed or 
aborted")
         val newVersion = rocksDB.commit()
         state = COMMITTED
-        logInfo(log"Committed ${MDC(VERSION_NUMBER, newVersion)} " +
+        logInfo(log"Committed ${MDC(VERSION_NUM, newVersion)} " +
           log"for ${MDC(STATE_STORE_ID, id)}")
         newVersion
       } catch {
@@ -176,7 +176,7 @@ private[sql] class RocksDBStateStoreProvider
 
     override def abort(): Unit = {
       verify(state == UPDATING || state == ABORTED, "Cannot abort after already committed")
-      logInfo(log"Aborting ${MDC(VERSION_NUMBER, version + 1)} " +
+      logInfo(log"Aborting ${MDC(VERSION_NUM, version + 1)} " +
         log"for ${MDC(STATE_STORE_ID, id)}")
       rocksDB.rollback()
       state = ABORTED
@@ -242,7 +242,7 @@ private[sql] class RocksDBStateStoreProvider
           stateStoreCustomMetrics)
       } else {
         logInfo(log"Failed to collect metrics for 
store_id=${MDC(STATE_STORE_ID, id)} " +
-          log"and version=${MDC(VERSION_NUMBER, version)}")
+          log"and version=${MDC(VERSION_NUM, version)}")
         StateStoreMetrics(0, 0, Map.empty)
       }
     }
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/server/HiveServer2.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/server/HiveServer2.java
index 9345125a8279..46ee775e8dd4 100644
--- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/server/HiveServer2.java
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/server/HiveServer2.java
@@ -145,7 +145,7 @@ public class HiveServer2 extends CompositeService {
           throw new Error("Max start attempts " + maxAttempts + " exhausted", 
throwable);
         } else {
           LOG.warn("Error starting HiveServer2 on attempt {}, will retry in 60 
seconds",
-            throwable, MDC.of(LogKeys.RETRY_COUNT$.MODULE$, attempts));
+            throwable, MDC.of(LogKeys.NUM_RETRY$.MODULE$, attempts));
           try {
             Thread.sleep(60L * 1000L);
           } catch (InterruptedException e) {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 1cd60c0d3fff..2bb2fe970a11 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -232,7 +232,7 @@ private[hive] class HiveClientImpl(
           caughtException = e
           logWarning(
             log"HiveClient got thrift exception, destroying client and 
retrying " +
-              log"${MDC(RETRY_COUNT, numTries)} times", e)
+              log"${MDC(NUM_RETRY, numTries)} times", e)
           clientLoader.cachedHive = null
           Thread.sleep(retryDelayMillis)
       }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index a5d22f0bb5bd..bed048c4b5df 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -29,7 +29,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKeys.{BACKUP_FILE, CHECKPOINT_FILE, CHECKPOINT_TIME, PATH, RETRY_COUNT, TEMP_FILE}
+import org.apache.spark.internal.LogKeys.{BACKUP_FILE, CHECKPOINT_FILE, CHECKPOINT_TIME, NUM_RETRY, PATH, TEMP_FILE}
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.streaming.scheduler.JobGenerator
@@ -288,7 +288,7 @@ class CheckpointWriter(
           return
         } catch {
           case ioe: IOException =>
-            val msg = log"Error in attempt ${MDC(RETRY_COUNT, attempts)} of 
writing checkpoint " +
+            val msg = log"Error in attempt ${MDC(NUM_RETRY, attempts)} of 
writing checkpoint " +
               log"to '${MDC(CHECKPOINT_FILE, checkpointFile)}'"
             logWarning(msg, ioe)
             fs = null
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index 6162f4819016..58a6b929a81f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -32,7 +32,7 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKeys.{RETRY_COUNT, WRITE_AHEAD_LOG_INFO}
+import org.apache.spark.internal.LogKeys.{NUM_RETRY, WRITE_AHEAD_LOG_INFO}
 import org.apache.spark.util.{CompletionIterator, ThreadUtils}
 import org.apache.spark.util.ArrayImplicits._
 
@@ -107,7 +107,7 @@ private[streaming] class FileBasedWriteAheadLog(
       }
     }
     if (fileSegment == null) {
-      logError(log"Failed to write to write ahead log after ${MDC(RETRY_COUNT, 
failures)} failures")
+      logError(log"Failed to write to write ahead log after ${MDC(NUM_RETRY, 
failures)} failures")
       throw lastException
     }
     fileSegment
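
For reference, a minimal sketch (not part of this patch) of how the renamed keys are
used with Spark's structured logging, mirroring the call pattern in the hunks above;
the class, method, and values below are hypothetical and only illustrate the
`MDC(LogKeys.*, ...)` usage with the post-rename keys:

    import org.apache.spark.internal.{Logging, MDC}
    import org.apache.spark.internal.LogKeys.{NUM_RETRY, VERSION_NUM}

    // Hypothetical helper: logs a committed version and the retry count,
    // using VERSION_NUM (was VERSION_NUMBER) and NUM_RETRY (was RETRY_COUNT).
    class CommitLogger extends Logging {
      def logCommit(version: Long, attempts: Int): Unit = {
        logInfo(log"Committed ${MDC(VERSION_NUM, version)} " +
          log"after ${MDC(NUM_RETRY, attempts)} attempts")
      }
    }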


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
